From 9db0c280510bb0d8ceaafe61f1e2f9ff39707ca5 Mon Sep 17 00:00:00 2001 From: Adam Szkoda Date: Tue, 16 Jun 2020 03:34:04 +0200 Subject: [PATCH] Make key value storage abstractions more accurate (#1267) * Layer do_atomically() abstractions properly * Reduce allocs and DRY get_key_for_col() * Parameterize HotColdDB with hot and cold item stores * -impl Store for MemoryStore * Replace Store uses with HotColdDB * Ditch Store trait * cargo fmt * Style fix * Readd missing dep that broke the build --- Cargo.lock | 3 +- beacon_node/beacon_chain/src/beacon_chain.rs | 21 ++- beacon_node/beacon_chain/src/builder.rs | 154 +++++++++++----- beacon_node/beacon_chain/src/eth1_chain.rs | 71 +++---- beacon_node/beacon_chain/src/migrate.rs | 40 ++-- beacon_node/beacon_chain/src/test_utils.rs | 41 +++-- .../tests/attestation_production.rs | 7 +- .../tests/attestation_verification.rs | 3 +- .../beacon_chain/tests/block_verification.rs | 7 +- .../beacon_chain/tests/persistence_tests.rs | 4 +- beacon_node/beacon_chain/tests/store_tests.rs | 13 +- beacon_node/beacon_chain/tests/tests.rs | 8 +- beacon_node/client/src/builder.rs | 173 +++++++++--------- .../src/attestation_service/tests/mod.rs | 15 +- beacon_node/network/src/persisted_dht.rs | 25 ++- beacon_node/network/src/router/processor.rs | 1 - beacon_node/network/src/service.rs | 7 +- beacon_node/network/src/service/tests.rs | 8 +- beacon_node/rest_api/src/beacon.rs | 1 - beacon_node/rest_api/src/helpers.rs | 2 +- beacon_node/src/lib.rs | 10 +- beacon_node/store/Cargo.toml | 1 + beacon_node/store/src/chunked_iter.rs | 18 +- beacon_node/store/src/forwards_iter.rs | 34 ++-- beacon_node/store/src/hot_cold_store.rs | 111 ++++++++--- beacon_node/store/src/iter.rs | 131 ++++++++----- beacon_node/store/src/leveldb_store.rs | 42 ++--- beacon_node/store/src/lib.rs | 83 ++------- beacon_node/store/src/memory_store.rs | 123 +------------ beacon_node/store/src/state_batch.rs | 7 +- 30 files changed, 589 insertions(+), 575 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8da7a8ae5..fab4bc448 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4312,7 +4312,7 @@ checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" name = "slashing_protection" version = "0.1.0" dependencies = [ - "parking_lot 0.9.0", + "parking_lot 0.10.2", "r2d2", "r2d2_sqlite", "rayon", @@ -4643,6 +4643,7 @@ dependencies = [ "serde", "serde_derive", "slog", + "sloggers", "state_processing", "tempfile", "tree_hash", diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 7e7f76c7a..8d3294493 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -40,7 +40,7 @@ use std::io::prelude::*; use std::sync::Arc; use std::time::{Duration, Instant}; use store::iter::{BlockRootsIterator, ParentRootBlockIterator, StateRootsIterator}; -use store::{Error as DBError, Store}; +use store::{Error as DBError, HotColdDB}; use types::*; // Text included in blocks. @@ -148,10 +148,11 @@ pub struct HeadInfo { } pub trait BeaconChainTypes: Send + Sync + 'static { - type Store: store::Store; - type StoreMigrator: Migrate; + type HotStore: store::ItemStore; + type ColdStore: store::ItemStore; + type StoreMigrator: Migrate; type SlotClock: slot_clock::SlotClock; - type Eth1Chain: Eth1ChainBackend; + type Eth1Chain: Eth1ChainBackend; type EthSpec: types::EthSpec; type EventHandler: EventHandler; } @@ -161,7 +162,7 @@ pub trait BeaconChainTypes: Send + Sync + 'static { pub struct BeaconChain { pub spec: ChainSpec, /// Persistent storage for blocks, states, etc. Typically an on-disk store, such as LevelDB. - pub store: Arc, + pub store: Arc>, /// Database migrator for running background maintenance on the store. pub store_migrator: T::StoreMigrator, /// Reports the current slot, typically based upon the system clock. @@ -185,7 +186,7 @@ pub struct BeaconChain { /// Maintains a record of which validators have proposed blocks for each slot. pub observed_block_producers: ObservedBlockProducers, /// Provides information from the Ethereum 1 (PoW) chain. - pub eth1_chain: Option>, + pub eth1_chain: Option>, /// Stores a "snapshot" of the chain at the time the head-of-the-chain block was received. pub(crate) canonical_head: TimeoutRwLock>, /// The root of the genesis block. @@ -335,16 +336,18 @@ impl BeaconChain { pub fn forwards_iter_block_roots( &self, start_slot: Slot, - ) -> Result<>::ForwardsBlockRootsIterator, Error> { + ) -> Result>, Error> { let local_head = self.head()?; - Ok(T::Store::forwards_block_roots_iterator( + let iter = HotColdDB::forwards_block_roots_iterator( self.store.clone(), start_slot, local_head.beacon_state, local_head.beacon_block_root, &self.spec, - )?) + )?; + + Ok(iter.map(|result| result.map_err(Into::into))) } /// Traverse backwards from `block_root` to find the block roots of its ancestors. diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index ff8d5be71..5b196bf79 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -24,7 +24,7 @@ use std::marker::PhantomData; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; -use store::Store; +use store::{HotColdDB, ItemStore}; use types::{ BeaconBlock, BeaconState, ChainSpec, EthSpec, Hash256, Signature, SignedBeaconBlock, Slot, }; @@ -33,28 +33,48 @@ pub const PUBKEY_CACHE_FILENAME: &str = "pubkey_cache.ssz"; /// An empty struct used to "witness" all the `BeaconChainTypes` traits. It has no user-facing /// functionality and only exists to satisfy the type system. -pub struct Witness( +pub struct Witness< + TStoreMigrator, + TSlotClock, + TEth1Backend, + TEthSpec, + TEventHandler, + THotStore, + TColdStore, +>( PhantomData<( - TStore, TStoreMigrator, TSlotClock, TEth1Backend, TEthSpec, TEventHandler, + THotStore, + TColdStore, )>, ); -impl BeaconChainTypes - for Witness +impl + BeaconChainTypes + for Witness< + TStoreMigrator, + TSlotClock, + TEth1Backend, + TEthSpec, + TEventHandler, + THotStore, + TColdStore, + > where - TStore: Store + 'static, - TStoreMigrator: Migrate + 'static, + THotStore: ItemStore + 'static, + TColdStore: ItemStore + 'static, + TStoreMigrator: Migrate + 'static, TSlotClock: SlotClock + 'static, - TEth1Backend: Eth1ChainBackend + 'static, + TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, TEventHandler: EventHandler + 'static, { - type Store = TStore; + type HotStore = THotStore; + type ColdStore = TColdStore; type StoreMigrator = TStoreMigrator; type SlotClock = TSlotClock; type Eth1Chain = TEth1Backend; @@ -71,7 +91,7 @@ where /// /// See the tests for an example of a complete working example. pub struct BeaconChainBuilder { - store: Option>, + store: Option>>, store_migrator: Option, canonical_head: Option>, /// The finalized checkpoint to anchor the chain. May be genesis or a higher @@ -80,7 +100,7 @@ pub struct BeaconChainBuilder { genesis_block_root: Option, op_pool: Option>, fork_choice: Option>, - eth1_chain: Option>, + eth1_chain: Option>, event_handler: Option, slot_clock: Option, head_tracker: Option, @@ -92,15 +112,24 @@ pub struct BeaconChainBuilder { log: Option, } -impl +impl BeaconChainBuilder< - Witness, + Witness< + TStoreMigrator, + TSlotClock, + TEth1Backend, + TEthSpec, + TEventHandler, + THotStore, + TColdStore, + >, > where - TStore: Store + 'static, - TStoreMigrator: Migrate + 'static, + THotStore: ItemStore + 'static, + TColdStore: ItemStore + 'static, + TStoreMigrator: Migrate + 'static, TSlotClock: SlotClock + 'static, - TEth1Backend: Eth1ChainBackend + 'static, + TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, TEventHandler: EventHandler + 'static, { @@ -142,7 +171,7 @@ where /// Sets the store (database). /// /// Should generally be called early in the build chain. - pub fn store(mut self, store: Arc) -> Self { + pub fn store(mut self, store: Arc>) -> Self { self.store = Some(store); self } @@ -379,7 +408,15 @@ where self, ) -> Result< BeaconChain< - Witness, + Witness< + TStoreMigrator, + TSlotClock, + TEth1Backend, + TEthSpec, + TEventHandler, + THotStore, + TColdStore, + >, >, String, > { @@ -480,17 +517,26 @@ where } } -impl +impl BeaconChainBuilder< - Witness, + Witness< + TStoreMigrator, + TSlotClock, + TEth1Backend, + TEthSpec, + TEventHandler, + THotStore, + TColdStore, + >, > where - TStore: Store + 'static, - TStoreMigrator: Migrate + 'static, + TStoreMigrator: Migrate + 'static, TSlotClock: SlotClock + 'static, - TEth1Backend: Eth1ChainBackend + 'static, + TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, TEventHandler: EventHandler + 'static, + THotStore: ItemStore + 'static, + TColdStore: ItemStore + 'static, { /// Initializes a fork choice with the `ThreadSafeReducedTree` backend. /// @@ -543,20 +589,22 @@ where } } -impl +impl BeaconChainBuilder< Witness< - TStore, TStoreMigrator, TSlotClock, - CachingEth1Backend, + CachingEth1Backend, TEthSpec, TEventHandler, + THotStore, + TColdStore, >, > where - TStore: Store + 'static, - TStoreMigrator: Migrate + 'static, + THotStore: ItemStore + 'static, + TColdStore: ItemStore + 'static, + TStoreMigrator: Migrate + 'static, TSlotClock: SlotClock + 'static, TEthSpec: EthSpec + 'static, TEventHandler: EventHandler + 'static, @@ -572,12 +620,8 @@ where .log .as_ref() .ok_or_else(|| "dummy_eth1_backend requires a log".to_string())?; - let store = self - .store - .clone() - .ok_or_else(|| "dummy_eth1_backend requires a store.".to_string())?; - let backend = CachingEth1Backend::new(Eth1Config::default(), log.clone(), store); + let backend = CachingEth1Backend::new(Eth1Config::default(), log.clone()); let mut eth1_chain = Eth1Chain::new(backend); eth1_chain.use_dummy_backend = true; @@ -588,14 +632,23 @@ where } } -impl +impl BeaconChainBuilder< - Witness, + Witness< + TStoreMigrator, + TestingSlotClock, + TEth1Backend, + TEthSpec, + TEventHandler, + THotStore, + TColdStore, + >, > where - TStore: Store + 'static, - TStoreMigrator: Migrate + 'static, - TEth1Backend: Eth1ChainBackend + 'static, + THotStore: ItemStore + 'static, + TColdStore: ItemStore + 'static, + TStoreMigrator: Migrate + 'static, + TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, TEventHandler: EventHandler + 'static, { @@ -620,22 +673,24 @@ where } } -impl +impl BeaconChainBuilder< Witness< - TStore, TStoreMigrator, TSlotClock, TEth1Backend, TEthSpec, NullEventHandler, + THotStore, + TColdStore, >, > where - TStore: Store + 'static, - TStoreMigrator: Migrate + 'static, + THotStore: ItemStore + 'static, + TColdStore: ItemStore + 'static, + TStoreMigrator: Migrate + 'static, TSlotClock: SlotClock + 'static, - TEth1Backend: Eth1ChainBackend + 'static, + TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, { /// Sets the `BeaconChain` event handler to `NullEventHandler`. @@ -665,12 +720,14 @@ fn genesis_block( #[cfg(test)] mod test { use super::*; - use crate::migrate::{MemoryStore, NullMigrator}; + use crate::migrate::NullMigrator; use eth2_hashing::hash; use genesis::{generate_deterministic_keypairs, interop_genesis_state}; use sloggers::{null::NullLoggerBuilder, Build}; use ssz::Encode; use std::time::Duration; + use store::config::StoreConfig; + use store::{HotColdDB, MemoryStore}; use tempfile::tempdir; use types::{EthSpec, MinimalEthSpec, Slot}; @@ -687,7 +744,12 @@ mod test { let genesis_time = 13_371_337; let log = get_logger(); - let store = Arc::new(MemoryStore::open()); + let store: HotColdDB< + MinimalEthSpec, + MemoryStore, + MemoryStore, + > = HotColdDB::open_ephemeral(StoreConfig::default(), ChainSpec::minimal(), log.clone()) + .unwrap(); let spec = MinimalEthSpec::default_spec(); let data_dir = tempdir().expect("should create temporary data_dir"); @@ -700,7 +762,7 @@ mod test { let chain = BeaconChainBuilder::new(MinimalEthSpec) .logger(log.clone()) - .store(store) + .store(Arc::new(store)) .store_migrator(NullMigrator) .data_dir(data_dir.path().to_path_buf()) .genesis_state(genesis_state) diff --git a/beacon_node/beacon_chain/src/eth1_chain.rs b/beacon_node/beacon_chain/src/eth1_chain.rs index 213344413..f6573a2db 100644 --- a/beacon_node/beacon_chain/src/eth1_chain.rs +++ b/beacon_node/beacon_chain/src/eth1_chain.rs @@ -10,8 +10,7 @@ use std::cmp::Ordering; use std::collections::HashMap; use std::iter::DoubleEndedIterator; use std::marker::PhantomData; -use std::sync::Arc; -use store::{DBColumn, Error as StoreError, Store, StoreItem}; +use store::{DBColumn, Error as StoreError, StoreItem}; use types::{ BeaconState, BeaconStateError, ChainSpec, Deposit, Eth1Data, EthSpec, Hash256, Slot, Unsigned, DEPOSIT_TREE_DEPTH, @@ -75,24 +74,22 @@ impl StoreItem for SszEth1 { } /// Holds an `Eth1ChainBackend` and serves requests from the `BeaconChain`. -pub struct Eth1Chain +pub struct Eth1Chain where - T: Eth1ChainBackend, + T: Eth1ChainBackend, E: EthSpec, - S: Store, { backend: T, /// When `true`, the backend will be ignored and dummy data from the 2019 Canada interop method /// will be used instead. pub use_dummy_backend: bool, - _phantom: PhantomData<(E, S)>, + _phantom: PhantomData, } -impl Eth1Chain +impl Eth1Chain where - T: Eth1ChainBackend, + T: Eth1ChainBackend, E: EthSpec, - S: Store, { pub fn new(backend: T) -> Self { Self { @@ -110,7 +107,7 @@ where spec: &ChainSpec, ) -> Result { if self.use_dummy_backend { - let dummy_backend: DummyEth1ChainBackend = DummyEth1ChainBackend::default(); + let dummy_backend: DummyEth1ChainBackend = DummyEth1ChainBackend::default(); dummy_backend.eth1_data(state, spec) } else { self.backend.eth1_data(state, spec) @@ -132,7 +129,7 @@ where spec: &ChainSpec, ) -> Result, Error> { if self.use_dummy_backend { - let dummy_backend: DummyEth1ChainBackend = DummyEth1ChainBackend::default(); + let dummy_backend: DummyEth1ChainBackend = DummyEth1ChainBackend::default(); dummy_backend.queued_deposits(state, eth1_data_vote, spec) } else { self.backend.queued_deposits(state, eth1_data_vote, spec) @@ -145,11 +142,10 @@ where pub fn from_ssz_container( ssz_container: &SszEth1, config: Eth1Config, - store: Arc, log: &Logger, ) -> Result { let backend = - Eth1ChainBackend::from_bytes(&ssz_container.backend_bytes, config, store, log.clone())?; + Eth1ChainBackend::from_bytes(&ssz_container.backend_bytes, config, log.clone())?; Ok(Self { use_dummy_backend: ssz_container.use_dummy_backend, backend, @@ -171,7 +167,7 @@ where } } -pub trait Eth1ChainBackend>: Sized + Send + Sync { +pub trait Eth1ChainBackend: Sized + Send + Sync { /// Returns the `Eth1Data` that should be included in a block being produced for the given /// `state`. fn eth1_data(&self, beacon_state: &BeaconState, spec: &ChainSpec) @@ -195,12 +191,7 @@ pub trait Eth1ChainBackend>: Sized + Send + Sync { fn as_bytes(&self) -> Vec; /// Create a `Eth1ChainBackend` instance given encoded bytes. - fn from_bytes( - bytes: &[u8], - config: Eth1Config, - store: Arc, - log: Logger, - ) -> Result; + fn from_bytes(bytes: &[u8], config: Eth1Config, log: Logger) -> Result; } /// Provides a simple, testing-only backend that generates deterministic, meaningless eth1 data. @@ -208,9 +199,9 @@ pub trait Eth1ChainBackend>: Sized + Send + Sync { /// Never creates deposits, therefore the validator set is static. /// /// This was used in the 2019 Canada interop workshops. -pub struct DummyEth1ChainBackend>(PhantomData<(T, S)>); +pub struct DummyEth1ChainBackend(PhantomData); -impl> Eth1ChainBackend for DummyEth1ChainBackend { +impl Eth1ChainBackend for DummyEth1ChainBackend { /// Produce some deterministic junk based upon the current epoch. fn eth1_data(&self, state: &BeaconState, _spec: &ChainSpec) -> Result { let current_epoch = state.current_epoch(); @@ -243,17 +234,12 @@ impl> Eth1ChainBackend for DummyEth1ChainBackend, - _log: Logger, - ) -> Result { + fn from_bytes(_bytes: &[u8], _config: Eth1Config, _log: Logger) -> Result { Ok(Self(PhantomData)) } } -impl> Default for DummyEth1ChainBackend { +impl Default for DummyEth1ChainBackend { fn default() -> Self { Self(PhantomData) } @@ -265,21 +251,19 @@ impl> Default for DummyEth1ChainBackend { /// The `core` connects to some external eth1 client (e.g., Parity/Geth) and polls it for /// information. #[derive(Clone)] -pub struct CachingEth1Backend { +pub struct CachingEth1Backend { pub core: HttpService, - store: Arc, log: Logger, _phantom: PhantomData, } -impl> CachingEth1Backend { +impl CachingEth1Backend { /// Instantiates `self` with empty caches. /// /// Does not connect to the eth1 node or start any tasks to keep the cache updated. - pub fn new(config: Eth1Config, log: Logger, store: Arc) -> Self { + pub fn new(config: Eth1Config, log: Logger) -> Self { Self { core: HttpService::new(config, log.clone()), - store, log, _phantom: PhantomData, } @@ -291,17 +275,16 @@ impl> CachingEth1Backend { } /// Instantiates `self` from an existing service. - pub fn from_service(service: HttpService, store: Arc) -> Self { + pub fn from_service(service: HttpService) -> Self { Self { log: service.log.clone(), core: service, - store, _phantom: PhantomData, } } } -impl> Eth1ChainBackend for CachingEth1Backend { +impl Eth1ChainBackend for CachingEth1Backend { fn eth1_data(&self, state: &BeaconState, spec: &ChainSpec) -> Result { let period = T::SlotsPerEth1VotingPeriod::to_u64(); let voting_period_start_slot = (state.slot / period) * period; @@ -406,16 +389,10 @@ impl> Eth1ChainBackend for CachingEth1Backend, - log: Logger, - ) -> Result { + fn from_bytes(bytes: &[u8], config: Eth1Config, log: Logger) -> Result { let inner = HttpService::from_bytes(bytes, config, log.clone())?; Ok(Self { core: inner, - store, log, _phantom: PhantomData, }) @@ -572,17 +549,15 @@ mod test { mod eth1_chain_json_backend { use super::*; use eth1::DepositLog; - use store::MemoryStore; use types::test_utils::{generate_deterministic_keypair, TestingDepositBuilder}; - fn get_eth1_chain() -> Eth1Chain>, E, MemoryStore> { + fn get_eth1_chain() -> Eth1Chain, E> { let eth1_config = Eth1Config { ..Eth1Config::default() }; let log = null_logger().unwrap(); - let store = Arc::new(MemoryStore::open()); - Eth1Chain::new(CachingEth1Backend::new(eth1_config, log, store)) + Eth1Chain::new(CachingEth1Backend::new(eth1_config, log)) } fn get_deposit_log(i: u64, spec: &ChainSpec) -> DepositLog { diff --git a/beacon_node/beacon_chain/src/migrate.rs b/beacon_node/beacon_chain/src/migrate.rs index dc92744a5..7291a3ffc 100644 --- a/beacon_node/beacon_chain/src/migrate.rs +++ b/beacon_node/beacon_chain/src/migrate.rs @@ -9,14 +9,16 @@ use std::sync::Arc; use std::thread; use store::hot_cold_store::{process_finalization, HotColdDBError}; use store::iter::{ParentRootBlockIterator, RootsIterator}; -use store::{Error, Store, StoreOp}; +use store::{Error, ItemStore, StoreOp}; pub use store::{HotColdDB, MemoryStore}; use types::*; use types::{BeaconState, EthSpec, Hash256, Slot}; /// Trait for migration processes that update the database upon finalization. -pub trait Migrate: Send + Sync + 'static { - fn new(db: Arc>, log: Logger) -> Self; +pub trait Migrate, Cold: ItemStore>: + Send + Sync + 'static +{ + fn new(db: Arc>, log: Logger) -> Self; fn process_finalization( &self, @@ -30,13 +32,13 @@ pub trait Migrate: Send + Sync + 'static { } /// Traverses live heads and prunes blocks and states of chains that we know can't be built - /// upon because finalization would prohibit it. This is a optimisation intended to save disk + /// upon because finalization would prohibit it. This is an optimisation intended to save disk /// space. /// /// Assumptions: /// * It is called after every finalization. fn prune_abandoned_forks( - store: Arc>, + store: Arc>, head_tracker: Arc, old_finalized_block_hash: SignedBeaconBlockHash, new_finalized_block_hash: SignedBeaconBlockHash, @@ -171,8 +173,8 @@ pub trait Migrate: Send + Sync + 'static { /// Migrator that does nothing, for stores that don't need migration. pub struct NullMigrator; -impl Migrate for NullMigrator { - fn new(_: Arc>, _: Logger) -> Self { +impl, Cold: ItemStore> Migrate for NullMigrator { + fn new(_: Arc>, _: Logger) -> Self { NullMigrator } } @@ -180,12 +182,14 @@ impl Migrate for NullMigrator { /// Migrator that immediately calls the store's migration function, blocking the current execution. /// /// Mostly useful for tests. -pub struct BlockingMigrator { - db: Arc>, +pub struct BlockingMigrator, Cold: ItemStore> { + db: Arc>, } -impl Migrate for BlockingMigrator { - fn new(db: Arc>, _: Logger) -> Self { +impl, Cold: ItemStore> Migrate + for BlockingMigrator +{ + fn new(db: Arc>, _: Logger) -> Self { BlockingMigrator { db } } @@ -225,14 +229,16 @@ type MpscSender = mpsc::Sender<( )>; /// Migrator that runs a background thread to migrate state from the hot to the cold database. -pub struct BackgroundMigrator { - db: Arc>, +pub struct BackgroundMigrator, Cold: ItemStore> { + db: Arc>, tx_thread: Mutex<(MpscSender, thread::JoinHandle<()>)>, log: Logger, } -impl Migrate for BackgroundMigrator { - fn new(db: Arc>, log: Logger) -> Self { +impl, Cold: ItemStore> Migrate + for BackgroundMigrator +{ + fn new(db: Arc>, log: Logger) -> Self { let tx_thread = Mutex::new(Self::spawn_thread(db.clone(), log.clone())); Self { db, tx_thread, log } } @@ -283,7 +289,7 @@ impl Migrate for BackgroundMigrator { } } -impl BackgroundMigrator { +impl, Cold: ItemStore> BackgroundMigrator { /// Return true if a migration needs to be performed, given a new `finalized_slot`. fn needs_migration(&self, finalized_slot: Slot, max_finality_distance: u64) -> bool { let finality_distance = finalized_slot - self.db.get_split_slot(); @@ -294,7 +300,7 @@ impl BackgroundMigrator { /// /// Return a channel handle for sending new finalized states to the thread. fn spawn_thread( - db: Arc>, + db: Arc>, log: Logger, ) -> ( mpsc::Sender<( diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 752a56486..0565f8c4e 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -18,7 +18,7 @@ use std::borrow::Cow; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; -use store::{HotColdDB, MemoryStore, Store}; +use store::{config::StoreConfig, HotColdDB, ItemStore, LevelDB, MemoryStore}; use tempfile::{tempdir, TempDir}; use tree_hash::TreeHash; use types::{ @@ -34,17 +34,19 @@ pub const HARNESS_GENESIS_TIME: u64 = 1_567_552_690; // This parameter is required by a builder but not used because we use the `TestingSlotClock`. pub const HARNESS_SLOT_TIME: Duration = Duration::from_secs(1); -pub type BaseHarnessType = Witness< - TStore, +pub type BaseHarnessType = Witness< TStoreMigrator, TestingSlotClock, - CachingEth1Backend, + CachingEth1Backend, TEthSpec, NullEventHandler, + THotStore, + TColdStore, >; -pub type HarnessType = BaseHarnessType, NullMigrator, E>; -pub type DiskHarnessType = BaseHarnessType, BlockingMigrator, E>; +pub type HarnessType = BaseHarnessType, MemoryStore>; +pub type DiskHarnessType = + BaseHarnessType, LevelDB>, E, LevelDB, LevelDB>; /// Indicates how the `BeaconChainHarness` should produce blocks. #[derive(Clone, Copy, Debug)] @@ -84,12 +86,12 @@ pub struct BeaconChainHarness { impl BeaconChainHarness> { /// Instantiate a new harness with `validator_count` initial validators. - pub fn new(eth_spec_instance: E, keypairs: Vec) -> Self { + pub fn new(eth_spec_instance: E, keypairs: Vec, config: StoreConfig) -> Self { // Setting the target aggregators to really high means that _all_ validators in the // committee are required to produce an aggregate. This is overkill, however with small // validator counts it's the only way to be certain there is _at least one_ aggregator per // committee. - Self::new_with_target_aggregators(eth_spec_instance, keypairs, 1 << 32) + Self::new_with_target_aggregators(eth_spec_instance, keypairs, 1 << 32, config) } /// Instantiate a new harness with `validator_count` initial validators and a custom @@ -98,6 +100,7 @@ impl BeaconChainHarness> { eth_spec_instance: E, keypairs: Vec, target_aggregators_per_committee: u64, + config: StoreConfig, ) -> Self { let data_dir = tempdir().expect("should create temporary data_dir"); let mut spec = E::default_spec(); @@ -105,11 +108,11 @@ impl BeaconChainHarness> { spec.target_aggregators_per_committee = target_aggregators_per_committee; let log = NullLoggerBuilder.build().expect("logger should build"); - + let store = HotColdDB::open_ephemeral(config, spec.clone(), log.clone()).unwrap(); let chain = BeaconChainBuilder::new(eth_spec_instance) - .logger(log.clone()) + .logger(log) .custom_spec(spec.clone()) - .store(Arc::new(MemoryStore::open())) + .store(Arc::new(store)) .store_migrator(NullMigrator) .data_dir(data_dir.path().to_path_buf()) .genesis_state( @@ -140,7 +143,7 @@ impl BeaconChainHarness> { /// Instantiate a new harness with `validator_count` initial validators. pub fn new_with_disk_store( eth_spec_instance: E, - store: Arc>, + store: Arc, LevelDB>>, keypairs: Vec, ) -> Self { let data_dir = tempdir().expect("should create temporary data_dir"); @@ -180,7 +183,7 @@ impl BeaconChainHarness> { /// Instantiate a new harness with `validator_count` initial validators. pub fn resume_from_disk_store( eth_spec_instance: E, - store: Arc>, + store: Arc, LevelDB>>, keypairs: Vec, data_dir: TempDir, ) -> Self { @@ -192,7 +195,10 @@ impl BeaconChainHarness> { .logger(log.clone()) .custom_spec(spec) .store(store.clone()) - .store_migrator( as Migrate>::new(store, log.clone())) + .store_migrator( as Migrate>::new( + store, + log.clone(), + )) .data_dir(data_dir.path().to_path_buf()) .resume_from_db() .expect("should resume beacon chain from db") @@ -215,11 +221,12 @@ impl BeaconChainHarness> { } } -impl BeaconChainHarness> +impl BeaconChainHarness> where - S: Store, - M: Migrate, + M: Migrate, E: EthSpec, + Hot: ItemStore, + Cold: ItemStore, { /// Advance the slot of the `BeaconChain`. /// diff --git a/beacon_node/beacon_chain/tests/attestation_production.rs b/beacon_node/beacon_chain/tests/attestation_production.rs index 1258c1ca6..af5a0ce47 100644 --- a/beacon_node/beacon_chain/tests/attestation_production.rs +++ b/beacon_node/beacon_chain/tests/attestation_production.rs @@ -7,6 +7,7 @@ use beacon_chain::{ test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy}, StateSkipConfig, }; +use store::config::StoreConfig; use tree_hash::TreeHash; use types::{AggregateSignature, EthSpec, Keypair, MainnetEthSpec, RelativeEpoch, Slot}; @@ -25,7 +26,11 @@ lazy_static! { fn produces_attestations() { let num_blocks_produced = MainnetEthSpec::slots_per_epoch() * 4; - let harness = BeaconChainHarness::new(MainnetEthSpec, KEYPAIRS[..].to_vec()); + let harness = BeaconChainHarness::new( + MainnetEthSpec, + KEYPAIRS[..].to_vec(), + StoreConfig::default(), + ); // Skip past the genesis slot. harness.advance_slot(); diff --git a/beacon_node/beacon_chain/tests/attestation_verification.rs b/beacon_node/beacon_chain/tests/attestation_verification.rs index 7a06ca65b..7e0e2e27a 100644 --- a/beacon_node/beacon_chain/tests/attestation_verification.rs +++ b/beacon_node/beacon_chain/tests/attestation_verification.rs @@ -9,7 +9,7 @@ use beacon_chain::{ BeaconChain, BeaconChainTypes, }; use state_processing::per_slot_processing; -use store::Store; +use store::config::StoreConfig; use tree_hash::TreeHash; use types::{ test_utils::generate_deterministic_keypair, AggregateSignature, Attestation, EthSpec, Hash256, @@ -36,6 +36,7 @@ fn get_harness(validator_count: usize) -> BeaconChainHarness> { // A kind-of arbitrary number that ensures that _some_ validators are aggregators, but // not all. 4, + StoreConfig::default(), ); harness.advance_slot(); diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index 9dae527c8..16546404b 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -7,6 +7,7 @@ use beacon_chain::{ test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy, HarnessType}, BeaconSnapshot, BlockError, }; +use store::config::StoreConfig; use types::{ test_utils::generate_deterministic_keypair, AggregateSignature, AttestationData, AttesterSlashing, Checkpoint, Deposit, DepositData, Epoch, EthSpec, Hash256, @@ -47,7 +48,11 @@ fn get_chain_segment() -> Vec> { } fn get_harness(validator_count: usize) -> BeaconChainHarness> { - let harness = BeaconChainHarness::new(MainnetEthSpec, KEYPAIRS[0..validator_count].to_vec()); + let harness = BeaconChainHarness::new( + MainnetEthSpec, + KEYPAIRS[0..validator_count].to_vec(), + StoreConfig::default(), + ); harness.advance_slot(); diff --git a/beacon_node/beacon_chain/tests/persistence_tests.rs b/beacon_node/beacon_chain/tests/persistence_tests.rs index 95670c55a..19efc506c 100644 --- a/beacon_node/beacon_chain/tests/persistence_tests.rs +++ b/beacon_node/beacon_chain/tests/persistence_tests.rs @@ -9,7 +9,7 @@ use beacon_chain::{ }; use sloggers::{null::NullLoggerBuilder, Build}; use std::sync::Arc; -use store::{HotColdDB, StoreConfig}; +use store::{HotColdDB, LevelDB, StoreConfig}; use tempfile::{tempdir, TempDir}; use types::{EthSpec, Keypair, MinimalEthSpec}; @@ -23,7 +23,7 @@ lazy_static! { static ref KEYPAIRS: Vec = types::test_utils::generate_deterministic_keypairs(VALIDATOR_COUNT); } -fn get_store(db_path: &TempDir) -> Arc> { +fn get_store(db_path: &TempDir) -> Arc, LevelDB>> { let spec = E::default_spec(); let hot_path = db_path.path().join("hot_db"); let cold_path = db_path.path().join("cold_db"); diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 6c2f828ff..89a183792 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -16,7 +16,7 @@ use std::collections::HashSet; use std::sync::Arc; use store::{ iter::{BlockRootsIterator, StateRootsIterator}, - HotColdDB, Store, StoreConfig, + HotColdDB, LevelDB, StoreConfig, }; use tempfile::{tempdir, TempDir}; use tree_hash::TreeHash; @@ -35,7 +35,7 @@ lazy_static! { type E = MinimalEthSpec; type TestHarness = BeaconChainHarness>; -fn get_store(db_path: &TempDir) -> Arc> { +fn get_store(db_path: &TempDir) -> Arc, LevelDB>> { let spec = MinimalEthSpec::default_spec(); let hot_path = db_path.path().join("hot_db"); let cold_path = db_path.path().join("cold_db"); @@ -47,7 +47,10 @@ fn get_store(db_path: &TempDir) -> Arc> { ) } -fn get_harness(store: Arc>, validator_count: usize) -> TestHarness { +fn get_harness( + store: Arc, LevelDB>>, + validator_count: usize, +) -> TestHarness { let harness = BeaconChainHarness::new_with_disk_store( MinimalEthSpec, store, @@ -1310,7 +1313,7 @@ fn check_finalization(harness: &TestHarness, expected_slot: u64) { } /// Check that the HotColdDB's split_slot is equal to the start slot of the last finalized epoch. -fn check_split_slot(harness: &TestHarness, store: Arc>) { +fn check_split_slot(harness: &TestHarness, store: Arc, LevelDB>>) { let split_slot = store.get_split_slot(); assert_eq!( harness @@ -1361,7 +1364,7 @@ fn check_chain_dump(harness: &TestHarness, expected_len: u64) { .collect::>(); let head = harness.chain.head().expect("should get head"); - let mut forward_block_roots = Store::forwards_block_roots_iterator( + let mut forward_block_roots = HotColdDB::forwards_block_roots_iterator( harness.chain.store.clone(), Slot::new(0), head.beacon_state, diff --git a/beacon_node/beacon_chain/tests/tests.rs b/beacon_node/beacon_chain/tests/tests.rs index 5d5c75731..5eadb13f7 100644 --- a/beacon_node/beacon_chain/tests/tests.rs +++ b/beacon_node/beacon_chain/tests/tests.rs @@ -13,7 +13,7 @@ use operation_pool::PersistedOperationPool; use state_processing::{ per_slot_processing, per_slot_processing::Error as SlotProcessingError, EpochProcessingError, }; -use store::Store; +use store::config::StoreConfig; use types::{BeaconStateError, EthSpec, Hash256, Keypair, MinimalEthSpec, RelativeEpoch, Slot}; // Should ideally be divisible by 3. @@ -25,7 +25,11 @@ lazy_static! { } fn get_harness(validator_count: usize) -> BeaconChainHarness> { - let harness = BeaconChainHarness::new(MinimalEthSpec, KEYPAIRS[0..validator_count].to_vec()); + let harness = BeaconChainHarness::new( + MinimalEthSpec, + KEYPAIRS[0..validator_count].to_vec(), + StoreConfig::default(), + ); harness.advance_slot(); diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 7c29e0f76..41c6bc3a4 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -5,9 +5,9 @@ use beacon_chain::events::TeeEventHandler; use beacon_chain::{ builder::{BeaconChainBuilder, Witness}, eth1_chain::{CachingEth1Backend, Eth1Chain}, - migrate::{BackgroundMigrator, Migrate, NullMigrator}, + migrate::{BackgroundMigrator, Migrate}, slot_clock::{SlotClock, SystemTimeSlotClock}, - store::{HotColdDB, MemoryStore, Store, StoreConfig}, + store::{HotColdDB, ItemStore, LevelDB, StoreConfig}, BeaconChain, BeaconChainTypes, Eth1ChainBackend, EventHandler, }; use bus::Bus; @@ -50,7 +50,7 @@ pub const ETH1_GENESIS_UPDATE_INTERVAL_MILLIS: u64 = 7_000; /// `self.memory_store(..)` has been called. pub struct ClientBuilder { slot_clock: Option, - store: Option>, + store: Option>>, store_migrator: Option, runtime_context: Option>, chain_spec: Option, @@ -65,17 +65,26 @@ pub struct ClientBuilder { eth_spec_instance: T::EthSpec, } -impl +impl ClientBuilder< - Witness, + Witness< + TStoreMigrator, + TSlotClock, + TEth1Backend, + TEthSpec, + TEventHandler, + THotStore, + TColdStore, + >, > where - TStore: Store + 'static, - TStoreMigrator: Migrate, + TStoreMigrator: Migrate, TSlotClock: SlotClock + Clone + 'static, - TEth1Backend: Eth1ChainBackend + 'static, + TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, TEventHandler: EventHandler + 'static, + THotStore: ItemStore + 'static, + TColdStore: ItemStore + 'static, { /// Instantiates a new, empty builder. /// @@ -350,8 +359,17 @@ where /// If type inference errors are being raised, see the comment on the definition of `Self`. pub fn build( self, - ) -> Client> - { + ) -> Client< + Witness< + TStoreMigrator, + TSlotClock, + TEth1Backend, + TEthSpec, + TEventHandler, + THotStore, + TColdStore, + >, + > { Client { beacon_chain: self.beacon_chain, network_globals: self.network_globals, @@ -361,17 +379,26 @@ where } } -impl +impl ClientBuilder< - Witness, + Witness< + TStoreMigrator, + TSlotClock, + TEth1Backend, + TEthSpec, + TEventHandler, + THotStore, + TColdStore, + >, > where - TStore: Store + 'static, - TStoreMigrator: Migrate, + TStoreMigrator: Migrate, TSlotClock: SlotClock + Clone + 'static, - TEth1Backend: Eth1ChainBackend + 'static, + TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, TEventHandler: EventHandler + 'static, + THotStore: ItemStore + 'static, + TColdStore: ItemStore + 'static, { /// Consumes the internal `BeaconChainBuilder`, attaching the resulting `BeaconChain` to self. pub fn build_beacon_chain(mut self) -> Result { @@ -401,23 +428,25 @@ where } } -impl +impl ClientBuilder< Witness< - TStore, TStoreMigrator, TSlotClock, TEth1Backend, TEthSpec, WebSocketSender, + THotStore, + TColdStore, >, > where - TStore: Store + 'static, - TStoreMigrator: Migrate, + TStoreMigrator: Migrate, TSlotClock: SlotClock + 'static, - TEth1Backend: Eth1ChainBackend + 'static, + TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, + THotStore: ItemStore + 'static, + TColdStore: ItemStore + 'static, { /// Specifies that the `BeaconChain` should publish events using the WebSocket server. pub fn websocket_event_handler(mut self, config: WebSocketConfig) -> Result { @@ -442,23 +471,25 @@ where } } -impl +impl ClientBuilder< Witness< - TStore, TStoreMigrator, TSlotClock, TEth1Backend, TEthSpec, TeeEventHandler, + THotStore, + TColdStore, >, > where - TStore: Store + 'static, - TStoreMigrator: Migrate, + TStoreMigrator: Migrate, TSlotClock: SlotClock + 'static, - TEth1Backend: Eth1ChainBackend + 'static, + TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, + THotStore: ItemStore + 'static, + TColdStore: ItemStore + 'static, { /// Specifies that the `BeaconChain` should publish events using the WebSocket server. pub fn tee_event_handler( @@ -490,18 +521,19 @@ where impl ClientBuilder< Witness< - HotColdDB, TStoreMigrator, TSlotClock, TEth1Backend, TEthSpec, TEventHandler, + LevelDB, + LevelDB, >, > where TSlotClock: SlotClock + 'static, - TStoreMigrator: Migrate + 'static, - TEth1Backend: Eth1ChainBackend> + 'static, + TStoreMigrator: Migrate, LevelDB> + 'static, + TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, TEventHandler: EventHandler + 'static, { @@ -529,50 +561,25 @@ where } } -impl +impl ClientBuilder< Witness< - MemoryStore, - NullMigrator, + BackgroundMigrator, TSlotClock, TEth1Backend, TEthSpec, TEventHandler, + THotStore, + TColdStore, >, > where TSlotClock: SlotClock + 'static, - TEth1Backend: Eth1ChainBackend> + 'static, - TEthSpec: EthSpec + 'static, - TEventHandler: EventHandler + 'static, -{ - /// Specifies that the `Client` should use a `MemoryStore` database. - /// - /// Also sets the `store_migrator` to the `NullMigrator`, as that's the only viable choice. - pub fn memory_store(mut self) -> Self { - let store = MemoryStore::open(); - self.store = Some(Arc::new(store)); - self.store_migrator = Some(NullMigrator); - self - } -} - -impl - ClientBuilder< - Witness< - HotColdDB, - BackgroundMigrator, - TSlotClock, - TEth1Backend, - TEthSpec, - TEventHandler, - >, - > -where - TSlotClock: SlotClock + 'static, - TEth1Backend: Eth1ChainBackend> + 'static, + TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, TEventHandler: EventHandler + 'static, + THotStore: ItemStore + 'static, + TColdStore: ItemStore + 'static, { pub fn background_migrator(mut self) -> Result { let context = self @@ -588,23 +595,25 @@ where } } -impl +impl ClientBuilder< Witness< - TStore, TStoreMigrator, TSlotClock, - CachingEth1Backend, + CachingEth1Backend, TEthSpec, TEventHandler, + THotStore, + TColdStore, >, > where - TStore: Store + 'static, - TStoreMigrator: Migrate, + TStoreMigrator: Migrate, TSlotClock: SlotClock + 'static, TEthSpec: EthSpec + 'static, TEventHandler: EventHandler + 'static, + THotStore: ItemStore + 'static, + TColdStore: ItemStore + 'static, { /// Specifies that the `BeaconChain` should cache eth1 blocks/logs from a remote eth1 node /// (e.g., Parity/Geth) and refer to that cache when collecting deposits or eth1 votes during @@ -618,10 +627,6 @@ where let beacon_chain_builder = self .beacon_chain_builder .ok_or_else(|| "caching_eth1_backend requires a beacon_chain_builder")?; - let store = self - .store - .clone() - .ok_or_else(|| "caching_eth1_backend requires a store".to_string())?; let backend = if let Some(eth1_service_from_genesis) = self.eth1_service { eth1_service_from_genesis.update_config(config)?; @@ -636,7 +641,7 @@ where // adding earlier blocks too. eth1_service_from_genesis.drop_block_cache(); - CachingEth1Backend::from_service(eth1_service_from_genesis, store) + CachingEth1Backend::from_service(eth1_service_from_genesis) } else { beacon_chain_builder .get_persisted_eth1_backend()? @@ -644,18 +649,11 @@ where Eth1Chain::from_ssz_container( &persisted, config.clone(), - store.clone(), &context.log().clone(), ) .map(|chain| chain.into_backend()) }) - .unwrap_or_else(|| { - Ok(CachingEth1Backend::new( - config, - context.log().clone(), - store, - )) - })? + .unwrap_or_else(|| Ok(CachingEth1Backend::new(config, context.log().clone())))? }; self.eth1_service = None; @@ -699,16 +697,25 @@ where } } -impl +impl ClientBuilder< - Witness, + Witness< + TStoreMigrator, + SystemTimeSlotClock, + TEth1Backend, + TEthSpec, + TEventHandler, + THotStore, + TColdStore, + >, > where - TStore: Store + 'static, - TStoreMigrator: Migrate, - TEth1Backend: Eth1ChainBackend + 'static, + TStoreMigrator: Migrate, + TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, TEventHandler: EventHandler + 'static, + THotStore: ItemStore + 'static, + TColdStore: ItemStore + 'static, { /// Specifies that the slot clock should read the time from the computers system clock. pub fn system_time_slot_clock(mut self) -> Result { diff --git a/beacon_node/network/src/attestation_service/tests/mod.rs b/beacon_node/network/src/attestation_service/tests/mod.rs index 568a551c9..3bc6dbba2 100644 --- a/beacon_node/network/src/attestation_service/tests/mod.rs +++ b/beacon_node/network/src/attestation_service/tests/mod.rs @@ -17,19 +17,21 @@ mod tests { use sloggers::{null::NullLoggerBuilder, Build}; use slot_clock::{SlotClock, SystemTimeSlotClock}; use std::time::{Duration, SystemTime}; - use store::MemoryStore; + use store::config::StoreConfig; + use store::{HotColdDB, MemoryStore}; use tempfile::tempdir; use types::{CommitteeIndex, EnrForkId, EthSpec, MinimalEthSpec}; const SLOT_DURATION_MILLIS: u64 = 200; type TestBeaconChainType = Witness< - MemoryStore, NullMigrator, SystemTimeSlotClock, - CachingEth1Backend>, + CachingEth1Backend, MinimalEthSpec, NullEventHandler, + MemoryStore, + MemoryStore, >; pub struct TestBeaconChain { @@ -44,11 +46,14 @@ mod tests { let keypairs = generate_deterministic_keypairs(1); let log = get_logger(); + let store = + HotColdDB::open_ephemeral(StoreConfig::default(), spec.clone(), log.clone()) + .unwrap(); let chain = Arc::new( BeaconChainBuilder::new(MinimalEthSpec) .logger(log.clone()) .custom_spec(spec.clone()) - .store(Arc::new(MemoryStore::open())) + .store(Arc::new(store)) .store_migrator(NullMigrator) .data_dir(data_dir.path().to_path_buf()) .genesis_state( @@ -85,7 +90,7 @@ mod tests { } lazy_static! { - static ref CHAIN: TestBeaconChain = { TestBeaconChain::new_with_system_clock() }; + static ref CHAIN: TestBeaconChain = TestBeaconChain::new_with_system_clock(); } fn get_attestation_service() -> AttestationService { diff --git a/beacon_node/network/src/persisted_dht.rs b/beacon_node/network/src/persisted_dht.rs index c0c811259..1f68f7ac1 100644 --- a/beacon_node/network/src/persisted_dht.rs +++ b/beacon_node/network/src/persisted_dht.rs @@ -1,13 +1,15 @@ use eth2_libp2p::Enr; use rlp; use std::sync::Arc; -use store::{DBColumn, Error as StoreError, Store, StoreItem}; +use store::{DBColumn, Error as StoreError, HotColdDB, ItemStore, StoreItem}; use types::{EthSpec, Hash256}; /// 32-byte key for accessing the `DhtEnrs`. pub const DHT_DB_KEY: &str = "PERSISTEDDHTPERSISTEDDHTPERSISTE"; -pub fn load_dht, E: EthSpec>(store: Arc) -> Vec { +pub fn load_dht, Cold: ItemStore>( + store: Arc>, +) -> Vec { // Load DHT from store let key = Hash256::from_slice(&DHT_DB_KEY.as_bytes()); match store.get_item(&key) { @@ -20,8 +22,8 @@ pub fn load_dht, E: EthSpec>(store: Arc) -> Vec { } /// Attempt to persist the ENR's in the DHT to `self.store`. -pub fn persist_dht, E: EthSpec>( - store: Arc, +pub fn persist_dht, Cold: ItemStore>( + store: Arc>, enrs: Vec, ) -> Result<(), store::Error> { let key = Hash256::from_slice(&DHT_DB_KEY.as_bytes()); @@ -56,14 +58,19 @@ impl StoreItem for PersistedDht { mod tests { use super::*; use eth2_libp2p::Enr; + use sloggers::{null::NullLoggerBuilder, Build}; use std::str::FromStr; - use std::sync::Arc; - use store::{MemoryStore, Store}; - use types::Hash256; - use types::MinimalEthSpec; + use store::config::StoreConfig; + use store::{HotColdDB, MemoryStore}; + use types::{ChainSpec, Hash256, MinimalEthSpec}; #[test] fn test_persisted_dht() { - let store = Arc::new(MemoryStore::::open()); + let log = NullLoggerBuilder.build().unwrap(); + let store: HotColdDB< + MinimalEthSpec, + MemoryStore, + MemoryStore, + > = HotColdDB::open_ephemeral(StoreConfig::default(), ChainSpec::minimal(), log).unwrap(); let enrs = vec![Enr::from_str("enr:-IS4QHCYrYZbAKWCBRlAy5zzaDZXJBGkcnh4MHcBFZntXNFrdvJjX04jRzjzCBOonrkTfj499SZuOh8R33Ls8RRcy5wBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQPKY0yuDUmstAHYpMa2_oxVtw0RW_QAdpzBQA8yWM0xOIN1ZHCCdl8").unwrap()]; let key = Hash256::from_slice(&DHT_DB_KEY.as_bytes()); store diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 4a50eddb0..8cbab2120 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -13,7 +13,6 @@ use itertools::process_results; use slog::{debug, error, o, trace, warn}; use ssz::Encode; use std::sync::Arc; -use store::Store; use tokio::sync::mpsc; use types::{ Attestation, ChainSpec, Epoch, EthSpec, Hash256, SignedAggregateAndProof, SignedBeaconBlock, diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index efabaa23b..edd480f79 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -17,6 +17,7 @@ use rest_types::ValidatorSubscription; use slog::{debug, error, info, o, trace}; use std::sync::Arc; use std::time::Duration; +use store::HotColdDB; use tokio::sync::mpsc; use tokio::time::Delay; use types::EthSpec; @@ -40,7 +41,7 @@ pub struct NetworkService { /// lighthouse. router_send: mpsc::UnboundedSender>, /// A reference to lighthouse's database to persist the DHT. - store: Arc, + store: Arc>, /// A collection of global variables, accessible outside of the network service. network_globals: Arc>, /// A delay that expires when a new fork takes place. @@ -78,7 +79,7 @@ impl NetworkService { let (network_globals, mut libp2p) = LibP2PService::new(executor.clone(), config, enr_fork_id, &network_log)?; - for enr in load_dht::(store.clone()) { + for enr in load_dht::(store.clone()) { libp2p.swarm.add_enr(enr); } @@ -142,7 +143,7 @@ fn spawn_service( "Number of peers" => format!("{}", enrs.len()), ); - match persist_dht::(service.store.clone(), enrs) { + match persist_dht::(service.store.clone(), enrs) { Err(e) => error!( service.log, "Failed to persist DHT on drop"; diff --git a/beacon_node/network/src/service/tests.rs b/beacon_node/network/src/service/tests.rs index 48aa038c8..b27933924 100644 --- a/beacon_node/network/src/service/tests.rs +++ b/beacon_node/network/src/service/tests.rs @@ -9,6 +9,7 @@ mod tests { use sloggers::{null::NullLoggerBuilder, Build}; use std::str::FromStr; use std::sync::Arc; + use store::config::StoreConfig; use tokio::runtime::Runtime; use types::{test_utils::generate_deterministic_keypairs, MinimalEthSpec}; @@ -22,7 +23,12 @@ mod tests { let log = get_logger(); let beacon_chain = Arc::new( - BeaconChainHarness::new(MinimalEthSpec, generate_deterministic_keypairs(8)).chain, + BeaconChainHarness::new( + MinimalEthSpec, + generate_deterministic_keypairs(8), + StoreConfig::default(), + ) + .chain, ); let store = beacon_chain.store.clone(); diff --git a/beacon_node/rest_api/src/beacon.rs b/beacon_node/rest_api/src/beacon.rs index f0040cfef..fc0ed8a5a 100644 --- a/beacon_node/rest_api/src/beacon.rs +++ b/beacon_node/rest_api/src/beacon.rs @@ -13,7 +13,6 @@ use rest_types::{ }; use std::io::Write; use std::sync::Arc; -use store::Store; use slog::{error, Logger}; use types::{ diff --git a/beacon_node/rest_api/src/helpers.rs b/beacon_node/rest_api/src/helpers.rs index a8a2d5af7..872e7c834 100644 --- a/beacon_node/rest_api/src/helpers.rs +++ b/beacon_node/rest_api/src/helpers.rs @@ -8,7 +8,7 @@ use hyper::{Body, Request}; use itertools::process_results; use network::NetworkMessage; use ssz::Decode; -use store::{iter::AncestorIter, Store}; +use store::iter::AncestorIter; use types::{ BeaconState, CommitteeIndex, Epoch, EthSpec, Hash256, RelativeEpoch, SignedBeaconBlock, Slot, }; diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs index 67bd28327..d8459b3c3 100644 --- a/beacon_node/src/lib.rs +++ b/beacon_node/src/lib.rs @@ -11,7 +11,8 @@ pub use config::{get_data_dir, get_eth2_testnet_config, get_testnet_dir}; pub use eth2_config::Eth2Config; use beacon_chain::events::TeeEventHandler; -use beacon_chain::migrate::{BackgroundMigrator, HotColdDB}; +use beacon_chain::migrate::BackgroundMigrator; +use beacon_chain::store::LevelDB; use beacon_chain::{ builder::Witness, eth1_chain::CachingEth1Backend, slot_clock::SystemTimeSlotClock, }; @@ -25,12 +26,13 @@ use types::EthSpec; /// A type-alias to the tighten the definition of a production-intended `Client`. pub type ProductionClient = Client< Witness< - HotColdDB, - BackgroundMigrator, + BackgroundMigrator, LevelDB>, SystemTimeSlotClock, - CachingEth1Backend>, + CachingEth1Backend, E, TeeEventHandler, + LevelDB, + LevelDB, >, >; diff --git a/beacon_node/store/Cargo.toml b/beacon_node/store/Cargo.toml index e2fb94564..79d25ecfb 100644 --- a/beacon_node/store/Cargo.toml +++ b/beacon_node/store/Cargo.toml @@ -29,3 +29,4 @@ serde_derive = "1.0.110" lazy_static = "1.4.0" lighthouse_metrics = { path = "../../common/lighthouse_metrics" } lru = "0.5.1" +sloggers = "1.0.0" diff --git a/beacon_node/store/src/chunked_iter.rs b/beacon_node/store/src/chunked_iter.rs index 979acd6ea..632b3e017 100644 --- a/beacon_node/store/src/chunked_iter.rs +++ b/beacon_node/store/src/chunked_iter.rs @@ -1,5 +1,5 @@ use crate::chunked_vector::{chunk_key, Chunk, Field}; -use crate::HotColdDB; +use crate::{HotColdDB, ItemStore}; use slog::error; use std::sync::Arc; use types::{ChainSpec, EthSpec, Slot}; @@ -7,22 +7,26 @@ use types::{ChainSpec, EthSpec, Slot}; /// Iterator over the values of a `BeaconState` vector field (like `block_roots`). /// /// Uses the freezer DB's separate table to load the values. -pub struct ChunkedVectorIter +pub struct ChunkedVectorIter where F: Field, E: EthSpec, + Hot: ItemStore, + Cold: ItemStore, { - pub(crate) store: Arc>, + pub(crate) store: Arc>, current_vindex: usize, pub(crate) end_vindex: usize, next_cindex: usize, current_chunk: Chunk, } -impl ChunkedVectorIter +impl ChunkedVectorIter where F: Field, E: EthSpec, + Hot: ItemStore, + Cold: ItemStore, { /// Create a new iterator which can yield elements from `start_vindex` up to the last /// index stored by the restore point at `last_restore_point_slot`. @@ -31,7 +35,7 @@ where /// `HotColdDB::get_latest_restore_point_slot`. We pass it as a parameter so that the caller can /// maintain a stable view of the database (see `HybridForwardsBlockRootsIterator`). pub fn new( - store: Arc>, + store: Arc>, start_vindex: usize, last_restore_point_slot: Slot, spec: &ChainSpec, @@ -53,10 +57,12 @@ where } } -impl Iterator for ChunkedVectorIter +impl Iterator for ChunkedVectorIter where F: Field, E: EthSpec, + Hot: ItemStore, + Cold: ItemStore, { type Item = (usize, F::Value); diff --git a/beacon_node/store/src/forwards_iter.rs b/beacon_node/store/src/forwards_iter.rs index 2971eeafe..6ae38aa85 100644 --- a/beacon_node/store/src/forwards_iter.rs +++ b/beacon_node/store/src/forwards_iter.rs @@ -2,14 +2,14 @@ use crate::chunked_iter::ChunkedVectorIter; use crate::chunked_vector::BlockRoots; use crate::errors::{Error, Result}; use crate::iter::BlockRootsIterator; -use crate::{HotColdDB, Store}; +use crate::{HotColdDB, ItemStore}; use itertools::process_results; use std::sync::Arc; use types::{BeaconState, ChainSpec, EthSpec, Hash256, Slot}; /// Forwards block roots iterator that makes use of the `block_roots` table in the freezer DB. -pub struct FrozenForwardsBlockRootsIterator { - inner: ChunkedVectorIter, +pub struct FrozenForwardsBlockRootsIterator, Cold: ItemStore> { + inner: ChunkedVectorIter, } /// Forwards block roots iterator that reverses a backwards iterator (only good for short ranges). @@ -19,9 +19,9 @@ pub struct SimpleForwardsBlockRootsIterator { } /// Fusion of the above two approaches to forwards iteration. Fast and efficient. -pub enum HybridForwardsBlockRootsIterator { +pub enum HybridForwardsBlockRootsIterator, Cold: ItemStore> { PreFinalization { - iter: Box>, + iter: Box>, /// Data required by the `PostFinalization` iterator when we get to it. continuation_data: Box, Hash256)>>, }, @@ -30,9 +30,11 @@ pub enum HybridForwardsBlockRootsIterator { }, } -impl FrozenForwardsBlockRootsIterator { +impl, Cold: ItemStore> + FrozenForwardsBlockRootsIterator +{ pub fn new( - store: Arc>, + store: Arc>, start_slot: Slot, last_restore_point_slot: Slot, spec: &ChainSpec, @@ -48,7 +50,9 @@ impl FrozenForwardsBlockRootsIterator { } } -impl Iterator for FrozenForwardsBlockRootsIterator { +impl, Cold: ItemStore> Iterator + for FrozenForwardsBlockRootsIterator +{ type Item = (Hash256, Slot); fn next(&mut self) -> Option { @@ -59,8 +63,8 @@ impl Iterator for FrozenForwardsBlockRootsIterator { } impl SimpleForwardsBlockRootsIterator { - pub fn new, E: EthSpec>( - store: Arc, + pub fn new, Cold: ItemStore>( + store: Arc>, start_slot: Slot, end_state: BeaconState, end_block_root: Hash256, @@ -87,9 +91,11 @@ impl Iterator for SimpleForwardsBlockRootsIterator { } } -impl HybridForwardsBlockRootsIterator { +impl, Cold: ItemStore> + HybridForwardsBlockRootsIterator +{ pub fn new( - store: Arc>, + store: Arc>, start_slot: Slot, end_state: BeaconState, end_block_root: Hash256, @@ -157,7 +163,9 @@ impl HybridForwardsBlockRootsIterator { } } -impl Iterator for HybridForwardsBlockRootsIterator { +impl, Cold: ItemStore> Iterator + for HybridForwardsBlockRootsIterator +{ type Item = Result<(Hash256, Slot)>; fn next(&mut self) -> Option { diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index dfe98985b..8ec6c3453 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -5,10 +5,12 @@ use crate::config::StoreConfig; use crate::forwards_iter::HybridForwardsBlockRootsIterator; use crate::impls::beacon_state::{get_full_state, store_full_state}; use crate::iter::{ParentRootBlockIterator, StateRootsIterator}; +use crate::leveldb_store::LevelDB; +use crate::memory_store::MemoryStore; use crate::metrics; use crate::{ - leveldb_store::LevelDB, DBColumn, Error, ItemStore, KeyValueStore, PartialBeaconState, Store, - StoreItem, StoreOp, + get_key_for_col, DBColumn, Error, ItemStore, KeyValueStoreOp, PartialBeaconState, StoreItem, + StoreOp, }; use lru::LruCache; use parking_lot::{Mutex, RwLock}; @@ -32,7 +34,7 @@ pub const SPLIT_DB_KEY: &str = "FREEZERDBSPLITFREEZERDBSPLITFREE"; /// /// Stores vector fields like the `block_roots` and `state_roots` separately, and only stores /// intermittent "restore point" states pre-finalization. -pub struct HotColdDB { +pub struct HotColdDB, Cold: ItemStore> { /// The slot and state root at the point where the database is split between hot and cold. /// /// States with slots less than `split.slot` are in the cold DB, while states with slots @@ -40,11 +42,11 @@ pub struct HotColdDB { split: RwLock, config: StoreConfig, /// Cold database containing compact historical data. - pub(crate) cold_db: LevelDB, + pub(crate) cold_db: Cold, /// Hot database containing duplicated but quick-to-access recent data. /// /// The hot database also contains all blocks. - pub(crate) hot_db: LevelDB, + pub(crate) hot_db: Hot, /// LRU cache of deserialized blocks. Updated whenever a block is loaded. block_cache: Mutex>>, /// Chain spec. @@ -84,11 +86,13 @@ pub enum HotColdDBError { RestorePointBlockHashError(BeaconStateError), } -impl Store for HotColdDB { - type ForwardsBlockRootsIterator = HybridForwardsBlockRootsIterator; - +impl, Cold: ItemStore> HotColdDB { /// Store a block and update the LRU cache. - fn put_block(&self, block_root: &Hash256, block: SignedBeaconBlock) -> Result<(), Error> { + pub fn put_block( + &self, + block_root: &Hash256, + block: SignedBeaconBlock, + ) -> Result<(), Error> { // Store on disk. self.hot_db.put(block_root, &block)?; @@ -99,7 +103,7 @@ impl Store for HotColdDB { } /// Fetch a block from the store. - fn get_block(&self, block_root: &Hash256) -> Result>, Error> { + pub fn get_block(&self, block_root: &Hash256) -> Result>, Error> { metrics::inc_counter(&metrics::BEACON_BLOCK_GET_COUNT); // Check the cache. @@ -120,12 +124,12 @@ impl Store for HotColdDB { } /// Delete a block from the store and the block cache. - fn delete_block(&self, block_root: &Hash256) -> Result<(), Error> { + pub fn delete_block(&self, block_root: &Hash256) -> Result<(), Error> { self.block_cache.lock().pop(block_root); self.hot_db.delete::>(block_root) } - fn put_state_summary( + pub fn put_state_summary( &self, state_root: &Hash256, summary: HotStateSummary, @@ -134,7 +138,7 @@ impl Store for HotColdDB { } /// Store a state in the store. - fn put_state(&self, state_root: &Hash256, state: &BeaconState) -> Result<(), Error> { + pub fn put_state(&self, state_root: &Hash256, state: &BeaconState) -> Result<(), Error> { if state.slot < self.get_split_slot() { self.store_cold_state(state_root, &state) } else { @@ -143,7 +147,7 @@ impl Store for HotColdDB { } /// Fetch a state from the store. - fn get_state( + pub fn get_state( &self, state_root: &Hash256, slot: Option, @@ -170,7 +174,7 @@ impl Store for HotColdDB { /// than the split point. You shouldn't delete states from the finalized portion of the chain /// (which are frozen, and won't be deleted), or valid descendents of the finalized checkpoint /// (which will be deleted by this function but shouldn't be). - fn delete_state(&self, state_root: &Hash256, slot: Slot) -> Result<(), Error> { + pub fn delete_state(&self, state_root: &Hash256, slot: Slot) -> Result<(), Error> { // Delete the state summary. self.hot_db .key_delete(DBColumn::BeaconStateSummary.into(), state_root.as_bytes())?; @@ -184,20 +188,20 @@ impl Store for HotColdDB { Ok(()) } - fn forwards_block_roots_iterator( + pub fn forwards_block_roots_iterator( store: Arc, start_slot: Slot, end_state: BeaconState, end_block_root: Hash256, spec: &ChainSpec, - ) -> Result { + ) -> Result>, Error> { HybridForwardsBlockRootsIterator::new(store, start_slot, end_state, end_block_root, spec) } /// Load an epoch boundary state by using the hot state summary look-up. /// /// Will fall back to the cold DB if a hot state summary is not found. - fn load_epoch_boundary_state( + pub fn load_epoch_boundary_state( &self, state_root: &Hash256, ) -> Result>, Error> { @@ -226,21 +230,49 @@ impl Store for HotColdDB { } } - fn put_item(&self, key: &Hash256, item: &I) -> Result<(), Error> { + pub fn put_item(&self, key: &Hash256, item: &I) -> Result<(), Error> { self.hot_db.put(key, item) } - fn get_item(&self, key: &Hash256) -> Result, Error> { + pub fn get_item(&self, key: &Hash256) -> Result, Error> { self.hot_db.get(key) } - fn item_exists(&self, key: &Hash256) -> Result { + pub fn item_exists(&self, key: &Hash256) -> Result { self.hot_db.exists::(key) } - fn do_atomically(&self, batch: &[StoreOp]) -> Result<(), Error> { + pub fn do_atomically(&self, batch: &[StoreOp]) -> Result<(), Error> { let mut guard = self.block_cache.lock(); - self.hot_db.do_atomically(batch)?; + + let mut key_value_batch: Vec = Vec::with_capacity(batch.len()); + for op in batch { + match op { + StoreOp::DeleteBlock(block_hash) => { + let untyped_hash: Hash256 = (*block_hash).into(); + let key = + get_key_for_col(DBColumn::BeaconBlock.into(), untyped_hash.as_bytes()); + key_value_batch.push(KeyValueStoreOp::DeleteKey(key)); + } + + StoreOp::DeleteState(state_hash, slot) => { + let untyped_hash: Hash256 = (*state_hash).into(); + let state_summary_key = get_key_for_col( + DBColumn::BeaconStateSummary.into(), + untyped_hash.as_bytes(), + ); + key_value_batch.push(KeyValueStoreOp::DeleteKey(state_summary_key)); + + if *slot % E::slots_per_epoch() == 0 { + let state_key = + get_key_for_col(DBColumn::BeaconState.into(), untyped_hash.as_bytes()); + key_value_batch.push(KeyValueStoreOp::DeleteKey(state_key)); + } + } + } + } + self.hot_db.do_atomically(&key_value_batch)?; + for op in batch { match op { StoreOp::DeleteBlock(block_hash) => { @@ -254,7 +286,30 @@ impl Store for HotColdDB { } } -impl HotColdDB { +impl HotColdDB, MemoryStore> { + pub fn open_ephemeral( + config: StoreConfig, + spec: ChainSpec, + log: Logger, + ) -> Result, MemoryStore>, Error> { + Self::verify_slots_per_restore_point(config.slots_per_restore_point)?; + + let db = HotColdDB { + split: RwLock::new(Split::default()), + cold_db: MemoryStore::open(), + hot_db: MemoryStore::open(), + block_cache: Mutex::new(LruCache::new(config.block_cache_size)), + config, + spec, + log, + _phantom: PhantomData, + }; + + Ok(db) + } +} + +impl HotColdDB, LevelDB> { /// Open a new or existing database, with the given paths to the hot and cold DBs. /// /// The `slots_per_restore_point` parameter must be a divisor of `SLOTS_PER_HISTORICAL_ROOT`. @@ -264,7 +319,7 @@ impl HotColdDB { config: StoreConfig, spec: ChainSpec, log: Logger, - ) -> Result { + ) -> Result, LevelDB>, Error> { Self::verify_slots_per_restore_point(config.slots_per_restore_point)?; let db = HotColdDB { @@ -285,7 +340,9 @@ impl HotColdDB { } Ok(db) } +} +impl, Cold: ItemStore> HotColdDB { /// Store a post-finalization state efficiently in the hot database. /// /// On an epoch boundary, store a full state. On an intermediate slot, store @@ -675,8 +732,8 @@ impl HotColdDB { } /// Advance the split point of the store, moving new finalized states to the freezer. -pub fn process_finalization( - store: Arc>, +pub fn process_finalization, Cold: ItemStore>( + store: Arc>, frozen_head_root: Hash256, frozen_head: &BeaconState, ) -> Result<(), Error> { diff --git a/beacon_node/store/src/iter.rs b/beacon_node/store/src/iter.rs index 0dcb69bff..5a97a3917 100644 --- a/beacon_node/store/src/iter.rs +++ b/beacon_node/store/src/iter.rs @@ -1,4 +1,4 @@ -use crate::{Error, Store}; +use crate::{Error, HotColdDB, ItemStore}; use std::borrow::Cow; use std::marker::PhantomData; use std::sync::Arc; @@ -12,17 +12,20 @@ use types::{ /// /// It is assumed that all ancestors for this object are stored in the database. If this is not the /// case, the iterator will start returning `None` prior to genesis. -pub trait AncestorIter, E: EthSpec, I: Iterator> { +pub trait AncestorIter, Cold: ItemStore, I: Iterator> { /// Returns an iterator over the roots of the ancestors of `self`. - fn try_iter_ancestor_roots(&self, store: Arc) -> Option; + fn try_iter_ancestor_roots(&self, store: Arc>) -> Option; } -impl<'a, U: Store, E: EthSpec> AncestorIter> - for SignedBeaconBlock +impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> + AncestorIter> for SignedBeaconBlock { /// Iterates across all available prior block roots of `self`, starting at the most recent and ending /// at genesis. - fn try_iter_ancestor_roots(&self, store: Arc) -> Option> { + fn try_iter_ancestor_roots( + &self, + store: Arc>, + ) -> Option> { let state = store .get_state(&self.message.state_root, Some(self.message.slot)) .ok()??; @@ -31,22 +34,27 @@ impl<'a, U: Store, E: EthSpec> AncestorIter, E: EthSpec> AncestorIter> - for BeaconState +impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> + AncestorIter> for BeaconState { /// Iterates across all available prior state roots of `self`, starting at the most recent and ending /// at genesis. - fn try_iter_ancestor_roots(&self, store: Arc) -> Option> { + fn try_iter_ancestor_roots( + &self, + store: Arc>, + ) -> Option> { // The `self.clone()` here is wasteful. Some(StateRootsIterator::owned(store, self.clone())) } } -pub struct StateRootsIterator<'a, T: EthSpec, U: Store> { - inner: RootsIterator<'a, T, U>, +pub struct StateRootsIterator<'a, T: EthSpec, Hot: ItemStore, Cold: ItemStore> { + inner: RootsIterator<'a, T, Hot, Cold>, } -impl<'a, T: EthSpec, U: Store> Clone for StateRootsIterator<'a, T, U> { +impl<'a, T: EthSpec, Hot: ItemStore, Cold: ItemStore> Clone + for StateRootsIterator<'a, T, Hot, Cold> +{ fn clone(&self) -> Self { Self { inner: self.inner.clone(), @@ -54,21 +62,23 @@ impl<'a, T: EthSpec, U: Store> Clone for StateRootsIterator<'a, T, U> { } } -impl<'a, T: EthSpec, U: Store> StateRootsIterator<'a, T, U> { - pub fn new(store: Arc, beacon_state: &'a BeaconState) -> Self { +impl<'a, T: EthSpec, Hot: ItemStore, Cold: ItemStore> StateRootsIterator<'a, T, Hot, Cold> { + pub fn new(store: Arc>, beacon_state: &'a BeaconState) -> Self { Self { inner: RootsIterator::new(store, beacon_state), } } - pub fn owned(store: Arc, beacon_state: BeaconState) -> Self { + pub fn owned(store: Arc>, beacon_state: BeaconState) -> Self { Self { inner: RootsIterator::owned(store, beacon_state), } } } -impl<'a, T: EthSpec, U: Store> Iterator for StateRootsIterator<'a, T, U> { +impl<'a, T: EthSpec, Hot: ItemStore, Cold: ItemStore> Iterator + for StateRootsIterator<'a, T, Hot, Cold> +{ type Item = Result<(Hash256, Slot), Error>; fn next(&mut self) -> Option { @@ -86,11 +96,13 @@ impl<'a, T: EthSpec, U: Store> Iterator for StateRootsIterator<'a, T, U> { /// exhausted. /// /// Returns `None` for roots prior to genesis or when there is an error reading from `Store`. -pub struct BlockRootsIterator<'a, T: EthSpec, U: Store> { - inner: RootsIterator<'a, T, U>, +pub struct BlockRootsIterator<'a, T: EthSpec, Hot: ItemStore, Cold: ItemStore> { + inner: RootsIterator<'a, T, Hot, Cold>, } -impl<'a, T: EthSpec, U: Store> Clone for BlockRootsIterator<'a, T, U> { +impl<'a, T: EthSpec, Hot: ItemStore, Cold: ItemStore> Clone + for BlockRootsIterator<'a, T, Hot, Cold> +{ fn clone(&self) -> Self { Self { inner: self.inner.clone(), @@ -98,23 +110,25 @@ impl<'a, T: EthSpec, U: Store> Clone for BlockRootsIterator<'a, T, U> { } } -impl<'a, T: EthSpec, U: Store> BlockRootsIterator<'a, T, U> { +impl<'a, T: EthSpec, Hot: ItemStore, Cold: ItemStore> BlockRootsIterator<'a, T, Hot, Cold> { /// Create a new iterator over all block roots in the given `beacon_state` and prior states. - pub fn new(store: Arc, beacon_state: &'a BeaconState) -> Self { + pub fn new(store: Arc>, beacon_state: &'a BeaconState) -> Self { Self { inner: RootsIterator::new(store, beacon_state), } } /// Create a new iterator over all block roots in the given `beacon_state` and prior states. - pub fn owned(store: Arc, beacon_state: BeaconState) -> Self { + pub fn owned(store: Arc>, beacon_state: BeaconState) -> Self { Self { inner: RootsIterator::owned(store, beacon_state), } } } -impl<'a, T: EthSpec, U: Store> Iterator for BlockRootsIterator<'a, T, U> { +impl<'a, T: EthSpec, Hot: ItemStore, Cold: ItemStore> Iterator + for BlockRootsIterator<'a, T, Hot, Cold> +{ type Item = Result<(Hash256, Slot), Error>; fn next(&mut self) -> Option { @@ -125,13 +139,15 @@ impl<'a, T: EthSpec, U: Store> Iterator for BlockRootsIterator<'a, T, U> { } /// Iterator over state and block roots that backtracks using the vectors from a `BeaconState`. -pub struct RootsIterator<'a, T: EthSpec, U: Store> { - store: Arc, +pub struct RootsIterator<'a, T: EthSpec, Hot: ItemStore, Cold: ItemStore> { + store: Arc>, beacon_state: Cow<'a, BeaconState>, slot: Slot, } -impl<'a, T: EthSpec, U: Store> Clone for RootsIterator<'a, T, U> { +impl<'a, T: EthSpec, Hot: ItemStore, Cold: ItemStore> Clone + for RootsIterator<'a, T, Hot, Cold> +{ fn clone(&self) -> Self { Self { store: self.store.clone(), @@ -141,8 +157,8 @@ impl<'a, T: EthSpec, U: Store> Clone for RootsIterator<'a, T, U> { } } -impl<'a, T: EthSpec, U: Store> RootsIterator<'a, T, U> { - pub fn new(store: Arc, beacon_state: &'a BeaconState) -> Self { +impl<'a, T: EthSpec, Hot: ItemStore, Cold: ItemStore> RootsIterator<'a, T, Hot, Cold> { + pub fn new(store: Arc>, beacon_state: &'a BeaconState) -> Self { Self { store, slot: beacon_state.slot, @@ -150,7 +166,7 @@ impl<'a, T: EthSpec, U: Store> RootsIterator<'a, T, U> { } } - pub fn owned(store: Arc, beacon_state: BeaconState) -> Self { + pub fn owned(store: Arc>, beacon_state: BeaconState) -> Self { Self { store, slot: beacon_state.slot, @@ -158,7 +174,10 @@ impl<'a, T: EthSpec, U: Store> RootsIterator<'a, T, U> { } } - pub fn from_block(store: Arc, block_hash: Hash256) -> Result { + pub fn from_block( + store: Arc>, + block_hash: Hash256, + ) -> Result { let block = store .get_block(&block_hash)? .ok_or_else(|| BeaconStateError::MissingBeaconBlock(block_hash.into()))?; @@ -198,7 +217,9 @@ impl<'a, T: EthSpec, U: Store> RootsIterator<'a, T, U> { } } -impl<'a, T: EthSpec, U: Store> Iterator for RootsIterator<'a, T, U> { +impl<'a, T: EthSpec, Hot: ItemStore, Cold: ItemStore> Iterator + for RootsIterator<'a, T, Hot, Cold> +{ /// (block_root, state_root, slot) type Item = Result<(Hash256, Hash256, Slot), Error>; @@ -208,14 +229,16 @@ impl<'a, T: EthSpec, U: Store> Iterator for RootsIterator<'a, T, U> { } /// Block iterator that uses the `parent_root` of each block to backtrack. -pub struct ParentRootBlockIterator<'a, E: EthSpec, S: Store> { - store: &'a S, +pub struct ParentRootBlockIterator<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> { + store: &'a HotColdDB, next_block_root: Hash256, _phantom: PhantomData, } -impl<'a, E: EthSpec, S: Store> ParentRootBlockIterator<'a, E, S> { - pub fn new(store: &'a S, start_block_root: Hash256) -> Self { +impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> + ParentRootBlockIterator<'a, E, Hot, Cold> +{ + pub fn new(store: &'a HotColdDB, start_block_root: Hash256) -> Self { Self { store, next_block_root: start_block_root, @@ -240,7 +263,9 @@ impl<'a, E: EthSpec, S: Store> ParentRootBlockIterator<'a, E, S> { } } -impl<'a, E: EthSpec, S: Store> Iterator for ParentRootBlockIterator<'a, E, S> { +impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> Iterator + for ParentRootBlockIterator<'a, E, Hot, Cold> +{ type Item = Result<(Hash256, SignedBeaconBlock), Error>; fn next(&mut self) -> Option { @@ -250,20 +275,20 @@ impl<'a, E: EthSpec, S: Store> Iterator for ParentRootBlockIterator<'a, E, S> #[derive(Clone)] /// Extends `BlockRootsIterator`, returning `SignedBeaconBlock` instances, instead of their roots. -pub struct BlockIterator<'a, T: EthSpec, U: Store> { - roots: BlockRootsIterator<'a, T, U>, +pub struct BlockIterator<'a, T: EthSpec, Hot: ItemStore, Cold: ItemStore> { + roots: BlockRootsIterator<'a, T, Hot, Cold>, } -impl<'a, T: EthSpec, U: Store> BlockIterator<'a, T, U> { +impl<'a, T: EthSpec, Hot: ItemStore, Cold: ItemStore> BlockIterator<'a, T, Hot, Cold> { /// Create a new iterator over all blocks in the given `beacon_state` and prior states. - pub fn new(store: Arc, beacon_state: &'a BeaconState) -> Self { + pub fn new(store: Arc>, beacon_state: &'a BeaconState) -> Self { Self { roots: BlockRootsIterator::new(store, beacon_state), } } /// Create a new iterator over all blocks in the given `beacon_state` and prior states. - pub fn owned(store: Arc, beacon_state: BeaconState) -> Self { + pub fn owned(store: Arc>, beacon_state: BeaconState) -> Self { Self { roots: BlockRootsIterator::owned(store, beacon_state), } @@ -279,7 +304,9 @@ impl<'a, T: EthSpec, U: Store> BlockIterator<'a, T, U> { } } -impl<'a, T: EthSpec, U: Store> Iterator for BlockIterator<'a, T, U> { +impl<'a, T: EthSpec, Hot: ItemStore, Cold: ItemStore> Iterator + for BlockIterator<'a, T, Hot, Cold> +{ type Item = Result, Error>; fn next(&mut self) -> Option { @@ -288,8 +315,8 @@ impl<'a, T: EthSpec, U: Store> Iterator for BlockIterator<'a, T, U> { } /// Fetch the next state to use whilst backtracking in `*RootsIterator`. -fn next_historical_root_backtrack_state>( - store: &S, +fn next_historical_root_backtrack_state, Cold: ItemStore>( + store: &HotColdDB, current_state: &BeaconState, ) -> Result, Error> { // For compatibility with the freezer database's restore points, we load a state at @@ -312,8 +339,10 @@ fn slot_of_prev_restore_point(current_slot: Slot) -> Slot { #[cfg(test)] mod test { use super::*; - use crate::MemoryStore; - use types::{test_utils::TestingBeaconStateBuilder, Keypair, MainnetEthSpec}; + use crate::config::StoreConfig; + use crate::HotColdDB; + use sloggers::{null::NullLoggerBuilder, Build}; + use types::{test_utils::TestingBeaconStateBuilder, ChainSpec, Keypair, MainnetEthSpec}; fn get_state() -> BeaconState { let builder = TestingBeaconStateBuilder::from_single_keypair( @@ -327,7 +356,10 @@ mod test { #[test] fn block_root_iter() { - let store = Arc::new(MemoryStore::open()); + let log = NullLoggerBuilder.build().unwrap(); + let store = Arc::new( + HotColdDB::open_ephemeral(StoreConfig::default(), ChainSpec::minimal(), log).unwrap(), + ); let slots_per_historical_root = MainnetEthSpec::slots_per_historical_root(); let mut state_a: BeaconState = get_state(); @@ -371,7 +403,10 @@ mod test { #[test] fn state_root_iter() { - let store = Arc::new(MemoryStore::open()); + let log = NullLoggerBuilder.build().unwrap(); + let store = Arc::new( + HotColdDB::open_ephemeral(StoreConfig::default(), ChainSpec::minimal(), log).unwrap(), + ); let slots_per_historical_root = MainnetEthSpec::slots_per_historical_root(); let mut state_a: BeaconState = get_state(); diff --git a/beacon_node/store/src/leveldb_store.rs b/beacon_node/store/src/leveldb_store.rs index 3a7d55889..6e3dc8bbe 100644 --- a/beacon_node/store/src/leveldb_store.rs +++ b/beacon_node/store/src/leveldb_store.rs @@ -48,7 +48,7 @@ impl KeyValueStore for LevelDB { let timer = metrics::start_timer(&metrics::DISK_DB_READ_TIMES); self.db - .get(self.read_options(), column_key) + .get(self.read_options(), BytesKey::from_vec(column_key)) .map_err(Into::into) .map(|opt| { opt.map(|bytes| { @@ -68,7 +68,7 @@ impl KeyValueStore for LevelDB { let timer = metrics::start_timer(&metrics::DISK_DB_WRITE_TIMES); self.db - .put(self.write_options(), column_key, val) + .put(self.write_options(), BytesKey::from_vec(column_key), val) .map_err(Into::into) .map(|()| { metrics::stop_timer(timer); @@ -82,7 +82,7 @@ impl KeyValueStore for LevelDB { metrics::inc_counter(&metrics::DISK_DB_EXISTS_COUNT); self.db - .get(self.read_options(), column_key) + .get(self.read_options(), BytesKey::from_vec(column_key)) .map_err(Into::into) .and_then(|val| Ok(val.is_some())) } @@ -94,34 +94,16 @@ impl KeyValueStore for LevelDB { metrics::inc_counter(&metrics::DISK_DB_DELETE_COUNT); self.db - .delete(self.write_options(), column_key) + .delete(self.write_options(), BytesKey::from_vec(column_key)) .map_err(Into::into) } - fn do_atomically(&self, ops_batch: &[StoreOp]) -> Result<(), Error> { + fn do_atomically(&self, ops_batch: &[KeyValueStoreOp]) -> Result<(), Error> { let mut leveldb_batch = Writebatch::new(); - for op in ops_batch { + for op in ops_batch.into_iter() { match op { - StoreOp::DeleteBlock(block_hash) => { - let untyped_hash: Hash256 = (*block_hash).into(); - let key = - get_key_for_col(DBColumn::BeaconBlock.into(), untyped_hash.as_bytes()); - leveldb_batch.delete(key); - } - - StoreOp::DeleteState(state_hash, slot) => { - let untyped_hash: Hash256 = (*state_hash).into(); - let state_summary_key = get_key_for_col( - DBColumn::BeaconStateSummary.into(), - untyped_hash.as_bytes(), - ); - leveldb_batch.delete(state_summary_key); - - if *slot % E::slots_per_epoch() == 0 { - let state_key = - get_key_for_col(DBColumn::BeaconState.into(), untyped_hash.as_bytes()); - leveldb_batch.delete(state_key); - } + KeyValueStoreOp::DeleteKey(key) => { + leveldb_batch.delete(BytesKey::from_vec(key.to_vec())); } } } @@ -147,10 +129,10 @@ impl Key for BytesKey { } } -fn get_key_for_col(col: &str, key: &[u8]) -> BytesKey { - let mut col = col.as_bytes().to_vec(); - col.append(&mut key.to_vec()); - BytesKey { key: col } +impl BytesKey { + fn from_vec(key: Vec) -> Self { + Self { key } + } } impl From for Error { diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 3abdae423..1d0506ce3 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -25,8 +25,6 @@ mod state_batch; pub mod iter; -use std::sync::Arc; - pub use self::config::StoreConfig; pub use self::hot_cold_store::{HotColdDB, HotStateSummary}; pub use self::leveldb_store::LevelDB; @@ -52,7 +50,17 @@ pub trait KeyValueStore: Sync + Send + Sized + 'static { fn key_delete(&self, column: &str, key: &[u8]) -> Result<(), Error>; /// Execute either all of the operations in `batch` or none at all, returning an error. - fn do_atomically(&self, batch: &[StoreOp]) -> Result<(), Error>; + fn do_atomically(&self, batch: &[KeyValueStoreOp]) -> Result<(), Error>; +} + +pub fn get_key_for_col(column: &str, key: &[u8]) -> Vec { + let mut result = column.as_bytes().to_vec(); + result.extend_from_slice(key); + result +} + +pub enum KeyValueStoreOp { + DeleteKey(Vec), } pub trait ItemStore: KeyValueStore + Sync + Send + Sized + 'static { @@ -93,75 +101,6 @@ pub trait ItemStore: KeyValueStore + Sync + Send + Sized + 'stati } } -/// An object capable of storing and retrieving objects implementing `StoreItem`. -/// -/// A `Store` is fundamentally backed by a key-value database, however it provides support for -/// columns. A simple column implementation might involve prefixing a key with some bytes unique to -/// each column. -pub trait Store: Sync + Send + Sized + 'static { - type ForwardsBlockRootsIterator: Iterator>; - - /// Store a block in the store. - fn put_block(&self, block_root: &Hash256, block: SignedBeaconBlock) -> Result<(), Error>; - - /// Fetch a block from the store. - fn get_block(&self, block_root: &Hash256) -> Result>, Error>; - - /// Delete a block from the store. - fn delete_block(&self, block_root: &Hash256) -> Result<(), Error>; - - /// Store a state in the store. - fn put_state(&self, state_root: &Hash256, state: &BeaconState) -> Result<(), Error>; - - /// Store a state summary in the store. - fn put_state_summary( - &self, - state_root: &Hash256, - summary: HotStateSummary, - ) -> Result<(), Error>; - - /// Fetch a state from the store. - fn get_state( - &self, - state_root: &Hash256, - slot: Option, - ) -> Result>, Error>; - - /// Delete a state from the store. - fn delete_state(&self, state_root: &Hash256, _slot: Slot) -> Result<(), Error>; - - /// Get a forwards (slot-ascending) iterator over the beacon block roots since `start_slot`. - /// - /// Will be efficient for frozen portions of the database if using `HotColdDB`. - /// - /// The `end_state` and `end_block_root` are required for backtracking in the post-finalization - /// part of the chain, and should be usually be set to the current head. Importantly, the - /// `end_state` must be a state that has had a block applied to it, and the hash of that - /// block must be `end_block_root`. - // NOTE: could maybe optimise by getting the `BeaconState` and end block root from a closure, as - // it's not always required. - fn forwards_block_roots_iterator( - store: Arc, - start_slot: Slot, - end_state: BeaconState, - end_block_root: Hash256, - spec: &ChainSpec, - ) -> Result; - - fn load_epoch_boundary_state( - &self, - state_root: &Hash256, - ) -> Result>, Error>; - - fn put_item(&self, key: &Hash256, item: &I) -> Result<(), Error>; - - fn get_item(&self, key: &Hash256) -> Result, Error>; - - fn item_exists(&self, key: &Hash256) -> Result; - - fn do_atomically(&self, batch: &[StoreOp]) -> Result<(), Error>; -} - /// Reified key-value storage operation. Helps in modifying the storage atomically. /// See also https://github.com/sigp/lighthouse/issues/692 pub enum StoreOp { diff --git a/beacon_node/store/src/memory_store.rs b/beacon_node/store/src/memory_store.rs index 3465e385b..c8556860c 100644 --- a/beacon_node/store/src/memory_store.rs +++ b/beacon_node/store/src/memory_store.rs @@ -1,12 +1,7 @@ -use super::{DBColumn, Error, ItemStore, KeyValueStore, Store, StoreOp}; -use crate::forwards_iter::SimpleForwardsBlockRootsIterator; -use crate::hot_cold_store::HotStateSummary; -use crate::impls::beacon_state::{get_full_state, store_full_state}; -use crate::StoreItem; +use super::{Error, ItemStore, KeyValueStore, KeyValueStoreOp}; use parking_lot::RwLock; use std::collections::HashMap; use std::marker::PhantomData; -use std::sync::Arc; use types::*; type DBHashMap = HashMap, Vec>; @@ -46,53 +41,34 @@ impl KeyValueStore for MemoryStore { /// Get the value of some key from the database. Returns `None` if the key does not exist. fn get_bytes(&self, col: &str, key: &[u8]) -> Result>, Error> { let column_key = Self::get_key_for_col(col, key); - Ok(self.db.read().get(&column_key).cloned()) } /// Puts a key in the database. fn put_bytes(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error> { let column_key = Self::get_key_for_col(col, key); - self.db.write().insert(column_key, val.to_vec()); - Ok(()) } /// Return true if some key exists in some column. fn key_exists(&self, col: &str, key: &[u8]) -> Result { let column_key = Self::get_key_for_col(col, key); - Ok(self.db.read().contains_key(&column_key)) } /// Delete some key from the database. fn key_delete(&self, col: &str, key: &[u8]) -> Result<(), Error> { let column_key = Self::get_key_for_col(col, key); - self.db.write().remove(&column_key); - Ok(()) } - fn do_atomically(&self, batch: &[StoreOp]) -> Result<(), Error> { + fn do_atomically(&self, batch: &[KeyValueStoreOp]) -> Result<(), Error> { for op in batch { match op { - StoreOp::DeleteBlock(block_hash) => { - let untyped_hash: Hash256 = (*block_hash).into(); - self.key_delete(DBColumn::BeaconBlock.into(), untyped_hash.as_bytes())?; - } - - StoreOp::DeleteState(state_hash, slot) => { - let untyped_hash: Hash256 = (*state_hash).into(); - if *slot % E::slots_per_epoch() == 0 { - self.key_delete(DBColumn::BeaconState.into(), untyped_hash.as_bytes())?; - } else { - self.key_delete( - DBColumn::BeaconStateSummary.into(), - untyped_hash.as_bytes(), - )?; - } + KeyValueStoreOp::DeleteKey(hash) => { + self.db.write().remove(hash); } } } @@ -101,94 +77,3 @@ impl KeyValueStore for MemoryStore { } impl ItemStore for MemoryStore {} - -impl Store for MemoryStore { - type ForwardsBlockRootsIterator = SimpleForwardsBlockRootsIterator; - - fn put_block(&self, block_root: &Hash256, block: SignedBeaconBlock) -> Result<(), Error> { - self.put(block_root, &block) - } - - fn get_block(&self, block_root: &Hash256) -> Result>, Error> { - self.get(block_root) - } - - fn delete_block(&self, block_root: &Hash256) -> Result<(), Error> { - self.key_delete(DBColumn::BeaconBlock.into(), block_root.as_bytes()) - } - - fn put_state_summary( - &self, - state_root: &Hash256, - summary: HotStateSummary, - ) -> Result<(), Error> { - self.put(state_root, &summary).map_err(Into::into) - } - - /// Store a state in the store. - fn put_state(&self, state_root: &Hash256, state: &BeaconState) -> Result<(), Error> { - store_full_state(self, state_root, &state) - } - - /// Fetch a state from the store. - fn get_state( - &self, - state_root: &Hash256, - _: Option, - ) -> Result>, Error> { - get_full_state(self, state_root) - } - - fn delete_state(&self, state_root: &Hash256, _slot: Slot) -> Result<(), Error> { - self.key_delete(DBColumn::BeaconState.into(), state_root.as_bytes()) - } - - fn forwards_block_roots_iterator( - store: Arc, - start_slot: Slot, - end_state: BeaconState, - end_block_root: Hash256, - _: &ChainSpec, - ) -> Result { - SimpleForwardsBlockRootsIterator::new(store, start_slot, end_state, end_block_root) - } - - /// Load the most recent ancestor state of `state_root` which lies on an epoch boundary. - /// - /// If `state_root` corresponds to an epoch boundary state, then that state itself should be - /// returned. - fn load_epoch_boundary_state( - &self, - state_root: &Hash256, - ) -> Result>, Error> { - // The default implementation is not very efficient, but isn't used in prod. - // See `HotColdDB` for the optimized implementation. - if let Some(state) = self.get_state(state_root, None)? { - let epoch_boundary_slot = state.slot / E::slots_per_epoch() * E::slots_per_epoch(); - if state.slot == epoch_boundary_slot { - Ok(Some(state)) - } else { - let epoch_boundary_state_root = state.get_state_root(epoch_boundary_slot)?; - self.get_state(epoch_boundary_state_root, Some(epoch_boundary_slot)) - } - } else { - Ok(None) - } - } - - fn put_item(&self, key: &Hash256, item: &I) -> Result<(), Error> { - self.put(key, item) - } - - fn get_item(&self, key: &Hash256) -> Result, Error> { - self.get(key) - } - - fn item_exists(&self, key: &Hash256) -> Result { - self.exists::(key) - } - - fn do_atomically(&self, batch: &[StoreOp]) -> Result<(), Error> { - KeyValueStore::do_atomically(self, batch) - } -} diff --git a/beacon_node/store/src/state_batch.rs b/beacon_node/store/src/state_batch.rs index c19173861..ab2d060c1 100644 --- a/beacon_node/store/src/state_batch.rs +++ b/beacon_node/store/src/state_batch.rs @@ -1,4 +1,4 @@ -use crate::{Error, HotStateSummary, Store}; +use crate::{Error, HotColdDB, HotStateSummary, ItemStore}; use types::{BeaconState, EthSpec, Hash256}; /// A collection of states to be stored in the database. @@ -36,7 +36,10 @@ impl StateBatch { /// Write the batch to the database. /// /// May fail to write the full batch if any of the items error (i.e. not atomic!) - pub fn commit>(self, store: &S) -> Result<(), Error> { + pub fn commit, Cold: ItemStore>( + self, + store: &HotColdDB, + ) -> Result<(), Error> { self.items.into_iter().try_for_each(|item| match item { BatchItem::Full(state_root, state) => store.put_state(&state_root, &state), BatchItem::Summary(state_root, summary) => {