diff --git a/.github/workflows/test-suite.yml b/.github/workflows/test-suite.yml index 628ccf9a4..34554afaa 100644 --- a/.github/workflows/test-suite.yml +++ b/.github/workflows/test-suite.yml @@ -121,3 +121,14 @@ jobs: - uses: actions/checkout@v1 - name: Run cargo audit to identify known security vulnerabilities reported to the RustSec Advisory Database run: make audit + cargo-udeps: + runs-on: ubuntu-latest + needs: cargo-fmt + steps: + - uses: actions/checkout@v1 + - name: Get latest version of nightly Rust + run: rustup update nightly + - name: Install cargo-udeps + run: cargo install cargo-udeps --locked + - name: Run cargo udeps to identify unused crates in the dependency graph + run: make udeps diff --git a/Cargo.lock b/Cargo.lock index fb6c2a6b1..51e44bc46 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -24,10 +24,8 @@ dependencies = [ "slog", "slog-async", "slog-term", - "tempdir", "tokio 0.2.21", "types", - "validator_client", "validator_dir", "web3", ] @@ -144,12 +142,6 @@ dependencies = [ "syn", ] -[[package]] -name = "assert_approx_eq" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c07dab4369547dbe5114677b33fbbf724971019f3818172d59a97a61c774ffd" - [[package]] name = "assert_matches" version = "1.3.0" @@ -256,6 +248,7 @@ dependencies = [ "futures 0.3.5", "genesis", "integer-sqrt", + "itertools 0.9.0", "lazy_static", "lighthouse_metrics", "log 0.4.8", @@ -293,7 +286,6 @@ dependencies = [ "client", "ctrlc", "dirs", - "env_logger", "environment", "eth2-libp2p", "eth2_config", @@ -316,16 +308,6 @@ dependencies = [ "version", ] -[[package]] -name = "bincode" -version = "1.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5753e2a71534719bf3f4e57006c3a4f0d2c672a4b676eec84161f763eca87dbf" -dependencies = [ - "byteorder", - "serde", -] - [[package]] name = "bitflags" version = "0.9.1" @@ -408,7 +390,6 @@ dependencies = [ "arbitrary", "eth2_hashing", "eth2_ssz", - "eth2_ssz_types", "hex 0.4.2", "milagro_bls", "rand 0.7.3", @@ -944,17 +925,6 @@ dependencies = [ "types", ] -[[package]] -name = "derivative" -version = "2.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb582b60359da160a9477ee80f15c8d784c477e69c217ef2cdd4169c24ea380f" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "derive_arbitrary" version = "0.4.4" @@ -1084,7 +1054,6 @@ dependencies = [ "tree_hash", "tree_hash_derive", "types", - "walkdir", ] [[package]] @@ -1138,10 +1107,7 @@ dependencies = [ name = "environment" version = "0.1.2" dependencies = [ - "beacon_node", - "clap", "ctrlc", - "env_logger", "eth2_config", "eth2_testnet_config", "exit-future", @@ -1345,7 +1311,6 @@ dependencies = [ "serde", "serde_derive", "serde_hex", - "serde_yaml", "tree_hash", "tree_hash_derive", "typenum", @@ -2238,7 +2203,6 @@ dependencies = [ "deposit_contract", "dirs", "environment", - "eth1_test_rig", "eth2-libp2p", "eth2_keystore", "eth2_ssz", @@ -2916,7 +2880,6 @@ dependencies = [ name = "network" version = "0.1.2" dependencies = [ - "assert_approx_eq", "beacon_chain", "environment", "error-chain", @@ -2929,6 +2892,7 @@ dependencies = [ "genesis", "hashset_delay", "hex 0.4.2", + "itertools 0.9.0", "lazy_static", "lighthouse_metrics", "matches", @@ -3434,7 +3398,6 @@ version = "0.2.0" dependencies = [ "eth2_ssz", "eth2_ssz_derive", - "itertools 0.9.0", "serde", "serde_derive", "serde_yaml", @@ -3799,6 +3762,7 @@ dependencies = [ "hex 0.4.2", "http 0.2.1", "hyper 0.13.6", + "itertools 0.9.0", 
"lazy_static", "lighthouse_metrics", "network", @@ -4276,7 +4240,7 @@ checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" name = "slashing_protection" version = "0.1.0" dependencies = [ - "parking_lot 0.9.0", + "parking_lot 0.10.2", "r2d2", "r2d2_sqlite", "rayon", @@ -4503,7 +4467,6 @@ name = "state_processing" version = "0.2.0" dependencies = [ "arbitrary", - "beacon_chain", "bls", "criterion", "env_logger", @@ -4521,7 +4484,6 @@ dependencies = [ "serde", "serde_derive", "serde_yaml", - "store", "tree_hash", "tree_hash_derive", "types", @@ -4651,8 +4613,6 @@ dependencies = [ "criterion", "eth2_hashing", "ethereum-types", - "hex 0.4.2", - "yaml-rust", ] [[package]] @@ -5295,9 +5255,7 @@ dependencies = [ "compare_fields", "compare_fields_derive", "criterion", - "derivative", "dirs", - "env_logger", "eth2_hashing", "eth2_interop_keypairs", "eth2_ssz", @@ -5465,14 +5423,12 @@ dependencies = [ name = "validator_client" version = "0.1.2" dependencies = [ - "bincode", "bls", "clap", "clap_utils", "deposit_contract", "dirs", "environment", - "error-chain", "eth2_config", "eth2_interop_keypairs", "eth2_ssz", @@ -5499,7 +5455,6 @@ dependencies = [ "tree_hash", "types", "validator_dir", - "web3", ] [[package]] @@ -5509,7 +5464,6 @@ dependencies = [ "bls", "deposit_contract", "eth2_keystore", - "eth2_wallet", "hex 0.4.2", "rand 0.7.3", "rayon", diff --git a/Makefile b/Makefile index 1f2998e0c..7f8ea3052 100644 --- a/Makefile +++ b/Makefile @@ -72,6 +72,10 @@ audit: cargo install --force cargo-audit cargo audit +# Runs `cargo udeps` to check for unused dependencies +udeps: + cargo +nightly udeps --tests --all-targets --release + # Performs a `cargo` clean and cleans the `ef_tests` directory. clean: cargo clean diff --git a/README.md b/README.md index 08106b97f..0523ec39b 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ An open-source Ethereum 2.0 client, written in Rust and maintained by Sigma Prime. -[![Build Status]][Build Link] [![Book Status]][Book Link] [![RustDoc Status]][RustDoc Link] [![Chat Badge]][Chat Link] [![Swagger Badge]][Swagger Link] +[![Build Status]][Build Link] [![Book Status]][Book Link] [![RustDoc Status]][RustDoc Link] [![Chat Badge]][Chat Link] [Build Status]: https://github.com/sigp/lighthouse/workflows/test-suite/badge.svg?branch=master [Build Link]: https://github.com/sigp/lighthouse/actions @@ -12,8 +12,6 @@ An open-source Ethereum 2.0 client, written in Rust and maintained by Sigma Prim [Book Link]: http://lighthouse-book.sigmaprime.io/ [RustDoc Status]:https://img.shields.io/badge/code--docs-master-orange [RustDoc Link]: http://lighthouse-docs.sigmaprime.io/ -[Swagger Badge]: https://img.shields.io/badge/testnet--explorer-beaconcha.in-informational -[Swagger Link]: https://lighthouse-testnet3.beaconcha.in/ [Documentation](http://lighthouse-book.sigmaprime.io/) @@ -50,9 +48,11 @@ Current development overview: - ~~**April 2019**: Inital single-client testnets.~~ - ~~**September 2019**: Inter-operability with other Ethereum 2.0 clients.~~ - ~~**Q1 2020**: `lighthouse-0.1.0` release: All major phase 0 features implemented.~~ -- **Q2 2020**: Public, multi-client testnet with user-facing functionality. -- **Q2 2020**: Third-party security review. -- **Q4 2020**: Production Beacon Chain testnet (tentative). +- ~~**Q2 2020**: Public, multi-client testnet with user-facing functionality.~~ +- ~~**Q2 2020**: Third-party security review.~~ +- **Q3 2020**: Additional third-party security reviews. 
+- **Q3 2020**: Long-lived, multi-client Beacon Chain testnet +- **Q4 2020**: Production Beacon Chain (tentative). ## Documentation diff --git a/account_manager/Cargo.toml b/account_manager/Cargo.toml index f53bf52a5..877a80dd4 100644 --- a/account_manager/Cargo.toml +++ b/account_manager/Cargo.toml @@ -4,9 +4,6 @@ version = "0.0.1" authors = ["Paul Hauner ", "Luke Anderson "] edition = "2018" -[dev-dependencies] -tempdir = "0.3.7" - [dependencies] bls = { path = "../crypto/bls" } clap = "2.33.0" @@ -21,7 +18,6 @@ libc = "0.2.65" eth2_ssz = "0.1.2" eth2_ssz_derive = "0.1.0" hex = "0.4.2" -validator_client = { path = "../validator_client" } rayon = "1.3.0" eth2_testnet_config = { path = "../common/eth2_testnet_config" } web3 = "0.11.0" diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml index b58b1ce51..c8d3cf616 100644 --- a/beacon_node/Cargo.toml +++ b/beacon_node/Cargo.toml @@ -29,7 +29,6 @@ slog-async = "2.5.0" ctrlc = { version = "3.1.4", features = ["termination"] } tokio = { version = "0.2.21", features = ["time"] } exit-future = "0.2.0" -env_logger = "0.7.1" dirs = "2.0.2" logging = { path = "../common/logging" } futures = "0.3.5" diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index 9ccb2a865..adc2d75ee 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -49,4 +49,4 @@ safe_arith = { path = "../../consensus/safe_arith" } fork_choice = { path = "../../consensus/fork_choice" } environment = { path = "../../lighthouse/environment" } bus = "2.2.3" - +itertools = "0.9.0" diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 1cf8563e9..9dd19ba53 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -25,6 +25,7 @@ use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::BeaconForkChoiceStore; use crate::BeaconSnapshot; use fork_choice::ForkChoice; +use itertools::process_results; use operation_pool::{OperationPool, PersistedOperationPool}; use parking_lot::RwLock; use slog::{crit, debug, error, info, trace, warn, Logger}; @@ -45,7 +46,7 @@ use std::io::prelude::*; use std::sync::Arc; use std::time::{Duration, Instant}; use store::iter::{BlockRootsIterator, ParentRootBlockIterator, StateRootsIterator}; -use store::{Error as DBError, Store}; +use store::{Error as DBError, HotColdDB}; use types::*; pub type ForkChoiceError = fork_choice::Error; @@ -155,10 +156,11 @@ pub struct HeadInfo { } pub trait BeaconChainTypes: Send + Sync + 'static { - type Store: store::Store; - type StoreMigrator: Migrate; + type HotStore: store::ItemStore; + type ColdStore: store::ItemStore; + type StoreMigrator: Migrate; type SlotClock: slot_clock::SlotClock; - type Eth1Chain: Eth1ChainBackend; + type Eth1Chain: Eth1ChainBackend; type EthSpec: types::EthSpec; type EventHandler: EventHandler; } @@ -168,7 +170,7 @@ pub trait BeaconChainTypes: Send + Sync + 'static { pub struct BeaconChain { pub spec: ChainSpec, /// Persistent storage for blocks, states, etc. Typically an on-disk store, such as LevelDB. - pub store: Arc, + pub store: Arc>, /// Database migrator for running background maintenance on the store. pub store_migrator: T::StoreMigrator, /// Reports the current slot, typically based upon the system clock. @@ -192,7 +194,7 @@ pub struct BeaconChain { /// Maintains a record of which validators have proposed blocks for each slot. 
pub observed_block_producers: ObservedBlockProducers, /// Provides information from the Ethereum 1 (PoW) chain. - pub eth1_chain: Option>, + pub eth1_chain: Option>, /// Stores a "snapshot" of the chain at the time the head-of-the-chain block was received. pub(crate) canonical_head: TimeoutRwLock>, /// The root of the genesis block. @@ -330,27 +332,33 @@ impl BeaconChain { /// - Iterator returns `(Hash256, Slot)`. /// - As this iterator starts at the `head` of the chain (viz., the best block), the first slot /// returned may be earlier than the wall-clock slot. - pub fn rev_iter_block_roots(&self) -> Result, Error> { + pub fn rev_iter_block_roots( + &self, + ) -> Result>, Error> { let head = self.head()?; - let iter = BlockRootsIterator::owned(self.store.clone(), head.beacon_state); - - Ok(std::iter::once((head.beacon_block_root, head.beacon_block.slot())).chain(iter)) + Ok( + std::iter::once(Ok((head.beacon_block_root, head.beacon_block.slot()))) + .chain(iter) + .map(|result| result.map_err(|e| e.into())), + ) } pub fn forwards_iter_block_roots( &self, start_slot: Slot, - ) -> Result<>::ForwardsBlockRootsIterator, Error> { + ) -> Result>, Error> { let local_head = self.head()?; - Ok(T::Store::forwards_block_roots_iterator( + let iter = HotColdDB::forwards_block_roots_iterator( self.store.clone(), start_slot, local_head.beacon_state, local_head.beacon_block_root, &self.spec, - )) + )?; + + Ok(iter.map(|result| result.map_err(Into::into))) } /// Traverse backwards from `block_root` to find the block roots of its ancestors. @@ -365,7 +373,7 @@ impl BeaconChain { pub fn rev_iter_block_roots_from( &self, block_root: Hash256, - ) -> Result, Error> { + ) -> Result>, Error> { let block = self .get_block(&block_root)? .ok_or_else(|| Error::MissingBeaconBlock(block_root))?; @@ -373,7 +381,9 @@ impl BeaconChain { .get_state(&block.state_root(), Some(block.slot()))? .ok_or_else(|| Error::MissingBeaconState(block.state_root()))?; let iter = BlockRootsIterator::owned(self.store.clone(), state); - Ok(std::iter::once((block_root, block.slot())).chain(iter)) + Ok(std::iter::once(Ok((block_root, block.slot()))) + .chain(iter) + .map(|result| result.map_err(|e| e.into()))) } /// Traverse backwards from `block_root` to find the root of the ancestor block at `slot`. @@ -382,10 +392,10 @@ impl BeaconChain { block_root: Hash256, slot: Slot, ) -> Result, Error> { - Ok(self - .rev_iter_block_roots_from(block_root)? - .find(|(_, ancestor_slot)| *ancestor_slot == slot) - .map(|(ancestor_block_root, _)| ancestor_block_root)) + process_results(self.rev_iter_block_roots_from(block_root)?, |mut iter| { + iter.find(|(_, ancestor_slot)| *ancestor_slot == slot) + .map(|(ancestor_block_root, _)| ancestor_block_root) + }) } /// Iterates across all `(state_root, slot)` pairs from the head of the chain (inclusive) to @@ -397,13 +407,16 @@ impl BeaconChain { /// - Iterator returns `(Hash256, Slot)`. /// - As this iterator starts at the `head` of the chain (viz., the best block), the first slot /// returned may be earlier than the wall-clock slot. 
- pub fn rev_iter_state_roots(&self) -> Result, Error> { + pub fn rev_iter_state_roots( + &self, + ) -> Result>, Error> { let head = self.head()?; let slot = head.beacon_state.slot; - let iter = StateRootsIterator::owned(self.store.clone(), head.beacon_state); - - Ok(std::iter::once((head.beacon_state_root, slot)).chain(iter)) + let iter = std::iter::once(Ok((head.beacon_state_root, slot))) + .chain(iter) + .map(|result| result.map_err(Into::into)); + Ok(iter) } /// Returns the block at the given slot, if any. Only returns blocks in the canonical chain. @@ -415,10 +428,10 @@ impl BeaconChain { &self, slot: Slot, ) -> Result>, Error> { - let root = self - .rev_iter_block_roots()? - .find(|(_, this_slot)| *this_slot == slot) - .map(|(root, _)| root); + let root = process_results(self.rev_iter_block_roots()?, |mut iter| { + iter.find(|(_, this_slot)| *this_slot == slot) + .map(|(root, _)| root) + })?; if let Some(block_root) = root { Ok(self.store.get_item(&block_root)?) @@ -564,12 +577,12 @@ impl BeaconChain { Ok(state) } Ordering::Less => { - let state_root = self - .rev_iter_state_roots()? - .take_while(|(_root, current_slot)| *current_slot >= slot) - .find(|(_root, current_slot)| *current_slot == slot) - .map(|(root, _slot)| root) - .ok_or_else(|| Error::NoStateForSlot(slot))?; + let state_root = process_results(self.rev_iter_state_roots()?, |iter| { + iter.take_while(|(_, current_slot)| *current_slot >= slot) + .find(|(_, current_slot)| *current_slot == slot) + .map(|(root, _slot)| root) + })? + .ok_or_else(|| Error::NoStateForSlot(slot))?; Ok(self .get_state(&state_root, Some(slot))? @@ -644,10 +657,10 @@ impl BeaconChain { /// /// Returns None if a block doesn't exist at the slot. pub fn root_at_slot(&self, target_slot: Slot) -> Result, Error> { - Ok(self - .rev_iter_block_roots()? - .find(|(_root, slot)| *slot == target_slot) - .map(|(root, _slot)| root)) + process_results(self.rev_iter_block_roots()?, |mut iter| { + iter.find(|(_, slot)| *slot == target_slot) + .map(|(root, _)| root) + }) } /// Returns the block proposer for a given slot. diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 1d0844953..c742d3210 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -25,7 +25,7 @@ use std::marker::PhantomData; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; -use store::Store; +use store::{HotColdDB, ItemStore}; use types::{ BeaconBlock, BeaconState, ChainSpec, EthSpec, Hash256, Signature, SignedBeaconBlock, Slot, }; @@ -34,28 +34,48 @@ pub const PUBKEY_CACHE_FILENAME: &str = "pubkey_cache.ssz"; /// An empty struct used to "witness" all the `BeaconChainTypes` traits. It has no user-facing /// functionality and only exists to satisfy the type system. 
-pub struct Witness( +pub struct Witness< + TStoreMigrator, + TSlotClock, + TEth1Backend, + TEthSpec, + TEventHandler, + THotStore, + TColdStore, +>( PhantomData<( - TStore, TStoreMigrator, TSlotClock, TEth1Backend, TEthSpec, TEventHandler, + THotStore, + TColdStore, )>, ); -impl BeaconChainTypes - for Witness +impl + BeaconChainTypes + for Witness< + TStoreMigrator, + TSlotClock, + TEth1Backend, + TEthSpec, + TEventHandler, + THotStore, + TColdStore, + > where - TStore: Store + 'static, - TStoreMigrator: Migrate + 'static, + THotStore: ItemStore + 'static, + TColdStore: ItemStore + 'static, + TStoreMigrator: Migrate + 'static, TSlotClock: SlotClock + 'static, - TEth1Backend: Eth1ChainBackend + 'static, + TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, TEventHandler: EventHandler + 'static, { - type Store = TStore; + type HotStore = THotStore; + type ColdStore = TColdStore; type StoreMigrator = TStoreMigrator; type SlotClock = TSlotClock; type Eth1Chain = TEth1Backend; @@ -72,7 +92,7 @@ where /// /// See the tests for an example of a complete working example. pub struct BeaconChainBuilder { - store: Option>, + store: Option>>, store_migrator: Option, canonical_head: Option>, /// The finalized checkpoint to anchor the chain. May be genesis or a higher @@ -80,7 +100,7 @@ pub struct BeaconChainBuilder { pub finalized_snapshot: Option>, genesis_block_root: Option, op_pool: Option>, - eth1_chain: Option>, + eth1_chain: Option>, event_handler: Option, slot_clock: Option, head_tracker: Option, @@ -92,15 +112,24 @@ pub struct BeaconChainBuilder { log: Option, } -impl +impl BeaconChainBuilder< - Witness, + Witness< + TStoreMigrator, + TSlotClock, + TEth1Backend, + TEthSpec, + TEventHandler, + THotStore, + TColdStore, + >, > where - TStore: Store + 'static, - TStoreMigrator: Migrate + 'static, + THotStore: ItemStore + 'static, + TColdStore: ItemStore + 'static, + TStoreMigrator: Migrate + 'static, TSlotClock: SlotClock + 'static, - TEth1Backend: Eth1ChainBackend + 'static, + TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, TEventHandler: EventHandler + 'static, { @@ -141,7 +170,7 @@ where /// Sets the store (database). /// /// Should generally be called early in the build chain. 
- pub fn store(mut self, store: Arc) -> Self { + pub fn store(mut self, store: Arc>) -> Self { self.store = Some(store); self } @@ -378,7 +407,15 @@ where self, ) -> Result< BeaconChain< - Witness, + Witness< + TStoreMigrator, + TSlotClock, + TEth1Backend, + TEthSpec, + TEventHandler, + THotStore, + TColdStore, + >, >, String, > { @@ -500,20 +537,22 @@ where } } -impl +impl BeaconChainBuilder< Witness< - TStore, TStoreMigrator, TSlotClock, - CachingEth1Backend, + CachingEth1Backend, TEthSpec, TEventHandler, + THotStore, + TColdStore, >, > where - TStore: Store + 'static, - TStoreMigrator: Migrate + 'static, + THotStore: ItemStore + 'static, + TColdStore: ItemStore + 'static, + TStoreMigrator: Migrate + 'static, TSlotClock: SlotClock + 'static, TEthSpec: EthSpec + 'static, TEventHandler: EventHandler + 'static, @@ -529,12 +568,8 @@ where .log .as_ref() .ok_or_else(|| "dummy_eth1_backend requires a log".to_string())?; - let store = self - .store - .clone() - .ok_or_else(|| "dummy_eth1_backend requires a store.".to_string())?; - let backend = CachingEth1Backend::new(Eth1Config::default(), log.clone(), store); + let backend = CachingEth1Backend::new(Eth1Config::default(), log.clone()); let mut eth1_chain = Eth1Chain::new(backend); eth1_chain.use_dummy_backend = true; @@ -545,14 +580,23 @@ where } } -impl +impl BeaconChainBuilder< - Witness, + Witness< + TStoreMigrator, + TestingSlotClock, + TEth1Backend, + TEthSpec, + TEventHandler, + THotStore, + TColdStore, + >, > where - TStore: Store + 'static, - TStoreMigrator: Migrate + 'static, - TEth1Backend: Eth1ChainBackend + 'static, + THotStore: ItemStore + 'static, + TColdStore: ItemStore + 'static, + TStoreMigrator: Migrate + 'static, + TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, TEventHandler: EventHandler + 'static, { @@ -577,22 +621,24 @@ where } } -impl +impl BeaconChainBuilder< Witness< - TStore, TStoreMigrator, TSlotClock, TEth1Backend, TEthSpec, NullEventHandler, + THotStore, + TColdStore, >, > where - TStore: Store + 'static, - TStoreMigrator: Migrate + 'static, + THotStore: ItemStore + 'static, + TColdStore: ItemStore + 'static, + TStoreMigrator: Migrate + 'static, TSlotClock: SlotClock + 'static, - TEth1Backend: Eth1ChainBackend + 'static, + TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, { /// Sets the `BeaconChain` event handler to `NullEventHandler`. 
@@ -622,12 +668,14 @@ fn genesis_block( #[cfg(test)] mod test { use super::*; - use crate::migrate::{MemoryStore, NullMigrator}; + use crate::migrate::NullMigrator; use eth2_hashing::hash; use genesis::{generate_deterministic_keypairs, interop_genesis_state}; use sloggers::{null::NullLoggerBuilder, Build}; use ssz::Encode; use std::time::Duration; + use store::config::StoreConfig; + use store::{HotColdDB, MemoryStore}; use tempfile::tempdir; use types::{EthSpec, MinimalEthSpec, Slot}; @@ -644,7 +692,12 @@ mod test { let genesis_time = 13_371_337; let log = get_logger(); - let store = Arc::new(MemoryStore::open()); + let store: HotColdDB< + MinimalEthSpec, + MemoryStore, + MemoryStore, + > = HotColdDB::open_ephemeral(StoreConfig::default(), ChainSpec::minimal(), log.clone()) + .unwrap(); let spec = MinimalEthSpec::default_spec(); let data_dir = tempdir().expect("should create temporary data_dir"); @@ -657,7 +710,7 @@ mod test { let chain = BeaconChainBuilder::new(MinimalEthSpec) .logger(log.clone()) - .store(store) + .store(Arc::new(store)) .store_migrator(NullMigrator) .data_dir(data_dir.path().to_path_buf()) .genesis_state(genesis_state) diff --git a/beacon_node/beacon_chain/src/eth1_chain.rs b/beacon_node/beacon_chain/src/eth1_chain.rs index 213344413..f6573a2db 100644 --- a/beacon_node/beacon_chain/src/eth1_chain.rs +++ b/beacon_node/beacon_chain/src/eth1_chain.rs @@ -10,8 +10,7 @@ use std::cmp::Ordering; use std::collections::HashMap; use std::iter::DoubleEndedIterator; use std::marker::PhantomData; -use std::sync::Arc; -use store::{DBColumn, Error as StoreError, Store, StoreItem}; +use store::{DBColumn, Error as StoreError, StoreItem}; use types::{ BeaconState, BeaconStateError, ChainSpec, Deposit, Eth1Data, EthSpec, Hash256, Slot, Unsigned, DEPOSIT_TREE_DEPTH, @@ -75,24 +74,22 @@ impl StoreItem for SszEth1 { } /// Holds an `Eth1ChainBackend` and serves requests from the `BeaconChain`. -pub struct Eth1Chain +pub struct Eth1Chain where - T: Eth1ChainBackend, + T: Eth1ChainBackend, E: EthSpec, - S: Store, { backend: T, /// When `true`, the backend will be ignored and dummy data from the 2019 Canada interop method /// will be used instead. 
pub use_dummy_backend: bool, - _phantom: PhantomData<(E, S)>, + _phantom: PhantomData, } -impl Eth1Chain +impl Eth1Chain where - T: Eth1ChainBackend, + T: Eth1ChainBackend, E: EthSpec, - S: Store, { pub fn new(backend: T) -> Self { Self { @@ -110,7 +107,7 @@ where spec: &ChainSpec, ) -> Result { if self.use_dummy_backend { - let dummy_backend: DummyEth1ChainBackend = DummyEth1ChainBackend::default(); + let dummy_backend: DummyEth1ChainBackend = DummyEth1ChainBackend::default(); dummy_backend.eth1_data(state, spec) } else { self.backend.eth1_data(state, spec) @@ -132,7 +129,7 @@ where spec: &ChainSpec, ) -> Result, Error> { if self.use_dummy_backend { - let dummy_backend: DummyEth1ChainBackend = DummyEth1ChainBackend::default(); + let dummy_backend: DummyEth1ChainBackend = DummyEth1ChainBackend::default(); dummy_backend.queued_deposits(state, eth1_data_vote, spec) } else { self.backend.queued_deposits(state, eth1_data_vote, spec) @@ -145,11 +142,10 @@ where pub fn from_ssz_container( ssz_container: &SszEth1, config: Eth1Config, - store: Arc, log: &Logger, ) -> Result { let backend = - Eth1ChainBackend::from_bytes(&ssz_container.backend_bytes, config, store, log.clone())?; + Eth1ChainBackend::from_bytes(&ssz_container.backend_bytes, config, log.clone())?; Ok(Self { use_dummy_backend: ssz_container.use_dummy_backend, backend, @@ -171,7 +167,7 @@ where } } -pub trait Eth1ChainBackend>: Sized + Send + Sync { +pub trait Eth1ChainBackend: Sized + Send + Sync { /// Returns the `Eth1Data` that should be included in a block being produced for the given /// `state`. fn eth1_data(&self, beacon_state: &BeaconState, spec: &ChainSpec) @@ -195,12 +191,7 @@ pub trait Eth1ChainBackend>: Sized + Send + Sync { fn as_bytes(&self) -> Vec; /// Create a `Eth1ChainBackend` instance given encoded bytes. - fn from_bytes( - bytes: &[u8], - config: Eth1Config, - store: Arc, - log: Logger, - ) -> Result; + fn from_bytes(bytes: &[u8], config: Eth1Config, log: Logger) -> Result; } /// Provides a simple, testing-only backend that generates deterministic, meaningless eth1 data. @@ -208,9 +199,9 @@ pub trait Eth1ChainBackend>: Sized + Send + Sync { /// Never creates deposits, therefore the validator set is static. /// /// This was used in the 2019 Canada interop workshops. -pub struct DummyEth1ChainBackend>(PhantomData<(T, S)>); +pub struct DummyEth1ChainBackend(PhantomData); -impl> Eth1ChainBackend for DummyEth1ChainBackend { +impl Eth1ChainBackend for DummyEth1ChainBackend { /// Produce some deterministic junk based upon the current epoch. fn eth1_data(&self, state: &BeaconState, _spec: &ChainSpec) -> Result { let current_epoch = state.current_epoch(); @@ -243,17 +234,12 @@ impl> Eth1ChainBackend for DummyEth1ChainBackend, - _log: Logger, - ) -> Result { + fn from_bytes(_bytes: &[u8], _config: Eth1Config, _log: Logger) -> Result { Ok(Self(PhantomData)) } } -impl> Default for DummyEth1ChainBackend { +impl Default for DummyEth1ChainBackend { fn default() -> Self { Self(PhantomData) } @@ -265,21 +251,19 @@ impl> Default for DummyEth1ChainBackend { /// The `core` connects to some external eth1 client (e.g., Parity/Geth) and polls it for /// information. #[derive(Clone)] -pub struct CachingEth1Backend { +pub struct CachingEth1Backend { pub core: HttpService, - store: Arc, log: Logger, _phantom: PhantomData, } -impl> CachingEth1Backend { +impl CachingEth1Backend { /// Instantiates `self` with empty caches. /// /// Does not connect to the eth1 node or start any tasks to keep the cache updated. 
- pub fn new(config: Eth1Config, log: Logger, store: Arc) -> Self { + pub fn new(config: Eth1Config, log: Logger) -> Self { Self { core: HttpService::new(config, log.clone()), - store, log, _phantom: PhantomData, } @@ -291,17 +275,16 @@ impl> CachingEth1Backend { } /// Instantiates `self` from an existing service. - pub fn from_service(service: HttpService, store: Arc) -> Self { + pub fn from_service(service: HttpService) -> Self { Self { log: service.log.clone(), core: service, - store, _phantom: PhantomData, } } } -impl> Eth1ChainBackend for CachingEth1Backend { +impl Eth1ChainBackend for CachingEth1Backend { fn eth1_data(&self, state: &BeaconState, spec: &ChainSpec) -> Result { let period = T::SlotsPerEth1VotingPeriod::to_u64(); let voting_period_start_slot = (state.slot / period) * period; @@ -406,16 +389,10 @@ impl> Eth1ChainBackend for CachingEth1Backend, - log: Logger, - ) -> Result { + fn from_bytes(bytes: &[u8], config: Eth1Config, log: Logger) -> Result { let inner = HttpService::from_bytes(bytes, config, log.clone())?; Ok(Self { core: inner, - store, log, _phantom: PhantomData, }) @@ -572,17 +549,15 @@ mod test { mod eth1_chain_json_backend { use super::*; use eth1::DepositLog; - use store::MemoryStore; use types::test_utils::{generate_deterministic_keypair, TestingDepositBuilder}; - fn get_eth1_chain() -> Eth1Chain>, E, MemoryStore> { + fn get_eth1_chain() -> Eth1Chain, E> { let eth1_config = Eth1Config { ..Eth1Config::default() }; let log = null_logger().unwrap(); - let store = Arc::new(MemoryStore::open()); - Eth1Chain::new(CachingEth1Backend::new(eth1_config, log, store)) + Eth1Chain::new(CachingEth1Backend::new(eth1_config, log)) } fn get_deposit_log(i: u64, spec: &ChainSpec) -> DepositLog { diff --git a/beacon_node/beacon_chain/src/migrate.rs b/beacon_node/beacon_chain/src/migrate.rs index e90ee97a9..7291a3ffc 100644 --- a/beacon_node/beacon_chain/src/migrate.rs +++ b/beacon_node/beacon_chain/src/migrate.rs @@ -9,14 +9,16 @@ use std::sync::Arc; use std::thread; use store::hot_cold_store::{process_finalization, HotColdDBError}; use store::iter::{ParentRootBlockIterator, RootsIterator}; -use store::{Error, Store, StoreOp}; +use store::{Error, ItemStore, StoreOp}; pub use store::{HotColdDB, MemoryStore}; use types::*; use types::{BeaconState, EthSpec, Hash256, Slot}; /// Trait for migration processes that update the database upon finalization. -pub trait Migrate: Send + Sync + 'static { - fn new(db: Arc>, log: Logger) -> Self; +pub trait Migrate, Cold: ItemStore>: + Send + Sync + 'static +{ + fn new(db: Arc>, log: Logger) -> Self; fn process_finalization( &self, @@ -30,18 +32,23 @@ pub trait Migrate: Send + Sync + 'static { } /// Traverses live heads and prunes blocks and states of chains that we know can't be built - /// upon because finalization would prohibit it. This is a optimisation intended to save disk + /// upon because finalization would prohibit it. This is an optimisation intended to save disk /// space. /// /// Assumptions: /// * It is called after every finalization. fn prune_abandoned_forks( - store: Arc>, + store: Arc>, head_tracker: Arc, old_finalized_block_hash: SignedBeaconBlockHash, new_finalized_block_hash: SignedBeaconBlockHash, new_finalized_slot: Slot, ) -> Result<(), BeaconChainError> { + // There will never be any blocks to prune if there is only a single head in the chain. + if head_tracker.heads().len() == 1 { + return Ok(()); + } + let old_finalized_slot = store .get_block(&old_finalized_block_hash.into())? 
.ok_or_else(|| BeaconChainError::MissingBeaconBlock(old_finalized_block_hash.into()))? @@ -84,9 +91,10 @@ pub trait Migrate: Send + Sync + 'static { .ok_or_else(|| BeaconStateError::MissingBeaconBlock(head_hash.into()))? .state_root(); - let iterator = std::iter::once((head_hash, head_state_hash, head_slot)) + let iter = std::iter::once(Ok((head_hash, head_state_hash, head_slot))) .chain(RootsIterator::from_block(Arc::clone(&store), head_hash)?); - for (block_hash, state_hash, slot) in iterator { + for maybe_tuple in iter { + let (block_hash, state_hash, slot) = maybe_tuple?; if slot < old_finalized_slot { // We must assume here any candidate chains include old_finalized_block_hash, // i.e. there aren't any forks starting at a block that is a strict ancestor of @@ -165,8 +173,8 @@ pub trait Migrate: Send + Sync + 'static { /// Migrator that does nothing, for stores that don't need migration. pub struct NullMigrator; -impl Migrate for NullMigrator { - fn new(_: Arc>, _: Logger) -> Self { +impl, Cold: ItemStore> Migrate for NullMigrator { + fn new(_: Arc>, _: Logger) -> Self { NullMigrator } } @@ -174,12 +182,14 @@ impl Migrate for NullMigrator { /// Migrator that immediately calls the store's migration function, blocking the current execution. /// /// Mostly useful for tests. -pub struct BlockingMigrator { - db: Arc>, +pub struct BlockingMigrator, Cold: ItemStore> { + db: Arc>, } -impl Migrate for BlockingMigrator { - fn new(db: Arc>, _: Logger) -> Self { +impl, Cold: ItemStore> Migrate + for BlockingMigrator +{ + fn new(db: Arc>, _: Logger) -> Self { BlockingMigrator { db } } @@ -219,14 +229,16 @@ type MpscSender = mpsc::Sender<( )>; /// Migrator that runs a background thread to migrate state from the hot to the cold database. -pub struct BackgroundMigrator { - db: Arc>, +pub struct BackgroundMigrator, Cold: ItemStore> { + db: Arc>, tx_thread: Mutex<(MpscSender, thread::JoinHandle<()>)>, log: Logger, } -impl Migrate for BackgroundMigrator { - fn new(db: Arc>, log: Logger) -> Self { +impl, Cold: ItemStore> Migrate + for BackgroundMigrator +{ + fn new(db: Arc>, log: Logger) -> Self { let tx_thread = Mutex::new(Self::spawn_thread(db.clone(), log.clone())); Self { db, tx_thread, log } } @@ -277,7 +289,7 @@ impl Migrate for BackgroundMigrator { } } -impl BackgroundMigrator { +impl, Cold: ItemStore> BackgroundMigrator { /// Return true if a migration needs to be performed, given a new `finalized_slot`. fn needs_migration(&self, finalized_slot: Slot, max_finality_distance: u64) -> bool { let finality_distance = finalized_slot - self.db.get_split_slot(); @@ -288,7 +300,7 @@ impl BackgroundMigrator { /// /// Return a channel handle for sending new finalized states to the thread. fn spawn_thread( - db: Arc>, + db: Arc>, log: Logger, ) -> ( mpsc::Sender<( diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 4dda8e750..660eaefd9 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -18,7 +18,7 @@ use std::borrow::Cow; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; -use store::{HotColdDB, MemoryStore, Store}; +use store::{config::StoreConfig, HotColdDB, ItemStore, LevelDB, MemoryStore}; use tempfile::{tempdir, TempDir}; use tree_hash::TreeHash; use types::{ @@ -34,17 +34,19 @@ pub const HARNESS_GENESIS_TIME: u64 = 1_567_552_690; // This parameter is required by a builder but not used because we use the `TestingSlotClock`. 
pub const HARNESS_SLOT_TIME: Duration = Duration::from_secs(1); -pub type BaseHarnessType = Witness< - TStore, +pub type BaseHarnessType = Witness< TStoreMigrator, TestingSlotClock, - CachingEth1Backend, + CachingEth1Backend, TEthSpec, NullEventHandler, + THotStore, + TColdStore, >; -pub type HarnessType = BaseHarnessType, NullMigrator, E>; -pub type DiskHarnessType = BaseHarnessType, BlockingMigrator, E>; +pub type HarnessType = BaseHarnessType, MemoryStore>; +pub type DiskHarnessType = + BaseHarnessType, LevelDB>, E, LevelDB, LevelDB>; /// Indicates how the `BeaconChainHarness` should produce blocks. #[derive(Clone, Copy, Debug)] @@ -84,12 +86,12 @@ pub struct BeaconChainHarness { impl BeaconChainHarness> { /// Instantiate a new harness with `validator_count` initial validators. - pub fn new(eth_spec_instance: E, keypairs: Vec) -> Self { + pub fn new(eth_spec_instance: E, keypairs: Vec, config: StoreConfig) -> Self { // Setting the target aggregators to really high means that _all_ validators in the // committee are required to produce an aggregate. This is overkill, however with small // validator counts it's the only way to be certain there is _at least one_ aggregator per // committee. - Self::new_with_target_aggregators(eth_spec_instance, keypairs, 1 << 32) + Self::new_with_target_aggregators(eth_spec_instance, keypairs, 1 << 32, config) } /// Instantiate a new harness with `validator_count` initial validators and a custom @@ -98,6 +100,7 @@ impl BeaconChainHarness> { eth_spec_instance: E, keypairs: Vec, target_aggregators_per_committee: u64, + config: StoreConfig, ) -> Self { let data_dir = tempdir().expect("should create temporary data_dir"); let mut spec = E::default_spec(); @@ -105,11 +108,11 @@ impl BeaconChainHarness> { spec.target_aggregators_per_committee = target_aggregators_per_committee; let log = NullLoggerBuilder.build().expect("logger should build"); - + let store = HotColdDB::open_ephemeral(config, spec.clone(), log.clone()).unwrap(); let chain = BeaconChainBuilder::new(eth_spec_instance) - .logger(log.clone()) + .logger(log) .custom_spec(spec.clone()) - .store(Arc::new(MemoryStore::open())) + .store(Arc::new(store)) .store_migrator(NullMigrator) .data_dir(data_dir.path().to_path_buf()) .genesis_state( @@ -138,7 +141,7 @@ impl BeaconChainHarness> { /// Instantiate a new harness with `validator_count` initial validators. pub fn new_with_disk_store( eth_spec_instance: E, - store: Arc>, + store: Arc, LevelDB>>, keypairs: Vec, ) -> Self { let data_dir = tempdir().expect("should create temporary data_dir"); @@ -176,7 +179,7 @@ impl BeaconChainHarness> { /// Instantiate a new harness with `validator_count` initial validators. pub fn resume_from_disk_store( eth_spec_instance: E, - store: Arc>, + store: Arc, LevelDB>>, keypairs: Vec, data_dir: TempDir, ) -> Self { @@ -188,7 +191,10 @@ impl BeaconChainHarness> { .logger(log.clone()) .custom_spec(spec) .store(store.clone()) - .store_migrator( as Migrate>::new(store, log.clone())) + .store_migrator( as Migrate>::new( + store, + log.clone(), + )) .data_dir(data_dir.path().to_path_buf()) .resume_from_db() .expect("should resume beacon chain from db") @@ -209,11 +215,12 @@ impl BeaconChainHarness> { } } -impl BeaconChainHarness> +impl BeaconChainHarness> where - S: Store, - M: Migrate, + M: Migrate, E: EthSpec, + Hot: ItemStore, + Cold: ItemStore, { /// Advance the slot of the `BeaconChain`. 
/// diff --git a/beacon_node/beacon_chain/tests/attestation_production.rs b/beacon_node/beacon_chain/tests/attestation_production.rs index 1258c1ca6..af5a0ce47 100644 --- a/beacon_node/beacon_chain/tests/attestation_production.rs +++ b/beacon_node/beacon_chain/tests/attestation_production.rs @@ -7,6 +7,7 @@ use beacon_chain::{ test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy}, StateSkipConfig, }; +use store::config::StoreConfig; use tree_hash::TreeHash; use types::{AggregateSignature, EthSpec, Keypair, MainnetEthSpec, RelativeEpoch, Slot}; @@ -25,7 +26,11 @@ lazy_static! { fn produces_attestations() { let num_blocks_produced = MainnetEthSpec::slots_per_epoch() * 4; - let harness = BeaconChainHarness::new(MainnetEthSpec, KEYPAIRS[..].to_vec()); + let harness = BeaconChainHarness::new( + MainnetEthSpec, + KEYPAIRS[..].to_vec(), + StoreConfig::default(), + ); // Skip past the genesis slot. harness.advance_slot(); diff --git a/beacon_node/beacon_chain/tests/attestation_verification.rs b/beacon_node/beacon_chain/tests/attestation_verification.rs index d51ae2693..532736670 100644 --- a/beacon_node/beacon_chain/tests/attestation_verification.rs +++ b/beacon_node/beacon_chain/tests/attestation_verification.rs @@ -9,7 +9,7 @@ use beacon_chain::{ BeaconChain, BeaconChainTypes, }; use state_processing::per_slot_processing; -use store::Store; +use store::config::StoreConfig; use tree_hash::TreeHash; use types::{ test_utils::generate_deterministic_keypair, AggregateSignature, Attestation, EthSpec, Hash256, @@ -36,6 +36,7 @@ fn get_harness(validator_count: usize) -> BeaconChainHarness> { // A kind-of arbitrary number that ensures that _some_ validators are aggregators, but // not all. 4, + StoreConfig::default(), ); harness.advance_slot(); diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index 9dae527c8..16546404b 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -7,6 +7,7 @@ use beacon_chain::{ test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy, HarnessType}, BeaconSnapshot, BlockError, }; +use store::config::StoreConfig; use types::{ test_utils::generate_deterministic_keypair, AggregateSignature, AttestationData, AttesterSlashing, Checkpoint, Deposit, DepositData, Epoch, EthSpec, Hash256, @@ -47,7 +48,11 @@ fn get_chain_segment() -> Vec> { } fn get_harness(validator_count: usize) -> BeaconChainHarness> { - let harness = BeaconChainHarness::new(MainnetEthSpec, KEYPAIRS[0..validator_count].to_vec()); + let harness = BeaconChainHarness::new( + MainnetEthSpec, + KEYPAIRS[0..validator_count].to_vec(), + StoreConfig::default(), + ); harness.advance_slot(); diff --git a/beacon_node/beacon_chain/tests/persistence_tests.rs b/beacon_node/beacon_chain/tests/persistence_tests.rs index c21337426..23f1a94f4 100644 --- a/beacon_node/beacon_chain/tests/persistence_tests.rs +++ b/beacon_node/beacon_chain/tests/persistence_tests.rs @@ -9,7 +9,7 @@ use beacon_chain::{ }; use sloggers::{null::NullLoggerBuilder, Build}; use std::sync::Arc; -use store::{HotColdDB, StoreConfig}; +use store::{HotColdDB, LevelDB, StoreConfig}; use tempfile::{tempdir, TempDir}; use types::{EthSpec, Keypair, MinimalEthSpec}; @@ -23,7 +23,7 @@ lazy_static! 
{ static ref KEYPAIRS: Vec = types::test_utils::generate_deterministic_keypairs(VALIDATOR_COUNT); } -fn get_store(db_path: &TempDir) -> Arc> { +fn get_store(db_path: &TempDir) -> Arc, LevelDB>> { let spec = E::default_spec(); let hot_path = db_path.path().join("hot_db"); let cold_path = db_path.path().join("cold_db"); diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 39b8556a7..89a183792 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -16,7 +16,7 @@ use std::collections::HashSet; use std::sync::Arc; use store::{ iter::{BlockRootsIterator, StateRootsIterator}, - HotColdDB, Store, StoreConfig, + HotColdDB, LevelDB, StoreConfig, }; use tempfile::{tempdir, TempDir}; use tree_hash::TreeHash; @@ -35,7 +35,7 @@ lazy_static! { type E = MinimalEthSpec; type TestHarness = BeaconChainHarness>; -fn get_store(db_path: &TempDir) -> Arc> { +fn get_store(db_path: &TempDir) -> Arc, LevelDB>> { let spec = MinimalEthSpec::default_spec(); let hot_path = db_path.path().join("hot_db"); let cold_path = db_path.path().join("cold_db"); @@ -47,7 +47,10 @@ fn get_store(db_path: &TempDir) -> Arc> { ) } -fn get_harness(store: Arc>, validator_count: usize) -> TestHarness { +fn get_harness( + store: Arc, LevelDB>>, + validator_count: usize, +) -> TestHarness { let harness = BeaconChainHarness::new_with_disk_store( MinimalEthSpec, store, @@ -392,6 +395,7 @@ fn delete_blocks_and_states() { .expect("faulty head state exists"); let states_to_delete = StateRootsIterator::new(store.clone(), &faulty_head_state) + .map(Result::unwrap) .take_while(|(_, slot)| *slot > unforked_blocks) .collect::>(); @@ -409,6 +413,7 @@ fn delete_blocks_and_states() { // Deleting the blocks from the fork should remove them completely let blocks_to_delete = BlockRootsIterator::new(store.clone(), &faulty_head_state) + .map(Result::unwrap) // Extra +1 here accounts for the skipped slot that started this fork .take_while(|(_, slot)| *slot > unforked_blocks + 1) .collect::>(); @@ -424,6 +429,7 @@ fn delete_blocks_and_states() { .chain .rev_iter_state_roots() .expect("rev iter ok") + .map(Result::unwrap) .filter(|(_, slot)| *slot < split_slot); for (state_root, slot) in finalized_states { @@ -659,11 +665,12 @@ fn check_shuffling_compatible( let previous_pivot_slot = (head_state.previous_epoch() - shuffling_lookahead).end_slot(E::slots_per_epoch()); - for (block_root, slot) in harness + for maybe_tuple in harness .chain .rev_iter_block_roots_from(head_block_root) .unwrap() { + let (block_root, slot) = maybe_tuple.unwrap(); // Shuffling is compatible targeting the current epoch, // iff slot is greater than or equal to the current epoch pivot block assert_eq!( @@ -1306,7 +1313,7 @@ fn check_finalization(harness: &TestHarness, expected_slot: u64) { } /// Check that the HotColdDB's split_slot is equal to the start slot of the last finalized epoch. 
-fn check_split_slot(harness: &TestHarness, store: Arc>) { +fn check_split_slot(harness: &TestHarness, store: Arc, LevelDB>>) { let split_slot = store.get_split_slot(); assert_eq!( harness @@ -1357,13 +1364,15 @@ fn check_chain_dump(harness: &TestHarness, expected_len: u64) { .collect::>(); let head = harness.chain.head().expect("should get head"); - let mut forward_block_roots = Store::forwards_block_roots_iterator( + let mut forward_block_roots = HotColdDB::forwards_block_roots_iterator( harness.chain.store.clone(), Slot::new(0), head.beacon_state, head.beacon_block_root, &harness.spec, ) + .unwrap() + .map(Result::unwrap) .collect::>(); // Drop the block roots for skipped slots. @@ -1387,6 +1396,7 @@ fn check_iterators(harness: &TestHarness) { .rev_iter_state_roots() .expect("should get iter") .last() + .map(Result::unwrap) .map(|(_, slot)| slot), Some(Slot::new(0)) ); @@ -1396,6 +1406,7 @@ fn check_iterators(harness: &TestHarness) { .rev_iter_block_roots() .expect("should get iter") .last() + .map(Result::unwrap) .map(|(_, slot)| slot), Some(Slot::new(0)) ); diff --git a/beacon_node/beacon_chain/tests/tests.rs b/beacon_node/beacon_chain/tests/tests.rs index 61e340b02..8015871c7 100644 --- a/beacon_node/beacon_chain/tests/tests.rs +++ b/beacon_node/beacon_chain/tests/tests.rs @@ -13,7 +13,7 @@ use operation_pool::PersistedOperationPool; use state_processing::{ per_slot_processing, per_slot_processing::Error as SlotProcessingError, EpochProcessingError, }; -use store::Store; +use store::config::StoreConfig; use types::{BeaconStateError, EthSpec, Hash256, Keypair, MinimalEthSpec, RelativeEpoch, Slot}; // Should ideally be divisible by 3. @@ -25,7 +25,11 @@ lazy_static! { } fn get_harness(validator_count: usize) -> BeaconChainHarness> { - let harness = BeaconChainHarness::new(MinimalEthSpec, KEYPAIRS[0..validator_count].to_vec()); + let harness = BeaconChainHarness::new( + MinimalEthSpec, + KEYPAIRS[0..validator_count].to_vec(), + StoreConfig::default(), + ); harness.advance_slot(); @@ -73,11 +77,13 @@ fn iterators() { .chain .rev_iter_block_roots() .expect("should get iter") + .map(Result::unwrap) .collect(); let state_roots: Vec<(Hash256, Slot)> = harness .chain .rev_iter_state_roots() .expect("should get iter") + .map(Result::unwrap) .collect(); assert_eq!( diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 66f3ad6fd..66942dc5d 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -5,9 +5,9 @@ use beacon_chain::events::TeeEventHandler; use beacon_chain::{ builder::{BeaconChainBuilder, Witness}, eth1_chain::{CachingEth1Backend, Eth1Chain}, - migrate::{BackgroundMigrator, Migrate, NullMigrator}, + migrate::{BackgroundMigrator, Migrate}, slot_clock::{SlotClock, SystemTimeSlotClock}, - store::{HotColdDB, MemoryStore, Store, StoreConfig}, + store::{HotColdDB, ItemStore, LevelDB, StoreConfig}, BeaconChain, BeaconChainTypes, Eth1ChainBackend, EventHandler, }; use bus::Bus; @@ -50,7 +50,7 @@ pub const ETH1_GENESIS_UPDATE_INTERVAL_MILLIS: u64 = 7_000; /// `self.memory_store(..)` has been called. 
pub struct ClientBuilder { slot_clock: Option, - store: Option>, + store: Option>>, store_migrator: Option, runtime_context: Option>, chain_spec: Option, @@ -65,17 +65,26 @@ pub struct ClientBuilder { eth_spec_instance: T::EthSpec, } -impl +impl ClientBuilder< - Witness, + Witness< + TStoreMigrator, + TSlotClock, + TEth1Backend, + TEthSpec, + TEventHandler, + THotStore, + TColdStore, + >, > where - TStore: Store + 'static, - TStoreMigrator: Migrate, + TStoreMigrator: Migrate, TSlotClock: SlotClock + Clone + 'static, - TEth1Backend: Eth1ChainBackend + 'static, + TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, TEventHandler: EventHandler + 'static, + THotStore: ItemStore + 'static, + TColdStore: ItemStore + 'static, { /// Instantiates a new, empty builder. /// @@ -350,8 +359,17 @@ where /// If type inference errors are being raised, see the comment on the definition of `Self`. pub fn build( self, - ) -> Client> - { + ) -> Client< + Witness< + TStoreMigrator, + TSlotClock, + TEth1Backend, + TEthSpec, + TEventHandler, + THotStore, + TColdStore, + >, + > { Client { beacon_chain: self.beacon_chain, network_globals: self.network_globals, @@ -361,17 +379,26 @@ where } } -impl +impl ClientBuilder< - Witness, + Witness< + TStoreMigrator, + TSlotClock, + TEth1Backend, + TEthSpec, + TEventHandler, + THotStore, + TColdStore, + >, > where - TStore: Store + 'static, - TStoreMigrator: Migrate, + TStoreMigrator: Migrate, TSlotClock: SlotClock + Clone + 'static, - TEth1Backend: Eth1ChainBackend + 'static, + TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, TEventHandler: EventHandler + 'static, + THotStore: ItemStore + 'static, + TColdStore: ItemStore + 'static, { /// Consumes the internal `BeaconChainBuilder`, attaching the resulting `BeaconChain` to self. pub fn build_beacon_chain(mut self) -> Result { @@ -399,23 +426,25 @@ where } } -impl +impl ClientBuilder< Witness< - TStore, TStoreMigrator, TSlotClock, TEth1Backend, TEthSpec, WebSocketSender, + THotStore, + TColdStore, >, > where - TStore: Store + 'static, - TStoreMigrator: Migrate, + TStoreMigrator: Migrate, TSlotClock: SlotClock + 'static, - TEth1Backend: Eth1ChainBackend + 'static, + TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, + THotStore: ItemStore + 'static, + TColdStore: ItemStore + 'static, { /// Specifies that the `BeaconChain` should publish events using the WebSocket server. pub fn websocket_event_handler(mut self, config: WebSocketConfig) -> Result { @@ -440,23 +469,25 @@ where } } -impl +impl ClientBuilder< Witness< - TStore, TStoreMigrator, TSlotClock, TEth1Backend, TEthSpec, TeeEventHandler, + THotStore, + TColdStore, >, > where - TStore: Store + 'static, - TStoreMigrator: Migrate, + TStoreMigrator: Migrate, TSlotClock: SlotClock + 'static, - TEth1Backend: Eth1ChainBackend + 'static, + TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, + THotStore: ItemStore + 'static, + TColdStore: ItemStore + 'static, { /// Specifies that the `BeaconChain` should publish events using the WebSocket server. 
pub fn tee_event_handler( @@ -488,18 +519,19 @@ where impl ClientBuilder< Witness< - HotColdDB, TStoreMigrator, TSlotClock, TEth1Backend, TEthSpec, TEventHandler, + LevelDB, + LevelDB, >, > where TSlotClock: SlotClock + 'static, - TStoreMigrator: Migrate + 'static, - TEth1Backend: Eth1ChainBackend> + 'static, + TStoreMigrator: Migrate, LevelDB> + 'static, + TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, TEventHandler: EventHandler + 'static, { @@ -527,50 +559,25 @@ where } } -impl +impl ClientBuilder< Witness< - MemoryStore, - NullMigrator, + BackgroundMigrator, TSlotClock, TEth1Backend, TEthSpec, TEventHandler, + THotStore, + TColdStore, >, > where TSlotClock: SlotClock + 'static, - TEth1Backend: Eth1ChainBackend> + 'static, - TEthSpec: EthSpec + 'static, - TEventHandler: EventHandler + 'static, -{ - /// Specifies that the `Client` should use a `MemoryStore` database. - /// - /// Also sets the `store_migrator` to the `NullMigrator`, as that's the only viable choice. - pub fn memory_store(mut self) -> Self { - let store = MemoryStore::open(); - self.store = Some(Arc::new(store)); - self.store_migrator = Some(NullMigrator); - self - } -} - -impl - ClientBuilder< - Witness< - HotColdDB, - BackgroundMigrator, - TSlotClock, - TEth1Backend, - TEthSpec, - TEventHandler, - >, - > -where - TSlotClock: SlotClock + 'static, - TEth1Backend: Eth1ChainBackend> + 'static, + TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, TEventHandler: EventHandler + 'static, + THotStore: ItemStore + 'static, + TColdStore: ItemStore + 'static, { pub fn background_migrator(mut self) -> Result { let context = self @@ -586,23 +593,25 @@ where } } -impl +impl ClientBuilder< Witness< - TStore, TStoreMigrator, TSlotClock, - CachingEth1Backend, + CachingEth1Backend, TEthSpec, TEventHandler, + THotStore, + TColdStore, >, > where - TStore: Store + 'static, - TStoreMigrator: Migrate, + TStoreMigrator: Migrate, TSlotClock: SlotClock + 'static, TEthSpec: EthSpec + 'static, TEventHandler: EventHandler + 'static, + THotStore: ItemStore + 'static, + TColdStore: ItemStore + 'static, { /// Specifies that the `BeaconChain` should cache eth1 blocks/logs from a remote eth1 node /// (e.g., Parity/Geth) and refer to that cache when collecting deposits or eth1 votes during @@ -616,10 +625,6 @@ where let beacon_chain_builder = self .beacon_chain_builder .ok_or_else(|| "caching_eth1_backend requires a beacon_chain_builder")?; - let store = self - .store - .clone() - .ok_or_else(|| "caching_eth1_backend requires a store".to_string())?; let backend = if let Some(eth1_service_from_genesis) = self.eth1_service { eth1_service_from_genesis.update_config(config)?; @@ -634,7 +639,7 @@ where // adding earlier blocks too. eth1_service_from_genesis.drop_block_cache(); - CachingEth1Backend::from_service(eth1_service_from_genesis, store) + CachingEth1Backend::from_service(eth1_service_from_genesis) } else { beacon_chain_builder .get_persisted_eth1_backend()? @@ -642,18 +647,11 @@ where Eth1Chain::from_ssz_container( &persisted, config.clone(), - store.clone(), &context.log().clone(), ) .map(|chain| chain.into_backend()) }) - .unwrap_or_else(|| { - Ok(CachingEth1Backend::new( - config, - context.log().clone(), - store, - )) - })? + .unwrap_or_else(|| Ok(CachingEth1Backend::new(config, context.log().clone())))? 
}; self.eth1_service = None; @@ -697,16 +695,25 @@ where } } -impl +impl ClientBuilder< - Witness, + Witness< + TStoreMigrator, + SystemTimeSlotClock, + TEth1Backend, + TEthSpec, + TEventHandler, + THotStore, + TColdStore, + >, > where - TStore: Store + 'static, - TStoreMigrator: Migrate, - TEth1Backend: Eth1ChainBackend + 'static, + TStoreMigrator: Migrate, + TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, TEventHandler: EventHandler + 'static, + THotStore: ItemStore + 'static, + TColdStore: ItemStore + 'static, { /// Specifies that the slot clock should read the time from the computers system clock. pub fn system_time_slot_clock(mut self) -> Result { diff --git a/beacon_node/eth2-libp2p/tests/rpc_tests.rs b/beacon_node/eth2-libp2p/tests/rpc_tests.rs index 2c96073c5..2a0bac735 100644 --- a/beacon_node/eth2-libp2p/tests/rpc_tests.rs +++ b/beacon_node/eth2-libp2p/tests/rpc_tests.rs @@ -95,7 +95,7 @@ async fn test_status_rpc() { tokio::select! { _ = sender_future => {} _ = receiver_future => {} - _ = delay_for(Duration::from_millis(2000)) => { + _ = delay_for(Duration::from_secs(30)) => { panic!("Future timed out"); } } @@ -205,7 +205,7 @@ async fn test_blocks_by_range_chunked_rpc() { tokio::select! { _ = sender_future => {} _ = receiver_future => {} - _ = delay_for(Duration::from_millis(2000)) => { + _ = delay_for(Duration::from_secs(30)) => { panic!("Future timed out"); } } @@ -292,7 +292,7 @@ async fn test_blocks_by_range_chunked_rpc_terminates_correctly() { // sent in the timeout match futures::future::select( Box::pin(receiver.next_event()), - tokio::time::delay_for(Duration::from_millis(50)), + tokio::time::delay_for(Duration::from_secs(1)), ) .await { @@ -335,7 +335,7 @@ async fn test_blocks_by_range_chunked_rpc_terminates_correctly() { tokio::select! { _ = sender_future => {} _ = receiver_future => {} - _ = delay_for(Duration::from_millis(2000)) => { + _ = delay_for(Duration::from_secs(30)) => { panic!("Future timed out"); } } @@ -442,7 +442,7 @@ async fn test_blocks_by_range_single_empty_rpc() { tokio::select! { _ = sender_future => {} _ = receiver_future => {} - _ = delay_for(Duration::from_millis(800)) => { + _ = delay_for(Duration::from_secs(20)) => { panic!("Future timed out"); } } @@ -556,7 +556,7 @@ async fn test_blocks_by_root_chunked_rpc() { tokio::select! { _ = sender_future => {} _ = receiver_future => {} - _ = delay_for(Duration::from_millis(2000)) => { + _ = delay_for(Duration::from_secs(30)) => { panic!("Future timed out"); } } @@ -652,7 +652,7 @@ async fn test_blocks_by_root_chunked_rpc_terminates_correctly() { // sent in the timeout match futures::future::select( Box::pin(receiver.next_event()), - tokio::time::delay_for(Duration::from_millis(50)), + tokio::time::delay_for(Duration::from_millis(1000)), ) .await { @@ -695,7 +695,7 @@ async fn test_blocks_by_root_chunked_rpc_terminates_correctly() { tokio::select! { _ = sender_future => {} _ = receiver_future => {} - _ = delay_for(Duration::from_millis(2000)) => { + _ = delay_for(Duration::from_secs(30)) => { panic!("Future timed out"); } } @@ -753,7 +753,7 @@ async fn test_goodbye_rpc() { tokio::select! 
{ _ = sender_future => {} _ = receiver_future => {} - _ = delay_for(Duration::from_millis(2000)) => { + _ = delay_for(Duration::from_secs(30)) => { panic!("Future timed out"); } } diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index bf6358ebc..e9b01d194 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -11,7 +11,6 @@ lazy_static = "1.4.0" matches = "0.1.8" tempfile = "3.1.0" exit-future = "0.2.0" -assert_approx_eq = "1.1.0" [dependencies] beacon_chain = { path = "../beacon_chain" } @@ -38,3 +37,4 @@ rlp = "0.4.5" lazy_static = "1.4.0" lighthouse_metrics = { path = "../../common/lighthouse_metrics" } environment = { path = "../../lighthouse/environment" } +itertools = "0.9.0" diff --git a/beacon_node/network/src/attestation_service/tests/mod.rs b/beacon_node/network/src/attestation_service/tests/mod.rs index 4f30a3443..8447808a1 100644 --- a/beacon_node/network/src/attestation_service/tests/mod.rs +++ b/beacon_node/network/src/attestation_service/tests/mod.rs @@ -17,19 +17,21 @@ mod tests { use sloggers::{null::NullLoggerBuilder, Build}; use slot_clock::{SlotClock, SystemTimeSlotClock}; use std::time::{Duration, SystemTime}; - use store::MemoryStore; + use store::config::StoreConfig; + use store::{HotColdDB, MemoryStore}; use tempfile::tempdir; use types::{CommitteeIndex, EnrForkId, EthSpec, MinimalEthSpec}; const SLOT_DURATION_MILLIS: u64 = 200; type TestBeaconChainType = Witness< - MemoryStore, NullMigrator, SystemTimeSlotClock, - CachingEth1Backend>, + CachingEth1Backend, MinimalEthSpec, NullEventHandler, + MemoryStore, + MemoryStore, >; pub struct TestBeaconChain { @@ -44,11 +46,14 @@ mod tests { let keypairs = generate_deterministic_keypairs(1); let log = get_logger(); + let store = + HotColdDB::open_ephemeral(StoreConfig::default(), spec.clone(), log.clone()) + .unwrap(); let chain = Arc::new( BeaconChainBuilder::new(MinimalEthSpec) .logger(log.clone()) .custom_spec(spec.clone()) - .store(Arc::new(MemoryStore::open())) + .store(Arc::new(store)) .store_migrator(NullMigrator) .data_dir(data_dir.path().to_path_buf()) .genesis_state( @@ -83,7 +88,7 @@ mod tests { } lazy_static! { - static ref CHAIN: TestBeaconChain = { TestBeaconChain::new_with_system_clock() }; + static ref CHAIN: TestBeaconChain = TestBeaconChain::new_with_system_clock(); } fn get_attestation_service() -> AttestationService { diff --git a/beacon_node/network/src/persisted_dht.rs b/beacon_node/network/src/persisted_dht.rs index c0c811259..1f68f7ac1 100644 --- a/beacon_node/network/src/persisted_dht.rs +++ b/beacon_node/network/src/persisted_dht.rs @@ -1,13 +1,15 @@ use eth2_libp2p::Enr; use rlp; use std::sync::Arc; -use store::{DBColumn, Error as StoreError, Store, StoreItem}; +use store::{DBColumn, Error as StoreError, HotColdDB, ItemStore, StoreItem}; use types::{EthSpec, Hash256}; /// 32-byte key for accessing the `DhtEnrs`. pub const DHT_DB_KEY: &str = "PERSISTEDDHTPERSISTEDDHTPERSISTE"; -pub fn load_dht, E: EthSpec>(store: Arc) -> Vec { +pub fn load_dht, Cold: ItemStore>( + store: Arc>, +) -> Vec { // Load DHT from store let key = Hash256::from_slice(&DHT_DB_KEY.as_bytes()); match store.get_item(&key) { @@ -20,8 +22,8 @@ pub fn load_dht, E: EthSpec>(store: Arc) -> Vec { } /// Attempt to persist the ENR's in the DHT to `self.store`. 
-pub fn persist_dht, E: EthSpec>( - store: Arc, +pub fn persist_dht, Cold: ItemStore>( + store: Arc>, enrs: Vec, ) -> Result<(), store::Error> { let key = Hash256::from_slice(&DHT_DB_KEY.as_bytes()); @@ -56,14 +58,19 @@ impl StoreItem for PersistedDht { mod tests { use super::*; use eth2_libp2p::Enr; + use sloggers::{null::NullLoggerBuilder, Build}; use std::str::FromStr; - use std::sync::Arc; - use store::{MemoryStore, Store}; - use types::Hash256; - use types::MinimalEthSpec; + use store::config::StoreConfig; + use store::{HotColdDB, MemoryStore}; + use types::{ChainSpec, Hash256, MinimalEthSpec}; #[test] fn test_persisted_dht() { - let store = Arc::new(MemoryStore::::open()); + let log = NullLoggerBuilder.build().unwrap(); + let store: HotColdDB< + MinimalEthSpec, + MemoryStore, + MemoryStore, + > = HotColdDB::open_ephemeral(StoreConfig::default(), ChainSpec::minimal(), log).unwrap(); let enrs = vec![Enr::from_str("enr:-IS4QHCYrYZbAKWCBRlAy5zzaDZXJBGkcnh4MHcBFZntXNFrdvJjX04jRzjzCBOonrkTfj499SZuOh8R33Ls8RRcy5wBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQPKY0yuDUmstAHYpMa2_oxVtw0RW_QAdpzBQA8yWM0xOIN1ZHCCdl8").unwrap()]; let key = Hash256::from_slice(&DHT_DB_KEY.as_bytes()); store diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index ccd248043..aa110d427 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -10,10 +10,10 @@ use beacon_chain::{ }; use eth2_libp2p::rpc::*; use eth2_libp2p::{NetworkGlobals, PeerId, Request, Response}; +use itertools::process_results; use slog::{debug, error, o, trace, warn}; use ssz::Encode; use std::sync::Arc; -use store::Store; use tokio::sync::mpsc; use types::{ Attestation, ChainSpec, Epoch, EthSpec, Hash256, SignedAggregateAndProof, SignedBeaconBlock, @@ -362,20 +362,29 @@ impl Processor { // pick out the required blocks, ignoring skip-slots and stepping by the step parameter; let mut last_block_root = None; - let block_roots = forwards_block_root_iter - .take_while(|(_root, slot)| slot.as_u64() < req.start_slot + req.count * req.step) - // map skip slots to None - .map(|(root, _slot)| { - let result = if Some(root) == last_block_root { - None - } else { - Some(root) - }; - last_block_root = Some(root); - result - }) - .step_by(req.step as usize) - .collect::>(); + let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { + iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot + req.count * req.step) + // map skip slots to None + .map(|(root, _)| { + let result = if Some(root) == last_block_root { + None + } else { + Some(root) + }; + last_block_root = Some(root); + result + }) + .step_by(req.step as usize) + .collect::>>() + }); + + let block_roots = match maybe_block_roots { + Ok(block_roots) => block_roots, + Err(e) => { + error!(self.log, "Error during iteration over blocks"; "error" => format!("{:?}", e)); + return; + } + }; // remove all skip slots let block_roots = block_roots diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index efabaa23b..edd480f79 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -17,6 +17,7 @@ use rest_types::ValidatorSubscription; use slog::{debug, error, info, o, trace}; use std::sync::Arc; use std::time::Duration; +use store::HotColdDB; use tokio::sync::mpsc; use tokio::time::Delay; use types::EthSpec; @@ -40,7 +41,7 @@ pub struct NetworkService { /// lighthouse. 
router_send: mpsc::UnboundedSender>, /// A reference to lighthouse's database to persist the DHT. - store: Arc, + store: Arc>, /// A collection of global variables, accessible outside of the network service. network_globals: Arc>, /// A delay that expires when a new fork takes place. @@ -78,7 +79,7 @@ impl NetworkService { let (network_globals, mut libp2p) = LibP2PService::new(executor.clone(), config, enr_fork_id, &network_log)?; - for enr in load_dht::(store.clone()) { + for enr in load_dht::(store.clone()) { libp2p.swarm.add_enr(enr); } @@ -142,7 +143,7 @@ fn spawn_service( "Number of peers" => format!("{}", enrs.len()), ); - match persist_dht::(service.store.clone(), enrs) { + match persist_dht::(service.store.clone(), enrs) { Err(e) => error!( service.log, "Failed to persist DHT on drop"; diff --git a/beacon_node/network/src/service/tests.rs b/beacon_node/network/src/service/tests.rs index 48aa038c8..b27933924 100644 --- a/beacon_node/network/src/service/tests.rs +++ b/beacon_node/network/src/service/tests.rs @@ -9,6 +9,7 @@ mod tests { use sloggers::{null::NullLoggerBuilder, Build}; use std::str::FromStr; use std::sync::Arc; + use store::config::StoreConfig; use tokio::runtime::Runtime; use types::{test_utils::generate_deterministic_keypairs, MinimalEthSpec}; @@ -22,7 +23,12 @@ mod tests { let log = get_logger(); let beacon_chain = Arc::new( - BeaconChainHarness::new(MinimalEthSpec, generate_deterministic_keypairs(8)).chain, + BeaconChainHarness::new( + MinimalEthSpec, + generate_deterministic_keypairs(8), + StoreConfig::default(), + ) + .chain, ); let store = beacon_chain.store.clone(); diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index b608d410b..eea70305e 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -265,15 +265,18 @@ impl SyncManager { "local_finalized_epoch" => local_peer_info.finalized_epoch, ); - // if we don't know about the peer's chain add it to the range sync, otherwise - // consider it synced (it can be the case that the peer seems ahead of us, but we - // reject its chain). + // There are a few cases to handle here: + // + // - A peer could appear advanced if our fork choice has rejected their version of + // the chain. If we know of their head block, we consider this peer fully synced. + // - A peer could have just advanced to the next epoch and have a new finalized + // epoch that is currently ahead of ours. If their finalized epoch is ahead of ours + // by one and their head_slot is within the slot tolerance, consider this peer + // fully synced. - if self - .chain - .fork_choice - .read() - .contains_block(&remote.head_root) + if (self.chain.fork_choice.read().contains_block(&remote.head_root)) || // the first case + (remote.finalized_epoch.sub(local_peer_info.finalized_epoch) == 1 && remote.head_slot.sub(local_peer_info.head_slot) < SLOT_IMPORT_TOLERANCE as u64) + // the second case { self.synced_peer(&peer_id, remote); // notify the range sync that a peer has been added @@ -554,7 +557,7 @@ impl SyncManager { self.update_sync_state(); } - /// Updates the syncing state of a peer to be behind. + /// Updates the syncing state of a peer to be advanced.
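To make the two cases in the comment above concrete, here is a hypothetical, simplified restatement of the predicate using plain `u64`s in place of `Epoch`/`Slot` (whose `sub` is saturating, hence `saturating_sub` below); none of these names are the real API.

```rust
/// Simplified sketch: a peer that looks ahead of us is still treated as
/// fully synced in two cases.
fn peer_is_fully_synced(
    fork_choice_contains_head: bool, // case 1: we already know their head block
    remote_finalized_epoch: u64,
    local_finalized_epoch: u64,
    remote_head_slot: u64,
    local_head_slot: u64,
    slot_import_tolerance: u64,
) -> bool {
    // Case 1: fork choice contains their head, even if we rejected their chain.
    fork_choice_contains_head
        // Case 2: they are exactly one finalized epoch ahead and their head
        // slot is within our import tolerance.
        || (remote_finalized_epoch.saturating_sub(local_finalized_epoch) == 1
            && remote_head_slot.saturating_sub(local_head_slot) < slot_import_tolerance)
}

fn main() {
    // A peer one finalized epoch ahead with a nearby head counts as synced.
    assert!(peer_is_fully_synced(false, 11, 10, 325, 320, 8));
    // A peer far ahead on both counts does not.
    assert!(!peer_is_fully_synced(false, 13, 10, 400, 320, 8));
}
```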
fn advanced_peer(&mut self, peer_id: &PeerId, sync_info: PeerSyncInfo) { if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) { let head_slot = sync_info.head_slot; diff --git a/beacon_node/network/src/sync/peer_sync_info.rs b/beacon_node/network/src/sync/peer_sync_info.rs index 78d91282a..2897b0447 100644 --- a/beacon_node/network/src/sync/peer_sync_info.rs +++ b/beacon_node/network/src/sync/peer_sync_info.rs @@ -21,7 +21,7 @@ pub struct PeerSyncInfo { pub enum PeerSyncType { /// The peer is on our chain and is fully synced with respect to our chain. FullySynced, - /// The peer has a greater knowledge of the chain that us that warrants a full sync. + /// The peer has a greater knowledge of the chain than us that warrants a full sync. Advanced, /// A peer is behind in the sync and not useful to us for downloading blocks. Behind, @@ -56,8 +56,8 @@ impl PeerSyncInfo { Some(Self::from(status_message(chain)?)) } - /// Given another peer's `PeerSyncInfo` this will determine how useful that peer is for us in - /// regards to syncing. This returns the peer sync type that can then be handled by the + /// Given another peer's `PeerSyncInfo` this will determine how useful that peer is to us in + /// regards to syncing. This returns the peer sync type that can then be handled by the /// `SyncManager`. pub fn peer_sync_type(&self, remote_peer_sync_info: &PeerSyncInfo) -> PeerSyncType { // check if the peer is fully synced with our current chain diff --git a/beacon_node/rest_api/Cargo.toml b/beacon_node/rest_api/Cargo.toml index f8cdbfe4f..122887d71 100644 --- a/beacon_node/rest_api/Cargo.toml +++ b/beacon_node/rest_api/Cargo.toml @@ -39,6 +39,7 @@ rayon = "1.3.0" environment = { path = "../../lighthouse/environment" } uhttp_sse = "0.5.1" bus = "2.2.3" +itertools = "0.9.0" [dev-dependencies] assert_matches = "1.3.0" diff --git a/beacon_node/rest_api/src/beacon.rs b/beacon_node/rest_api/src/beacon.rs index f0040cfef..fc0ed8a5a 100644 --- a/beacon_node/rest_api/src/beacon.rs +++ b/beacon_node/rest_api/src/beacon.rs @@ -13,7 +13,6 @@ use rest_types::{ }; use std::io::Write; use std::sync::Arc; -use store::Store; use slog::{error, Logger}; use types::{ diff --git a/beacon_node/rest_api/src/helpers.rs b/beacon_node/rest_api/src/helpers.rs index b07bb97d5..872e7c834 100644 --- a/beacon_node/rest_api/src/helpers.rs +++ b/beacon_node/rest_api/src/helpers.rs @@ -5,9 +5,10 @@ use eth2_libp2p::PubsubMessage; use hex; use http::header; use hyper::{Body, Request}; +use itertools::process_results; use network::NetworkMessage; use ssz::Decode; -use store::{iter::AncestorIter, Store}; +use store::iter::AncestorIter; use types::{ BeaconState, CommitteeIndex, Epoch, EthSpec, Hash256, RelativeEpoch, SignedBeaconBlock, Slot, }; @@ -118,11 +119,14 @@ pub fn block_root_at_slot( beacon_chain: &BeaconChain, target: Slot, ) -> Result, ApiError> { - Ok(beacon_chain - .rev_iter_block_roots()? - .take_while(|(_root, slot)| *slot >= target) - .find(|(_root, slot)| *slot == target) - .map(|(root, _slot)| root)) + Ok(process_results( + beacon_chain.rev_iter_block_roots()?, + |iter| { + iter.take_while(|(_, slot)| *slot >= target) + .find(|(_, slot)| *slot == target) + .map(|(root, _)| root) + }, + )?) } /// Returns a `BeaconState` and it's root in the canonical chain of `beacon_chain` at the given @@ -190,12 +194,15 @@ pub fn state_root_at_slot( // // Iterate through the state roots on the head state to find the root for that // slot. Once the root is found, load it from the database. 
- Ok(head_state - .try_iter_ancestor_roots(beacon_chain.store.clone()) - .ok_or_else(|| ApiError::ServerError("Failed to create roots iterator".to_string()))? - .find(|(_root, s)| *s == slot) - .map(|(root, _slot)| root) - .ok_or_else(|| ApiError::NotFound(format!("Unable to find state at slot {}", slot)))?) + process_results( + head_state + .try_iter_ancestor_roots(beacon_chain.store.clone()) + .ok_or_else(|| { + ApiError::ServerError("Failed to create roots iterator".to_string()) + })?, + |mut iter| iter.find(|(_, s)| *s == slot).map(|(root, _)| root), + )? + .ok_or_else(|| ApiError::NotFound(format!("Unable to find state at slot {}", slot))) } else { // 4. The request slot is later than the head slot. // diff --git a/beacon_node/rest_api/tests/test.rs b/beacon_node/rest_api/tests/test.rs index 5b5251672..2cb239dbc 100644 --- a/beacon_node/rest_api/tests/test.rs +++ b/beacon_node/rest_api/tests/test.rs @@ -766,6 +766,7 @@ fn get_genesis_state_root() { .expect("should have beacon chain") .rev_iter_state_roots() .expect("should get iter") + .map(Result::unwrap) .find(|(_cur_root, cur_slot)| slot == *cur_slot) .map(|(cur_root, _)| cur_root) .expect("chain should have state root at slot"); @@ -793,6 +794,7 @@ fn get_genesis_block_root() { .expect("should have beacon chain") .rev_iter_block_roots() .expect("should get iter") + .map(Result::unwrap) .find(|(_cur_root, cur_slot)| slot == *cur_slot) .map(|(cur_root, _)| cur_root) .expect("chain should have state root at slot"); diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs index 67bd28327..d8459b3c3 100644 --- a/beacon_node/src/lib.rs +++ b/beacon_node/src/lib.rs @@ -11,7 +11,8 @@ pub use config::{get_data_dir, get_eth2_testnet_config, get_testnet_dir}; pub use eth2_config::Eth2Config; use beacon_chain::events::TeeEventHandler; -use beacon_chain::migrate::{BackgroundMigrator, HotColdDB}; +use beacon_chain::migrate::BackgroundMigrator; +use beacon_chain::store::LevelDB; use beacon_chain::{ builder::Witness, eth1_chain::CachingEth1Backend, slot_clock::SystemTimeSlotClock, }; @@ -25,12 +26,13 @@ use types::EthSpec; /// A type-alias to the tighten the definition of a production-intended `Client`. pub type ProductionClient = Client< Witness< - HotColdDB, - BackgroundMigrator, + BackgroundMigrator, LevelDB>, SystemTimeSlotClock, - CachingEth1Backend>, + CachingEth1Backend, E, TeeEventHandler, + LevelDB, + LevelDB, >, >; diff --git a/beacon_node/store/Cargo.toml b/beacon_node/store/Cargo.toml index 504af770d..23657daa0 100644 --- a/beacon_node/store/Cargo.toml +++ b/beacon_node/store/Cargo.toml @@ -10,7 +10,6 @@ harness = false [dev-dependencies] tempfile = "3.1.0" -sloggers = "1.0.0" criterion = "0.3.2" rayon = "1.3.0" @@ -31,3 +30,4 @@ lazy_static = "1.4.0" lighthouse_metrics = { path = "../../common/lighthouse_metrics" } lru = "0.5.1" fork_choice = { path = "../../consensus/fork_choice" } +sloggers = "1.0.0" diff --git a/beacon_node/store/src/chunked_iter.rs b/beacon_node/store/src/chunked_iter.rs index 979acd6ea..632b3e017 100644 --- a/beacon_node/store/src/chunked_iter.rs +++ b/beacon_node/store/src/chunked_iter.rs @@ -1,5 +1,5 @@ use crate::chunked_vector::{chunk_key, Chunk, Field}; -use crate::HotColdDB; +use crate::{HotColdDB, ItemStore}; use slog::error; use std::sync::Arc; use types::{ChainSpec, EthSpec, Slot}; @@ -7,22 +7,26 @@ use types::{ChainSpec, EthSpec, Slot}; /// Iterator over the values of a `BeaconState` vector field (like `block_roots`). /// /// Uses the freezer DB's separate table to load the values. 
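The `process_results` calls above are the heart of this refactor: store iterators now yield `Result` items, and `itertools::process_results` (version 0.9, as added to the manifests) lets callers run ordinary iterator adaptors over the `Ok` values while short-circuiting on the first `Err`. A toy demonstration of the combinator with made-up data:

```rust
use itertools::process_results;

/// Find the first even value, propagating any error encountered on the way.
fn first_even(values: Vec<Result<u64, String>>) -> Result<Option<u64>, String> {
    // The closure sees a plain iterator of u64s; errors stop it early.
    process_results(values, |mut iter| iter.find(|v| v % 2 == 0))
}

fn main() {
    // All items are Ok: the adaptor chain runs to completion.
    assert_eq!(first_even(vec![Ok(1), Ok(3), Ok(4)]), Ok(Some(4)));
    // An Err before the answer short-circuits the whole computation.
    assert_eq!(
        first_even(vec![Ok(1), Err("db error".to_string()), Ok(4)]),
        Err("db error".to_string())
    );
}
```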
-pub struct ChunkedVectorIter +pub struct ChunkedVectorIter where F: Field, E: EthSpec, + Hot: ItemStore, + Cold: ItemStore, { - pub(crate) store: Arc>, + pub(crate) store: Arc>, current_vindex: usize, pub(crate) end_vindex: usize, next_cindex: usize, current_chunk: Chunk, } -impl ChunkedVectorIter +impl ChunkedVectorIter where F: Field, E: EthSpec, + Hot: ItemStore, + Cold: ItemStore, { /// Create a new iterator which can yield elements from `start_vindex` up to the last /// index stored by the restore point at `last_restore_point_slot`. @@ -31,7 +35,7 @@ where /// `HotColdDB::get_latest_restore_point_slot`. We pass it as a parameter so that the caller can /// maintain a stable view of the database (see `HybridForwardsBlockRootsIterator`). pub fn new( - store: Arc>, + store: Arc>, start_vindex: usize, last_restore_point_slot: Slot, spec: &ChainSpec, @@ -53,10 +57,12 @@ where } } -impl Iterator for ChunkedVectorIter +impl Iterator for ChunkedVectorIter where F: Field, E: EthSpec, + Hot: ItemStore, + Cold: ItemStore, { type Item = (usize, F::Value); diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs index 49c4b5144..4512ed70d 100644 --- a/beacon_node/store/src/errors.rs +++ b/beacon_node/store/src/errors.rs @@ -3,6 +3,8 @@ use crate::hot_cold_store::HotColdDBError; use ssz::DecodeError; use types::{BeaconStateError, Hash256}; +pub type Result = std::result::Result; + #[derive(Debug)] pub enum Error { SszDecodeError(DecodeError), @@ -13,6 +15,7 @@ pub enum Error { DBError { message: String }, RlpError(String), BlockNotFound(Hash256), + NoContinuationData, } impl From for Error { diff --git a/beacon_node/store/src/forwards_iter.rs b/beacon_node/store/src/forwards_iter.rs index b105608a0..6ae38aa85 100644 --- a/beacon_node/store/src/forwards_iter.rs +++ b/beacon_node/store/src/forwards_iter.rs @@ -1,14 +1,15 @@ use crate::chunked_iter::ChunkedVectorIter; use crate::chunked_vector::BlockRoots; +use crate::errors::{Error, Result}; use crate::iter::BlockRootsIterator; -use crate::{HotColdDB, Store}; -use slog::error; +use crate::{HotColdDB, ItemStore}; +use itertools::process_results; use std::sync::Arc; use types::{BeaconState, ChainSpec, EthSpec, Hash256, Slot}; /// Forwards block roots iterator that makes use of the `block_roots` table in the freezer DB. -pub struct FrozenForwardsBlockRootsIterator { - inner: ChunkedVectorIter, +pub struct FrozenForwardsBlockRootsIterator, Cold: ItemStore> { + inner: ChunkedVectorIter, } /// Forwards block roots iterator that reverses a backwards iterator (only good for short ranges). @@ -18,9 +19,9 @@ pub struct SimpleForwardsBlockRootsIterator { } /// Fusion of the above two approaches to forwards iteration. Fast and efficient. -pub enum HybridForwardsBlockRootsIterator { +pub enum HybridForwardsBlockRootsIterator, Cold: ItemStore> { PreFinalization { - iter: Box>, + iter: Box>, /// Data required by the `PostFinalization` iterator when we get to it. 
continuation_data: Box, Hash256)>>, }, @@ -29,9 +30,11 @@ pub enum HybridForwardsBlockRootsIterator { }, } -impl FrozenForwardsBlockRootsIterator { +impl, Cold: ItemStore> + FrozenForwardsBlockRootsIterator +{ pub fn new( - store: Arc>, + store: Arc>, start_slot: Slot, last_restore_point_slot: Slot, spec: &ChainSpec, @@ -47,7 +50,9 @@ impl FrozenForwardsBlockRootsIterator { } } -impl Iterator for FrozenForwardsBlockRootsIterator { +impl, Cold: ItemStore> Iterator + for FrozenForwardsBlockRootsIterator +{ type Item = (Hash256, Slot); fn next(&mut self) -> Option { @@ -58,43 +63,49 @@ impl Iterator for FrozenForwardsBlockRootsIterator { } impl SimpleForwardsBlockRootsIterator { - pub fn new, E: EthSpec>( - store: Arc, + pub fn new, Cold: ItemStore>( + store: Arc>, start_slot: Slot, end_state: BeaconState, end_block_root: Hash256, - ) -> Self { + ) -> Result { // Iterate backwards from the end state, stopping at the start slot. - let iter = std::iter::once((end_block_root, end_state.slot)) - .chain(BlockRootsIterator::owned(store, end_state)); - Self { - values: iter.take_while(|(_, slot)| *slot >= start_slot).collect(), - } + let values = process_results( + std::iter::once(Ok((end_block_root, end_state.slot))) + .chain(BlockRootsIterator::owned(store, end_state)), + |iter| { + iter.take_while(|(_, slot)| *slot >= start_slot) + .collect::>() + }, + )?; + Ok(Self { values: values }) } } impl Iterator for SimpleForwardsBlockRootsIterator { - type Item = (Hash256, Slot); + type Item = Result<(Hash256, Slot)>; fn next(&mut self) -> Option { // Pop from the end of the vector to get the block roots in slot-ascending order. - self.values.pop() + Ok(self.values.pop()).transpose() } } -impl HybridForwardsBlockRootsIterator { +impl, Cold: ItemStore> + HybridForwardsBlockRootsIterator +{ pub fn new( - store: Arc>, + store: Arc>, start_slot: Slot, end_state: BeaconState, end_block_root: Hash256, spec: &ChainSpec, - ) -> Self { + ) -> Result { use HybridForwardsBlockRootsIterator::*; let latest_restore_point_slot = store.get_latest_restore_point_slot(); - if start_slot < latest_restore_point_slot { + let result = if start_slot < latest_restore_point_slot { PreFinalization { iter: Box::new(FrozenForwardsBlockRootsIterator::new( store, @@ -111,16 +122,14 @@ impl HybridForwardsBlockRootsIterator { start_slot, end_state, end_block_root, - ), + )?, } - } + }; + + Ok(result) } -} -impl Iterator for HybridForwardsBlockRootsIterator { - type Item = (Hash256, Slot); - - fn next(&mut self) -> Option { + fn do_next(&mut self) -> Result> { use HybridForwardsBlockRootsIterator::*; match self { @@ -129,19 +138,13 @@ impl Iterator for HybridForwardsBlockRootsIterator { continuation_data, } => { match iter.next() { - Some(x) => Some(x), + Some(x) => Ok(Some(x)), // Once the pre-finalization iterator is consumed, transition // to a post-finalization iterator beginning from the last slot // of the pre iterator. 
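The `do_next`/`next` split used here, together with the `Ok(self.values.pop()).transpose()` trick in `SimpleForwardsBlockRootsIterator`, is a standard idiom for fallible iterators: write the logic as `Result<Option<T>, E>` so `?` and early returns work internally, then `transpose` into the `Option<Result<T, E>>` shape that `Iterator::next` requires. A generic sketch, independent of the store types:

```rust
struct Countdown {
    current: u32,
}

impl Countdown {
    /// All the logic lives here, where `?` and `return Ok(None)` are natural.
    fn do_next(&mut self) -> Result<Option<u32>, String> {
        if self.current == 0 {
            return Ok(None); // iteration finished
        }
        self.current -= 1;
        Ok(Some(self.current))
    }
}

impl Iterator for Countdown {
    type Item = Result<u32, String>;

    fn next(&mut self) -> Option<Self::Item> {
        // Result<Option<T>, E> -> Option<Result<T, E>>:
        // Ok(None) => None, Ok(Some(v)) => Some(Ok(v)), Err(e) => Some(Err(e)).
        self.do_next().transpose()
    }
}

fn main() {
    let items: Vec<_> = Countdown { current: 3 }.collect();
    assert_eq!(items, vec![Ok(2), Ok(1), Ok(0)]);
}
```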
None => { let (end_state, end_block_root) = - continuation_data.take().or_else(|| { - error!( - iter.inner.store.log, - "HybridForwardsBlockRootsIterator: logic error" - ); - None - })?; + continuation_data.take().ok_or(Error::NoContinuationData)?; *self = PostFinalization { iter: SimpleForwardsBlockRootsIterator::new( @@ -149,13 +152,23 @@ impl Iterator for HybridForwardsBlockRootsIterator { Slot::from(iter.inner.end_vindex), end_state, end_block_root, - ), + )?, }; - self.next() + self.do_next() } } } - PostFinalization { iter } => iter.next(), + PostFinalization { iter } => iter.next().transpose(), } } } + +impl, Cold: ItemStore> Iterator + for HybridForwardsBlockRootsIterator +{ + type Item = Result<(Hash256, Slot)>; + + fn next(&mut self) -> Option { + self.do_next().transpose() + } +} diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index ec93d9756..8ec6c3453 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -5,10 +5,12 @@ use crate::config::StoreConfig; use crate::forwards_iter::HybridForwardsBlockRootsIterator; use crate::impls::beacon_state::{get_full_state, store_full_state}; use crate::iter::{ParentRootBlockIterator, StateRootsIterator}; +use crate::leveldb_store::LevelDB; +use crate::memory_store::MemoryStore; use crate::metrics; use crate::{ - leveldb_store::LevelDB, DBColumn, Error, ItemStore, KeyValueStore, PartialBeaconState, Store, - StoreItem, StoreOp, + get_key_for_col, DBColumn, Error, ItemStore, KeyValueStoreOp, PartialBeaconState, StoreItem, + StoreOp, }; use lru::LruCache; use parking_lot::{Mutex, RwLock}; @@ -32,7 +34,7 @@ pub const SPLIT_DB_KEY: &str = "FREEZERDBSPLITFREEZERDBSPLITFREE"; /// /// Stores vector fields like the `block_roots` and `state_roots` separately, and only stores /// intermittent "restore point" states pre-finalization. -pub struct HotColdDB { +pub struct HotColdDB, Cold: ItemStore> { /// The slot and state root at the point where the database is split between hot and cold. /// /// States with slots less than `split.slot` are in the cold DB, while states with slots @@ -40,11 +42,11 @@ pub struct HotColdDB { split: RwLock, config: StoreConfig, /// Cold database containing compact historical data. - pub(crate) cold_db: LevelDB, + pub(crate) cold_db: Cold, /// Hot database containing duplicated but quick-to-access recent data. /// /// The hot database also contains all blocks. - pub(crate) hot_db: LevelDB, + pub(crate) hot_db: Hot, /// LRU cache of deserialized blocks. Updated whenever a block is loaded. block_cache: Mutex>>, /// Chain spec. @@ -84,11 +86,13 @@ pub enum HotColdDBError { RestorePointBlockHashError(BeaconStateError), } -impl Store for HotColdDB { - type ForwardsBlockRootsIterator = HybridForwardsBlockRootsIterator; - +impl, Cold: ItemStore> HotColdDB { /// Store a block and update the LRU cache. - fn put_block(&self, block_root: &Hash256, block: SignedBeaconBlock) -> Result<(), Error> { + pub fn put_block( + &self, + block_root: &Hash256, + block: SignedBeaconBlock, + ) -> Result<(), Error> { // Store on disk. self.hot_db.put(block_root, &block)?; @@ -99,7 +103,7 @@ impl Store for HotColdDB { } /// Fetch a block from the store. - fn get_block(&self, block_root: &Hash256) -> Result>, Error> { + pub fn get_block(&self, block_root: &Hash256) -> Result>, Error> { metrics::inc_counter(&metrics::BEACON_BLOCK_GET_COUNT); // Check the cache. 
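On the `block_cache` handling in these hunks: `put_block` writes through to disk before updating the cache, and `get_block` checks the cache before touching the hot database. A stripped-down sketch of the same write-through pattern, using the `lru` and `parking_lot` crates already in the dependency tree; this `Db` type is invented for illustration and is not the real `HotColdDB`.

```rust
use lru::LruCache;
use parking_lot::Mutex;
use std::collections::HashMap;

struct Db {
    disk: Mutex<HashMap<u64, String>>,   // stand-in for the hot database
    cache: Mutex<LruCache<u64, String>>, // LRU of recently touched blocks
}

impl Db {
    fn put(&self, key: u64, value: String) {
        // Store on disk first so the cache never holds unpersisted data.
        self.disk.lock().insert(key, value.clone());
        self.cache.lock().put(key, value);
    }

    fn get(&self, key: u64) -> Option<String> {
        // Check the cache.
        if let Some(value) = self.cache.lock().get(&key) {
            return Some(value.clone());
        }
        // Fall back to disk and repopulate the cache on a hit.
        let value = self.disk.lock().get(&key).cloned()?;
        self.cache.lock().put(key, value.clone());
        Some(value)
    }
}

fn main() {
    let db = Db {
        disk: Mutex::new(HashMap::new()),
        cache: Mutex::new(LruCache::new(16)),
    };
    db.put(1, "block".to_string());
    assert_eq!(db.get(1), Some("block".to_string()));
}
```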
@@ -120,12 +124,12 @@ impl Store for HotColdDB { } /// Delete a block from the store and the block cache. - fn delete_block(&self, block_root: &Hash256) -> Result<(), Error> { + pub fn delete_block(&self, block_root: &Hash256) -> Result<(), Error> { self.block_cache.lock().pop(block_root); self.hot_db.delete::>(block_root) } - fn put_state_summary( + pub fn put_state_summary( &self, state_root: &Hash256, summary: HotStateSummary, @@ -134,7 +138,7 @@ impl Store for HotColdDB { } /// Store a state in the store. - fn put_state(&self, state_root: &Hash256, state: &BeaconState) -> Result<(), Error> { + pub fn put_state(&self, state_root: &Hash256, state: &BeaconState) -> Result<(), Error> { if state.slot < self.get_split_slot() { self.store_cold_state(state_root, &state) } else { @@ -143,7 +147,7 @@ impl Store for HotColdDB { } /// Fetch a state from the store. - fn get_state( + pub fn get_state( &self, state_root: &Hash256, slot: Option, @@ -170,7 +174,7 @@ impl Store for HotColdDB { /// than the split point. You shouldn't delete states from the finalized portion of the chain /// (which are frozen, and won't be deleted), or valid descendents of the finalized checkpoint /// (which will be deleted by this function but shouldn't be). - fn delete_state(&self, state_root: &Hash256, slot: Slot) -> Result<(), Error> { + pub fn delete_state(&self, state_root: &Hash256, slot: Slot) -> Result<(), Error> { // Delete the state summary. self.hot_db .key_delete(DBColumn::BeaconStateSummary.into(), state_root.as_bytes())?; @@ -184,20 +188,20 @@ impl Store for HotColdDB { Ok(()) } - fn forwards_block_roots_iterator( + pub fn forwards_block_roots_iterator( store: Arc, start_slot: Slot, end_state: BeaconState, end_block_root: Hash256, spec: &ChainSpec, - ) -> Self::ForwardsBlockRootsIterator { + ) -> Result>, Error> { HybridForwardsBlockRootsIterator::new(store, start_slot, end_state, end_block_root, spec) } /// Load an epoch boundary state by using the hot state summary look-up. /// /// Will fall back to the cold DB if a hot state summary is not found. 
- fn load_epoch_boundary_state( + pub fn load_epoch_boundary_state( &self, state_root: &Hash256, ) -> Result>, Error> { @@ -226,21 +230,49 @@ impl Store for HotColdDB { } } - fn put_item(&self, key: &Hash256, item: &I) -> Result<(), Error> { + pub fn put_item(&self, key: &Hash256, item: &I) -> Result<(), Error> { self.hot_db.put(key, item) } - fn get_item(&self, key: &Hash256) -> Result, Error> { + pub fn get_item(&self, key: &Hash256) -> Result, Error> { self.hot_db.get(key) } - fn item_exists(&self, key: &Hash256) -> Result { + pub fn item_exists(&self, key: &Hash256) -> Result { self.hot_db.exists::(key) } - fn do_atomically(&self, batch: &[StoreOp]) -> Result<(), Error> { + pub fn do_atomically(&self, batch: &[StoreOp]) -> Result<(), Error> { let mut guard = self.block_cache.lock(); - self.hot_db.do_atomically(batch)?; + + let mut key_value_batch: Vec = Vec::with_capacity(batch.len()); + for op in batch { + match op { + StoreOp::DeleteBlock(block_hash) => { + let untyped_hash: Hash256 = (*block_hash).into(); + let key = + get_key_for_col(DBColumn::BeaconBlock.into(), untyped_hash.as_bytes()); + key_value_batch.push(KeyValueStoreOp::DeleteKey(key)); + } + + StoreOp::DeleteState(state_hash, slot) => { + let untyped_hash: Hash256 = (*state_hash).into(); + let state_summary_key = get_key_for_col( + DBColumn::BeaconStateSummary.into(), + untyped_hash.as_bytes(), + ); + key_value_batch.push(KeyValueStoreOp::DeleteKey(state_summary_key)); + + if *slot % E::slots_per_epoch() == 0 { + let state_key = + get_key_for_col(DBColumn::BeaconState.into(), untyped_hash.as_bytes()); + key_value_batch.push(KeyValueStoreOp::DeleteKey(state_key)); + } + } + } + } + self.hot_db.do_atomically(&key_value_batch)?; + for op in batch { match op { StoreOp::DeleteBlock(block_hash) => { @@ -254,7 +286,30 @@ impl Store for HotColdDB { } } -impl HotColdDB { +impl HotColdDB, MemoryStore> { + pub fn open_ephemeral( + config: StoreConfig, + spec: ChainSpec, + log: Logger, + ) -> Result, MemoryStore>, Error> { + Self::verify_slots_per_restore_point(config.slots_per_restore_point)?; + + let db = HotColdDB { + split: RwLock::new(Split::default()), + cold_db: MemoryStore::open(), + hot_db: MemoryStore::open(), + block_cache: Mutex::new(LruCache::new(config.block_cache_size)), + config, + spec, + log, + _phantom: PhantomData, + }; + + Ok(db) + } +} + +impl HotColdDB, LevelDB> { /// Open a new or existing database, with the given paths to the hot and cold DBs. /// /// The `slots_per_restore_point` parameter must be a divisor of `SLOTS_PER_HISTORICAL_ROOT`. @@ -264,7 +319,7 @@ impl HotColdDB { config: StoreConfig, spec: ChainSpec, log: Logger, - ) -> Result { + ) -> Result, LevelDB>, Error> { Self::verify_slots_per_restore_point(config.slots_per_restore_point)?; let db = HotColdDB { @@ -285,7 +340,9 @@ impl HotColdDB { } Ok(db) } +} +impl, Cold: ItemStore> HotColdDB { /// Store a post-finalization state efficiently in the hot database. /// /// On an epoch boundary, store a full state. On an intermediate slot, store @@ -675,8 +732,8 @@ impl HotColdDB { } /// Advance the split point of the store, moving new finalized states to the freezer. 
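A quick recap of the hot/cold routing that the freezing process below maintains: states at slots before the split point are served from the compact freezer database, everything newer from the hot database. A toy sketch with invented types (the real `HotColdDB` additionally tracks state summaries, restore points, and a block cache):

```rust
use std::collections::HashMap;

/// Toy stand-in for the hot/cold split; not the real HotColdDB.
struct SplitDb {
    split_slot: u64,
    hot: HashMap<u64, String>,  // recent, quick-to-access states
    cold: HashMap<u64, String>, // compact, frozen history
}

impl SplitDb {
    fn put_state(&mut self, slot: u64, state: String) {
        if slot < self.split_slot {
            self.cold.insert(slot, state);
        } else {
            self.hot.insert(slot, state);
        }
    }

    fn get_state(&self, slot: u64) -> Option<&String> {
        if slot < self.split_slot {
            self.cold.get(&slot)
        } else {
            self.hot.get(&slot)
        }
    }
}

fn main() {
    let mut db = SplitDb { split_slot: 64, hot: HashMap::new(), cold: HashMap::new() };
    db.put_state(10, "frozen".into());
    db.put_state(100, "recent".into());
    assert_eq!(db.get_state(10), Some(&"frozen".to_string()));
    assert_eq!(db.get_state(100), Some(&"recent".to_string()));
}
```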
-pub fn process_finalization( - store: Arc>, +pub fn process_finalization, Cold: ItemStore>( + store: Arc>, frozen_head_root: Hash256, frozen_head: &BeaconState, ) -> Result<(), Error> { @@ -708,7 +765,11 @@ pub fn process_finalization( let state_root_iter = StateRootsIterator::new(store.clone(), frozen_head); let mut to_delete = vec![]; - for (state_root, slot) in state_root_iter.take_while(|&(_, slot)| slot >= current_split_slot) { + for maybe_pair in state_root_iter.take_while(|result| match result { + Ok((_, slot)) => slot >= &current_split_slot, + Err(_) => true, + }) { + let (state_root, slot) = maybe_pair?; if slot % store.config.slots_per_restore_point == 0 { let state: BeaconState = get_full_state(&store.hot_db, &state_root)? .ok_or_else(|| HotColdDBError::MissingStateToFreeze(state_root))?; diff --git a/beacon_node/store/src/iter.rs b/beacon_node/store/src/iter.rs index 58ec20094..5a97a3917 100644 --- a/beacon_node/store/src/iter.rs +++ b/beacon_node/store/src/iter.rs @@ -1,4 +1,4 @@ -use crate::{Error, Store}; +use crate::{Error, HotColdDB, ItemStore}; use std::borrow::Cow; use std::marker::PhantomData; use std::sync::Arc; @@ -12,17 +12,20 @@ use types::{ /// /// It is assumed that all ancestors for this object are stored in the database. If this is not the /// case, the iterator will start returning `None` prior to genesis. -pub trait AncestorIter, E: EthSpec, I: Iterator> { +pub trait AncestorIter, Cold: ItemStore, I: Iterator> { /// Returns an iterator over the roots of the ancestors of `self`. - fn try_iter_ancestor_roots(&self, store: Arc) -> Option; + fn try_iter_ancestor_roots(&self, store: Arc>) -> Option; } -impl<'a, U: Store, E: EthSpec> AncestorIter> - for SignedBeaconBlock +impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> + AncestorIter> for SignedBeaconBlock { /// Iterates across all available prior block roots of `self`, starting at the most recent and ending /// at genesis. - fn try_iter_ancestor_roots(&self, store: Arc) -> Option> { + fn try_iter_ancestor_roots( + &self, + store: Arc>, + ) -> Option> { let state = store .get_state(&self.message.state_root, Some(self.message.slot)) .ok()??; @@ -31,22 +34,27 @@ impl<'a, U: Store, E: EthSpec> AncestorIter, E: EthSpec> AncestorIter> - for BeaconState +impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> + AncestorIter> for BeaconState { /// Iterates across all available prior state roots of `self`, starting at the most recent and ending /// at genesis. - fn try_iter_ancestor_roots(&self, store: Arc) -> Option> { + fn try_iter_ancestor_roots( + &self, + store: Arc>, + ) -> Option> { // The `self.clone()` here is wasteful.
Some(StateRootsIterator::owned(store, self.clone())) } } -pub struct StateRootsIterator<'a, T: EthSpec, U: Store> { - inner: RootsIterator<'a, T, U>, +pub struct StateRootsIterator<'a, T: EthSpec, Hot: ItemStore, Cold: ItemStore> { + inner: RootsIterator<'a, T, Hot, Cold>, } -impl<'a, T: EthSpec, U: Store> Clone for StateRootsIterator<'a, T, U> { +impl<'a, T: EthSpec, Hot: ItemStore, Cold: ItemStore> Clone + for StateRootsIterator<'a, T, Hot, Cold> +{ fn clone(&self) -> Self { Self { inner: self.inner.clone(), @@ -54,27 +62,29 @@ impl<'a, T: EthSpec, U: Store> Clone for StateRootsIterator<'a, T, U> { } } -impl<'a, T: EthSpec, U: Store> StateRootsIterator<'a, T, U> { - pub fn new(store: Arc, beacon_state: &'a BeaconState) -> Self { +impl<'a, T: EthSpec, Hot: ItemStore, Cold: ItemStore> StateRootsIterator<'a, T, Hot, Cold> { + pub fn new(store: Arc>, beacon_state: &'a BeaconState) -> Self { Self { inner: RootsIterator::new(store, beacon_state), } } - pub fn owned(store: Arc, beacon_state: BeaconState) -> Self { + pub fn owned(store: Arc>, beacon_state: BeaconState) -> Self { Self { inner: RootsIterator::owned(store, beacon_state), } } } -impl<'a, T: EthSpec, U: Store> Iterator for StateRootsIterator<'a, T, U> { - type Item = (Hash256, Slot); +impl<'a, T: EthSpec, Hot: ItemStore, Cold: ItemStore> Iterator + for StateRootsIterator<'a, T, Hot, Cold> +{ + type Item = Result<(Hash256, Slot), Error>; fn next(&mut self) -> Option { self.inner .next() - .map(|(_, state_root, slot)| (state_root, slot)) + .map(|result| result.map(|(_, state_root, slot)| (state_root, slot))) } } @@ -86,11 +96,13 @@ impl<'a, T: EthSpec, U: Store> Iterator for StateRootsIterator<'a, T, U> { /// exhausted. /// /// Returns `None` for roots prior to genesis or when there is an error reading from `Store`. -pub struct BlockRootsIterator<'a, T: EthSpec, U: Store> { - inner: RootsIterator<'a, T, U>, +pub struct BlockRootsIterator<'a, T: EthSpec, Hot: ItemStore, Cold: ItemStore> { + inner: RootsIterator<'a, T, Hot, Cold>, } -impl<'a, T: EthSpec, U: Store> Clone for BlockRootsIterator<'a, T, U> { +impl<'a, T: EthSpec, Hot: ItemStore, Cold: ItemStore> Clone + for BlockRootsIterator<'a, T, Hot, Cold> +{ fn clone(&self) -> Self { Self { inner: self.inner.clone(), @@ -98,40 +110,44 @@ impl<'a, T: EthSpec, U: Store> Clone for BlockRootsIterator<'a, T, U> { } } -impl<'a, T: EthSpec, U: Store> BlockRootsIterator<'a, T, U> { +impl<'a, T: EthSpec, Hot: ItemStore, Cold: ItemStore> BlockRootsIterator<'a, T, Hot, Cold> { /// Create a new iterator over all block roots in the given `beacon_state` and prior states. - pub fn new(store: Arc, beacon_state: &'a BeaconState) -> Self { + pub fn new(store: Arc>, beacon_state: &'a BeaconState) -> Self { Self { inner: RootsIterator::new(store, beacon_state), } } /// Create a new iterator over all block roots in the given `beacon_state` and prior states. 
- pub fn owned(store: Arc, beacon_state: BeaconState) -> Self { + pub fn owned(store: Arc>, beacon_state: BeaconState) -> Self { Self { inner: RootsIterator::owned(store, beacon_state), } } } -impl<'a, T: EthSpec, U: Store> Iterator for BlockRootsIterator<'a, T, U> { - type Item = (Hash256, Slot); +impl<'a, T: EthSpec, Hot: ItemStore, Cold: ItemStore> Iterator + for BlockRootsIterator<'a, T, Hot, Cold> +{ + type Item = Result<(Hash256, Slot), Error>; fn next(&mut self) -> Option { self.inner .next() - .map(|(block_root, _, slot)| (block_root, slot)) + .map(|result| result.map(|(block_root, _, slot)| (block_root, slot))) } } /// Iterator over state and block roots that backtracks using the vectors from a `BeaconState`. -pub struct RootsIterator<'a, T: EthSpec, U: Store> { - store: Arc, +pub struct RootsIterator<'a, T: EthSpec, Hot: ItemStore, Cold: ItemStore> { + store: Arc>, beacon_state: Cow<'a, BeaconState>, slot: Slot, } -impl<'a, T: EthSpec, U: Store> Clone for RootsIterator<'a, T, U> { +impl<'a, T: EthSpec, Hot: ItemStore, Cold: ItemStore> Clone + for RootsIterator<'a, T, Hot, Cold> +{ fn clone(&self) -> Self { Self { store: self.store.clone(), @@ -141,8 +157,8 @@ impl<'a, T: EthSpec, U: Store> Clone for RootsIterator<'a, T, U> { } } -impl<'a, T: EthSpec, U: Store> RootsIterator<'a, T, U> { - pub fn new(store: Arc, beacon_state: &'a BeaconState) -> Self { +impl<'a, T: EthSpec, Hot: ItemStore, Cold: ItemStore> RootsIterator<'a, T, Hot, Cold> { + pub fn new(store: Arc>, beacon_state: &'a BeaconState) -> Self { Self { store, slot: beacon_state.slot, @@ -150,7 +166,7 @@ impl<'a, T: EthSpec, U: Store> RootsIterator<'a, T, U> { } } - pub fn owned(store: Arc, beacon_state: BeaconState) -> Self { + pub fn owned(store: Arc>, beacon_state: BeaconState) -> Self { Self { store, slot: beacon_state.slot, @@ -158,7 +174,10 @@ impl<'a, T: EthSpec, U: Store> RootsIterator<'a, T, U> { } } - pub fn from_block(store: Arc, block_hash: Hash256) -> Result { + pub fn from_block( + store: Arc>, + block_hash: Hash256, + ) -> Result { let block = store .get_block(&block_hash)? .ok_or_else(|| BeaconStateError::MissingBeaconBlock(block_hash.into()))?; @@ -167,15 +186,10 @@ impl<'a, T: EthSpec, U: Store> RootsIterator<'a, T, U> { .ok_or_else(|| BeaconStateError::MissingBeaconState(block.state_root().into()))?; Ok(Self::owned(store, state)) } -} -impl<'a, T: EthSpec, U: Store> Iterator for RootsIterator<'a, T, U> { - /// (block_root, state_root, slot) - type Item = (Hash256, Hash256, Slot); - - fn next(&mut self) -> Option { + fn do_next(&mut self) -> Result, Error> { if self.slot == 0 || self.slot > self.beacon_state.slot { - return None; + return Ok(None); } self.slot -= 1; @@ -184,7 +198,7 @@ impl<'a, T: EthSpec, U: Store> Iterator for RootsIterator<'a, T, U> { self.beacon_state.get_block_root(self.slot), self.beacon_state.get_state_root(self.slot), ) { - (Ok(block_root), Ok(state_root)) => Some((*block_root, *state_root, self.slot)), + (Ok(block_root), Ok(state_root)) => Ok(Some((*block_root, *state_root, self.slot))), (Err(BeaconStateError::SlotOutOfBounds), Err(BeaconStateError::SlotOutOfBounds)) => { // Read a `BeaconState` from the store that has access to prior historical roots. 
let beacon_state = @@ -192,25 +206,39 @@ impl<'a, T: EthSpec, U: Store> Iterator for RootsIterator<'a, T, U> { self.beacon_state = Cow::Owned(beacon_state); - let block_root = *self.beacon_state.get_block_root(self.slot).ok()?; - let state_root = *self.beacon_state.get_state_root(self.slot).ok()?; + let block_root = *self.beacon_state.get_block_root(self.slot)?; + let state_root = *self.beacon_state.get_state_root(self.slot)?; - Some((block_root, state_root, self.slot)) + Ok(Some((block_root, state_root, self.slot))) } - _ => None, + (Err(e), _) => Err(e.into()), + (Ok(_), Err(e)) => Err(e.into()), } } } +impl<'a, T: EthSpec, Hot: ItemStore, Cold: ItemStore> Iterator + for RootsIterator<'a, T, Hot, Cold> +{ + /// (block_root, state_root, slot) + type Item = Result<(Hash256, Hash256, Slot), Error>; + + fn next(&mut self) -> Option { + self.do_next().transpose() + } +} + /// Block iterator that uses the `parent_root` of each block to backtrack. -pub struct ParentRootBlockIterator<'a, E: EthSpec, S: Store> { - store: &'a S, +pub struct ParentRootBlockIterator<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> { + store: &'a HotColdDB, next_block_root: Hash256, _phantom: PhantomData, } -impl<'a, E: EthSpec, S: Store> ParentRootBlockIterator<'a, E, S> { - pub fn new(store: &'a S, start_block_root: Hash256) -> Self { +impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> + ParentRootBlockIterator<'a, E, Hot, Cold> +{ + pub fn new(store: &'a HotColdDB, start_block_root: Hash256) -> Self { Self { store, next_block_root: start_block_root, @@ -235,7 +263,9 @@ impl<'a, E: EthSpec, S: Store> ParentRootBlockIterator<'a, E, S> { } } -impl<'a, E: EthSpec, S: Store> Iterator for ParentRootBlockIterator<'a, E, S> { +impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> Iterator + for ParentRootBlockIterator<'a, E, Hot, Cold> +{ type Item = Result<(Hash256, SignedBeaconBlock), Error>; fn next(&mut self) -> Option { @@ -245,47 +275,59 @@ impl<'a, E: EthSpec, S: Store> Iterator for ParentRootBlockIterator<'a, E, S> #[derive(Clone)] /// Extends `BlockRootsIterator`, returning `SignedBeaconBlock` instances, instead of their roots. -pub struct BlockIterator<'a, T: EthSpec, U: Store> { - roots: BlockRootsIterator<'a, T, U>, +pub struct BlockIterator<'a, T: EthSpec, Hot: ItemStore, Cold: ItemStore> { + roots: BlockRootsIterator<'a, T, Hot, Cold>, } -impl<'a, T: EthSpec, U: Store> BlockIterator<'a, T, U> { +impl<'a, T: EthSpec, Hot: ItemStore, Cold: ItemStore> BlockIterator<'a, T, Hot, Cold> { /// Create a new iterator over all blocks in the given `beacon_state` and prior states. - pub fn new(store: Arc, beacon_state: &'a BeaconState) -> Self { + pub fn new(store: Arc>, beacon_state: &'a BeaconState) -> Self { Self { roots: BlockRootsIterator::new(store, beacon_state), } } /// Create a new iterator over all blocks in the given `beacon_state` and prior states. 
- pub fn owned(store: Arc, beacon_state: BeaconState) -> Self { + pub fn owned(store: Arc>, beacon_state: BeaconState) -> Self { Self { roots: BlockRootsIterator::owned(store, beacon_state), } } + + fn do_next(&mut self) -> Result>, Error> { + if let Some(result) = self.roots.next() { + let (root, _slot) = result?; + self.roots.inner.store.get_block(&root) + } else { + Ok(None) + } + } } -impl<'a, T: EthSpec, U: Store> Iterator for BlockIterator<'a, T, U> { - type Item = SignedBeaconBlock; +impl<'a, T: EthSpec, Hot: ItemStore, Cold: ItemStore> Iterator + for BlockIterator<'a, T, Hot, Cold> +{ + type Item = Result, Error>; fn next(&mut self) -> Option { - let (root, _slot) = self.roots.next()?; - self.roots.inner.store.get_block(&root).ok()? + self.do_next().transpose() } } /// Fetch the next state to use whilst backtracking in `*RootsIterator`. -fn next_historical_root_backtrack_state>( - store: &S, +fn next_historical_root_backtrack_state, Cold: ItemStore>( + store: &HotColdDB, current_state: &BeaconState, -) -> Option> { +) -> Result, Error> { // For compatibility with the freezer database's restore points, we load a state at // a restore point slot (thus avoiding replaying blocks). In the case where we're // not frozen, this just means we might not jump back by the maximum amount on // our first jump (i.e. at most 1 extra state load). let new_state_slot = slot_of_prev_restore_point::(current_state.slot); - let new_state_root = current_state.get_state_root(new_state_slot).ok()?; - store.get_state(new_state_root, Some(new_state_slot)).ok()? + let new_state_root = current_state.get_state_root(new_state_slot)?; + Ok(store + .get_state(new_state_root, Some(new_state_slot))? + .ok_or_else(|| BeaconStateError::MissingBeaconState((*new_state_root).into()))?) } /// Compute the slot of the last guaranteed restore point in the freezer database. 
@@ -297,8 +339,10 @@ fn slot_of_prev_restore_point(current_slot: Slot) -> Slot { #[cfg(test)] mod test { use super::*; - use crate::MemoryStore; - use types::{test_utils::TestingBeaconStateBuilder, Keypair, MainnetEthSpec}; + use crate::config::StoreConfig; + use crate::HotColdDB; + use sloggers::{null::NullLoggerBuilder, Build}; + use types::{test_utils::TestingBeaconStateBuilder, ChainSpec, Keypair, MainnetEthSpec}; fn get_state() -> BeaconState { let builder = TestingBeaconStateBuilder::from_single_keypair( @@ -312,7 +356,10 @@ mod test { #[test] fn block_root_iter() { - let store = Arc::new(MemoryStore::open()); + let log = NullLoggerBuilder.build().unwrap(); + let store = Arc::new( + HotColdDB::open_ephemeral(StoreConfig::default(), ChainSpec::minimal(), log).unwrap(), + ); let slots_per_historical_root = MainnetEthSpec::slots_per_historical_root(); let mut state_a: BeaconState = get_state(); @@ -337,11 +384,12 @@ mod test { let iter = BlockRootsIterator::new(store, &state_b); assert!( - iter.clone().any(|(_root, slot)| slot == 0), + iter.clone() + .any(|result| result.map(|(_root, slot)| slot == 0).unwrap()), "iter should contain zero slot" ); - let mut collected: Vec<(Hash256, Slot)> = iter.collect(); + let mut collected: Vec<(Hash256, Slot)> = iter.collect::, _>>().unwrap(); collected.reverse(); let expected_len = 2 * MainnetEthSpec::slots_per_historical_root(); @@ -355,7 +403,10 @@ mod test { #[test] fn state_root_iter() { - let store = Arc::new(MemoryStore::open()); + let log = NullLoggerBuilder.build().unwrap(); + let store = Arc::new( + HotColdDB::open_ephemeral(StoreConfig::default(), ChainSpec::minimal(), log).unwrap(), + ); let slots_per_historical_root = MainnetEthSpec::slots_per_historical_root(); let mut state_a: BeaconState = get_state(); @@ -386,11 +437,12 @@ mod test { let iter = StateRootsIterator::new(store, &state_b); assert!( - iter.clone().any(|(_root, slot)| slot == 0), + iter.clone() + .any(|result| result.map(|(_root, slot)| slot == 0).unwrap()), "iter should contain zero slot" ); - let mut collected: Vec<(Hash256, Slot)> = iter.collect(); + let mut collected: Vec<(Hash256, Slot)> = iter.collect::, _>>().unwrap(); collected.reverse(); let expected_len = MainnetEthSpec::slots_per_historical_root() * 2; diff --git a/beacon_node/store/src/leveldb_store.rs b/beacon_node/store/src/leveldb_store.rs index 3a7d55889..6e3dc8bbe 100644 --- a/beacon_node/store/src/leveldb_store.rs +++ b/beacon_node/store/src/leveldb_store.rs @@ -48,7 +48,7 @@ impl KeyValueStore for LevelDB { let timer = metrics::start_timer(&metrics::DISK_DB_READ_TIMES); self.db - .get(self.read_options(), column_key) + .get(self.read_options(), BytesKey::from_vec(column_key)) .map_err(Into::into) .map(|opt| { opt.map(|bytes| { @@ -68,7 +68,7 @@ impl KeyValueStore for LevelDB { let timer = metrics::start_timer(&metrics::DISK_DB_WRITE_TIMES); self.db - .put(self.write_options(), column_key, val) + .put(self.write_options(), BytesKey::from_vec(column_key), val) .map_err(Into::into) .map(|()| { metrics::stop_timer(timer); @@ -82,7 +82,7 @@ impl KeyValueStore for LevelDB { metrics::inc_counter(&metrics::DISK_DB_EXISTS_COUNT); self.db - .get(self.read_options(), column_key) + .get(self.read_options(), BytesKey::from_vec(column_key)) .map_err(Into::into) .and_then(|val| Ok(val.is_some())) } @@ -94,34 +94,16 @@ impl KeyValueStore for LevelDB { metrics::inc_counter(&metrics::DISK_DB_DELETE_COUNT); self.db - .delete(self.write_options(), column_key) + .delete(self.write_options(), BytesKey::from_vec(column_key)) 
.map_err(Into::into) } - fn do_atomically(&self, ops_batch: &[StoreOp]) -> Result<(), Error> { + fn do_atomically(&self, ops_batch: &[KeyValueStoreOp]) -> Result<(), Error> { let mut leveldb_batch = Writebatch::new(); - for op in ops_batch { + for op in ops_batch.into_iter() { match op { - StoreOp::DeleteBlock(block_hash) => { - let untyped_hash: Hash256 = (*block_hash).into(); - let key = - get_key_for_col(DBColumn::BeaconBlock.into(), untyped_hash.as_bytes()); - leveldb_batch.delete(key); - } - - StoreOp::DeleteState(state_hash, slot) => { - let untyped_hash: Hash256 = (*state_hash).into(); - let state_summary_key = get_key_for_col( - DBColumn::BeaconStateSummary.into(), - untyped_hash.as_bytes(), - ); - leveldb_batch.delete(state_summary_key); - - if *slot % E::slots_per_epoch() == 0 { - let state_key = - get_key_for_col(DBColumn::BeaconState.into(), untyped_hash.as_bytes()); - leveldb_batch.delete(state_key); - } + KeyValueStoreOp::DeleteKey(key) => { + leveldb_batch.delete(BytesKey::from_vec(key.to_vec())); } } } @@ -147,10 +129,10 @@ impl Key for BytesKey { } } -fn get_key_for_col(col: &str, key: &[u8]) -> BytesKey { - let mut col = col.as_bytes().to_vec(); - col.append(&mut key.to_vec()); - BytesKey { key: col } +impl BytesKey { + fn from_vec(key: Vec) -> Self { + Self { key } + } } impl From for Error { diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 55489679e..1d0506ce3 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -13,7 +13,7 @@ extern crate lazy_static; pub mod chunked_iter; pub mod chunked_vector; pub mod config; -mod errors; +pub mod errors; mod forwards_iter; pub mod hot_cold_store; mod impls; @@ -25,8 +25,6 @@ mod state_batch; pub mod iter; -use std::sync::Arc; - pub use self::config::StoreConfig; pub use self::hot_cold_store::{HotColdDB, HotStateSummary}; pub use self::leveldb_store::LevelDB; @@ -52,7 +50,17 @@ pub trait KeyValueStore: Sync + Send + Sized + 'static { fn key_delete(&self, column: &str, key: &[u8]) -> Result<(), Error>; /// Execute either all of the operations in `batch` or none at all, returning an error. - fn do_atomically(&self, batch: &[StoreOp]) -> Result<(), Error>; + fn do_atomically(&self, batch: &[KeyValueStoreOp]) -> Result<(), Error>; +} + +pub fn get_key_for_col(column: &str, key: &[u8]) -> Vec { + let mut result = column.as_bytes().to_vec(); + result.extend_from_slice(key); + result +} + +pub enum KeyValueStoreOp { + DeleteKey(Vec), } pub trait ItemStore: KeyValueStore + Sync + Send + Sized + 'static { @@ -93,75 +101,6 @@ pub trait ItemStore: KeyValueStore + Sync + Send + Sized + 'stati } } -/// An object capable of storing and retrieving objects implementing `StoreItem`. -/// -/// A `Store` is fundamentally backed by a key-value database, however it provides support for -/// columns. A simple column implementation might involve prefixing a key with some bytes unique to -/// each column. -pub trait Store: Sync + Send + Sized + 'static { - type ForwardsBlockRootsIterator: Iterator; - - /// Store a block in the store. - fn put_block(&self, block_root: &Hash256, block: SignedBeaconBlock) -> Result<(), Error>; - - /// Fetch a block from the store. - fn get_block(&self, block_root: &Hash256) -> Result>, Error>; - - /// Delete a block from the store. - fn delete_block(&self, block_root: &Hash256) -> Result<(), Error>; - - /// Store a state in the store. - fn put_state(&self, state_root: &Hash256, state: &BeaconState) -> Result<(), Error>; - - /// Store a state summary in the store. 
- fn put_state_summary( - &self, - state_root: &Hash256, - summary: HotStateSummary, - ) -> Result<(), Error>; - - /// Fetch a state from the store. - fn get_state( - &self, - state_root: &Hash256, - slot: Option, - ) -> Result>, Error>; - - /// Delete a state from the store. - fn delete_state(&self, state_root: &Hash256, _slot: Slot) -> Result<(), Error>; - - /// Get a forwards (slot-ascending) iterator over the beacon block roots since `start_slot`. - /// - /// Will be efficient for frozen portions of the database if using `HotColdDB`. - /// - /// The `end_state` and `end_block_root` are required for backtracking in the post-finalization - /// part of the chain, and should be usually be set to the current head. Importantly, the - /// `end_state` must be a state that has had a block applied to it, and the hash of that - /// block must be `end_block_root`. - // NOTE: could maybe optimise by getting the `BeaconState` and end block root from a closure, as - // it's not always required. - fn forwards_block_roots_iterator( - store: Arc, - start_slot: Slot, - end_state: BeaconState, - end_block_root: Hash256, - spec: &ChainSpec, - ) -> Self::ForwardsBlockRootsIterator; - - fn load_epoch_boundary_state( - &self, - state_root: &Hash256, - ) -> Result>, Error>; - - fn put_item(&self, key: &Hash256, item: &I) -> Result<(), Error>; - - fn get_item(&self, key: &Hash256) -> Result, Error>; - - fn item_exists(&self, key: &Hash256) -> Result; - - fn do_atomically(&self, batch: &[StoreOp]) -> Result<(), Error>; -} - /// Reified key-value storage operation. Helps in modifying the storage atomically. /// See also https://github.com/sigp/lighthouse/issues/692 pub enum StoreOp { diff --git a/beacon_node/store/src/memory_store.rs b/beacon_node/store/src/memory_store.rs index 45918e74e..c8556860c 100644 --- a/beacon_node/store/src/memory_store.rs +++ b/beacon_node/store/src/memory_store.rs @@ -1,12 +1,7 @@ -use super::{DBColumn, Error, ItemStore, KeyValueStore, Store, StoreOp}; -use crate::forwards_iter::SimpleForwardsBlockRootsIterator; -use crate::hot_cold_store::HotStateSummary; -use crate::impls::beacon_state::{get_full_state, store_full_state}; -use crate::StoreItem; +use super::{Error, ItemStore, KeyValueStore, KeyValueStoreOp}; use parking_lot::RwLock; use std::collections::HashMap; use std::marker::PhantomData; -use std::sync::Arc; use types::*; type DBHashMap = HashMap, Vec>; @@ -46,53 +41,34 @@ impl KeyValueStore for MemoryStore { /// Get the value of some key from the database. Returns `None` if the key does not exist. fn get_bytes(&self, col: &str, key: &[u8]) -> Result>, Error> { let column_key = Self::get_key_for_col(col, key); - Ok(self.db.read().get(&column_key).cloned()) } /// Puts a key in the database. fn put_bytes(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error> { let column_key = Self::get_key_for_col(col, key); - self.db.write().insert(column_key, val.to_vec()); - Ok(()) } /// Return true if some key exists in some column. fn key_exists(&self, col: &str, key: &[u8]) -> Result { let column_key = Self::get_key_for_col(col, key); - Ok(self.db.read().contains_key(&column_key)) } /// Delete some key from the database. 
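The new `get_key_for_col` helper and `KeyValueStoreOp` enum shown above reduce an atomic batch to plain, column-prefixed byte keys that both the LevelDB and memory backends can apply uniformly. A small self-contained illustration follows; the helper and enum are re-declared here for the example only, and the column name is invented.

```rust
fn get_key_for_col(column: &str, key: &[u8]) -> Vec<u8> {
    // Namespace the raw key by prefixing it with the column name.
    let mut result = column.as_bytes().to_vec();
    result.extend_from_slice(key);
    result
}

enum KeyValueStoreOp {
    DeleteKey(Vec<u8>),
}

fn main() {
    let block_root = [0x11u8; 32];
    let key = get_key_for_col("beacon_block", &block_root);
    assert!(key.starts_with(b"beacon_block"));
    assert_eq!(key.len(), "beacon_block".len() + 32);

    // A backend's `do_atomically` would apply every op in this batch or none.
    let batch = vec![KeyValueStoreOp::DeleteKey(key)];
    match &batch[0] {
        KeyValueStoreOp::DeleteKey(k) => assert!(!k.is_empty()),
    }
}
```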
fn key_delete(&self, col: &str, key: &[u8]) -> Result<(), Error> { let column_key = Self::get_key_for_col(col, key); - self.db.write().remove(&column_key); - Ok(()) } - fn do_atomically(&self, batch: &[StoreOp]) -> Result<(), Error> { + fn do_atomically(&self, batch: &[KeyValueStoreOp]) -> Result<(), Error> { for op in batch { match op { - StoreOp::DeleteBlock(block_hash) => { - let untyped_hash: Hash256 = (*block_hash).into(); - self.key_delete(DBColumn::BeaconBlock.into(), untyped_hash.as_bytes())?; - } - - StoreOp::DeleteState(state_hash, slot) => { - let untyped_hash: Hash256 = (*state_hash).into(); - if *slot % E::slots_per_epoch() == 0 { - self.key_delete(DBColumn::BeaconState.into(), untyped_hash.as_bytes())?; - } else { - self.key_delete( - DBColumn::BeaconStateSummary.into(), - untyped_hash.as_bytes(), - )?; - } + KeyValueStoreOp::DeleteKey(hash) => { + self.db.write().remove(hash); } } } @@ -101,94 +77,3 @@ impl KeyValueStore for MemoryStore { } impl ItemStore for MemoryStore {} - -impl Store for MemoryStore { - type ForwardsBlockRootsIterator = SimpleForwardsBlockRootsIterator; - - fn put_block(&self, block_root: &Hash256, block: SignedBeaconBlock) -> Result<(), Error> { - self.put(block_root, &block) - } - - fn get_block(&self, block_root: &Hash256) -> Result>, Error> { - self.get(block_root) - } - - fn delete_block(&self, block_root: &Hash256) -> Result<(), Error> { - self.key_delete(DBColumn::BeaconBlock.into(), block_root.as_bytes()) - } - - fn put_state_summary( - &self, - state_root: &Hash256, - summary: HotStateSummary, - ) -> Result<(), Error> { - self.put(state_root, &summary).map_err(Into::into) - } - - /// Store a state in the store. - fn put_state(&self, state_root: &Hash256, state: &BeaconState) -> Result<(), Error> { - store_full_state(self, state_root, &state) - } - - /// Fetch a state from the store. - fn get_state( - &self, - state_root: &Hash256, - _: Option, - ) -> Result>, Error> { - get_full_state(self, state_root) - } - - fn delete_state(&self, state_root: &Hash256, _slot: Slot) -> Result<(), Error> { - self.key_delete(DBColumn::BeaconState.into(), state_root.as_bytes()) - } - - fn forwards_block_roots_iterator( - store: Arc, - start_slot: Slot, - end_state: BeaconState, - end_block_root: Hash256, - _: &ChainSpec, - ) -> Self::ForwardsBlockRootsIterator { - SimpleForwardsBlockRootsIterator::new(store, start_slot, end_state, end_block_root) - } - - /// Load the most recent ancestor state of `state_root` which lies on an epoch boundary. - /// - /// If `state_root` corresponds to an epoch boundary state, then that state itself should be - /// returned. - fn load_epoch_boundary_state( - &self, - state_root: &Hash256, - ) -> Result>, Error> { - // The default implementation is not very efficient, but isn't used in prod. - // See `HotColdDB` for the optimized implementation. - if let Some(state) = self.get_state(state_root, None)? 
{ - let epoch_boundary_slot = state.slot / E::slots_per_epoch() * E::slots_per_epoch(); - if state.slot == epoch_boundary_slot { - Ok(Some(state)) - } else { - let epoch_boundary_state_root = state.get_state_root(epoch_boundary_slot)?; - self.get_state(epoch_boundary_state_root, Some(epoch_boundary_slot)) - } - } else { - Ok(None) - } - } - - fn put_item(&self, key: &Hash256, item: &I) -> Result<(), Error> { - self.put(key, item) - } - - fn get_item(&self, key: &Hash256) -> Result, Error> { - self.get(key) - } - - fn item_exists(&self, key: &Hash256) -> Result { - self.exists::(key) - } - - fn do_atomically(&self, batch: &[StoreOp]) -> Result<(), Error> { - KeyValueStore::do_atomically(self, batch) - } -} diff --git a/beacon_node/store/src/state_batch.rs b/beacon_node/store/src/state_batch.rs index c19173861..ab2d060c1 100644 --- a/beacon_node/store/src/state_batch.rs +++ b/beacon_node/store/src/state_batch.rs @@ -1,4 +1,4 @@ -use crate::{Error, HotStateSummary, Store}; +use crate::{Error, HotColdDB, HotStateSummary, ItemStore}; use types::{BeaconState, EthSpec, Hash256}; /// A collection of states to be stored in the database. @@ -36,7 +36,10 @@ impl StateBatch { /// Write the batch to the database. /// /// May fail to write the full batch if any of the items error (i.e. not atomic!) - pub fn commit>(self, store: &S) -> Result<(), Error> { + pub fn commit, Cold: ItemStore>( + self, + store: &HotColdDB, + ) -> Result<(), Error> { self.items.into_iter().try_for_each(|item| match item { BatchItem::Full(state_root, state) => store.put_state(&state_root, &state), BatchItem::Summary(state_root, summary) => { diff --git a/book/src/SUMMARY.md b/book/src/SUMMARY.md index 721978f91..991589ad7 100644 --- a/book/src/SUMMARY.md +++ b/book/src/SUMMARY.md @@ -6,6 +6,7 @@ * [Building from Source](./become-a-validator-source.md) * [Installation](./installation.md) * [Docker](./docker.md) + * [Raspberry Pi 4](./pi.md) * [Key Management](./key-managment.md) * [Create a wallet](./wallet-create.md) * [Create a validator](./validator-create.md) diff --git a/book/src/installation.md b/book/src/installation.md index 59fc0652c..4100d47f5 100644 --- a/book/src/installation.md +++ b/book/src/installation.md @@ -26,7 +26,16 @@ If this doesn't work or is not clear enough, see the [Detailed Instructions](#de ## Troubleshooting -### Command is not found +### Dependencies (Ubuntu) + +Several dependencies may be required to compile Lighthouse. The following +packages may be required in addition to a base Ubuntu Server installation: + +```bash +sudo apt install -y git gcc g++ make cmake pkg-config libssl-dev +``` + +### Command is not found Lighthouse will be installed to `CARGO_HOME` or `$HOME/.cargo`. This directory needs to be on your `PATH` before you can run `$ lighthouse`. diff --git a/book/src/pi.md b/book/src/pi.md new file mode 100644 index 000000000..c080fd0da --- /dev/null +++ b/book/src/pi.md @@ -0,0 +1,53 @@ +# Raspberry Pi 4 Installation + +Tested on: + + - Raspberry Pi 4 Model B (4GB) + - `Ubuntu 20.04 LTS (GNU/Linux 5.4.0-1011-raspi aarch64)` + +### 1. Install Ubuntu + +Follow the [Ubuntu Raspberry Pi installation instructions](https://ubuntu.com/download/raspberry-pi). + +**A 64-bit version is required** and the latest version is recommended (Ubuntu +20.04 LTS was the latest at the time of writing). + +A graphical environment is not required in order to use Lighthouse. Only the +terminal and an Internet connection are necessary. + +### 2.
+### 2. Install Packages
+
+Install the [Ubuntu Dependencies](installation.md#dependencies-ubuntu).
+(I.e., run the `sudo apt install ...` command at that link).
+
+> Tips:
+>
+> - If there are difficulties, try updating the package manager with `sudo apt
+>   update`.
+
+### 3. Install Rust
+
+Install Rust as per [rustup](https://rustup.rs/). (I.e., run the `curl ...`
+command).
+
+> Tips:
+>
+> - When prompted, enter `1` for the default installation.
+> - Try running `cargo version` after Rust installation completes. If it cannot
+>   be found, run `source $HOME/.cargo/env`.
+> - It's generally advised to append `source $HOME/.cargo/env` to `~/.bashrc`; a one-liner for this is shown below.
+
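+A one-liner for this (assuming a default rustup install, which creates `$HOME/.cargo/env`) is:
+
+```bash
+echo 'source $HOME/.cargo/env' >> ~/.bashrc
+```
+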
"0.4.3" -hex = "0.4.2" [dependencies] eth2_hashing = "0.1.0" diff --git a/consensus/types/Cargo.toml b/consensus/types/Cargo.toml index c9cab4126..3bffbe3a4 100644 --- a/consensus/types/Cargo.toml +++ b/consensus/types/Cargo.toml @@ -13,7 +13,6 @@ bls = { path = "../../crypto/bls" } compare_fields = { path = "../../common/compare_fields" } compare_fields_derive = { path = "../../common/compare_fields_derive" } dirs = "2.0.2" -derivative = "2.1.1" eth2_interop_keypairs = { path = "../../common/eth2_interop_keypairs" } ethereum-types = "0.9.1" eth2_hashing = "0.1.0" @@ -42,7 +41,6 @@ rusqlite = { version = "0.23.1", features = ["bundled"], optional = true } arbitrary = { version = "0.4.4", features = ["derive"], optional = true } [dev-dependencies] -env_logger = "0.7.1" serde_json = "1.0.52" criterion = "0.3.2" diff --git a/crypto/bls/Cargo.toml b/crypto/bls/Cargo.toml index ff894a7e0..0f4e9cfe1 100644 --- a/crypto/bls/Cargo.toml +++ b/crypto/bls/Cargo.toml @@ -13,7 +13,6 @@ serde = "1.0.110" serde_derive = "1.0.110" serde_hex = { path = "../../consensus/serde_hex" } eth2_ssz = "0.1.2" -eth2_ssz_types = { path = "../../consensus/ssz_types" } tree_hash = "0.1.0" arbitrary = { version = "0.4.4", features = ["derive"], optional = true } zeroize = { version = "1.0.0", features = ["zeroize_derive"] } diff --git a/lcli/Cargo.toml b/lcli/Cargo.toml index afd8d1f38..a87cf88e7 100644 --- a/lcli/Cargo.toml +++ b/lcli/Cargo.toml @@ -18,7 +18,6 @@ types = { path = "../consensus/types" } state_processing = { path = "../consensus/state_processing" } eth2_ssz = "0.1.2" regex = "1.3.7" -eth1_test_rig = { path = "../testing/eth1_test_rig" } futures = { version = "0.3.5", features = ["compat"] } environment = { path = "../lighthouse/environment" } web3 = "0.11.0" diff --git a/lighthouse/environment/Cargo.toml b/lighthouse/environment/Cargo.toml index bd9a1b981..7d36f7f98 100644 --- a/lighthouse/environment/Cargo.toml +++ b/lighthouse/environment/Cargo.toml @@ -5,14 +5,12 @@ authors = ["Paul Hauner "] edition = "2018" [dependencies] -clap = "2.33.0" tokio = "0.2.21" slog = { version = "2.5.2", features = ["max_level_trace"] } sloggers = "1.0.0" types = { "path" = "../../consensus/types" } eth2_config = { "path" = "../../common/eth2_config" } eth2_testnet_config = { path = "../../common/eth2_testnet_config" } -env_logger = "0.7.1" logging = { path = "../../common/logging" } slog-term = "2.5.0" slog-async = "2.5.0" @@ -23,6 +21,3 @@ slog-json = "2.3.0" exit-future = "0.2.0" lazy_static = "1.4.0" lighthouse_metrics = { path = "../../common/lighthouse_metrics" } - -[dev-dependencies] -beacon_node = { path = "../../beacon_node" } diff --git a/testing/ef_tests/Cargo.toml b/testing/ef_tests/Cargo.toml index aef1a2222..2dc48a297 100644 --- a/testing/ef_tests/Cargo.toml +++ b/testing/ef_tests/Cargo.toml @@ -27,4 +27,3 @@ cached_tree_hash = { path = "../../consensus/cached_tree_hash" } state_processing = { path = "../../consensus/state_processing" } swap_or_not_shuffle = { path = "../../consensus/swap_or_not_shuffle" } types = { path = "../../consensus/types" } -walkdir = "2.3.1" diff --git a/validator_client/Cargo.toml b/validator_client/Cargo.toml index ea0c99c62..19e0626a3 100644 --- a/validator_client/Cargo.toml +++ b/validator_client/Cargo.toml @@ -28,8 +28,6 @@ slog = { version = "2.5.2", features = ["max_level_trace", "release_max_level_tr slog-async = "2.5.0" slog-term = "2.5.0" tokio = { version = "0.2.21", features = ["time"] } -error-chain = "0.12.2" -bincode = "1.2.1" futures = { version = "0.3.5", features 
= ["compat"] } dirs = "2.0.2" logging = { path = "../common/logging" } @@ -44,6 +42,5 @@ bls = { path = "../crypto/bls" } remote_beacon_node = { path = "../common/remote_beacon_node" } tempdir = "0.3.7" rayon = "1.3.0" -web3 = "0.11.0" validator_dir = { path = "../common/validator_dir" } clap_utils = { path = "../common/clap_utils" } diff --git a/validator_client/slashing_protection/Cargo.toml b/validator_client/slashing_protection/Cargo.toml index 86e1fb458..7ce29c80d 100644 --- a/validator_client/slashing_protection/Cargo.toml +++ b/validator_client/slashing_protection/Cargo.toml @@ -11,7 +11,7 @@ tree_hash = { path = "../../consensus/tree_hash" } rusqlite = { version = "0.23.1", features = ["bundled"] } r2d2 = "0.8.8" r2d2_sqlite = "0.16.0" -parking_lot = "0.9.0" +parking_lot = "0.10.2" [dev-dependencies] rayon = "1.3.0" diff --git a/validator_client/src/validator_directory.rs b/validator_client/src/validator_directory.rs deleted file mode 100644 index a7abd4ec6..000000000 --- a/validator_client/src/validator_directory.rs +++ /dev/null @@ -1,454 +0,0 @@ -use bls::get_withdrawal_credentials; -use deposit_contract::{encode_eth1_tx_data, DEPOSIT_GAS}; -use futures::compat::Future01CompatExt; -use hex; -use ssz::{Decode, Encode}; -use ssz_derive::{Decode, Encode}; -use std::fs; -use std::fs::File; -use std::io::prelude::*; -use std::os::unix::fs::PermissionsExt; -use std::path::PathBuf; -use types::{ - test_utils::generate_deterministic_keypair, ChainSpec, DepositData, Hash256, Keypair, - PublicKey, SecretKey, Signature, -}; -use web3::{ - types::{Address, TransactionRequest, U256}, - Transport, Web3, -}; - -const VOTING_KEY_PREFIX: &str = "voting"; -const WITHDRAWAL_KEY_PREFIX: &str = "withdrawal"; -const ETH1_DEPOSIT_DATA_FILE: &str = "eth1_deposit_data.rlp"; - -/// Returns the filename of a keypair file. -fn keypair_file(prefix: &str) -> String { - format!("{}_keypair", prefix) -} - -/// Returns the name of the folder to be generated for a validator with the given voting key. -fn dir_name(voting_pubkey: &PublicKey) -> String { - format!("0x{}", hex::encode(voting_pubkey.as_ssz_bytes())) -} - -/// Represents the files/objects for each dedicated lighthouse validator directory. -/// -/// Generally lives in `~/.lighthouse/validators/`. -#[derive(Debug, PartialEq, Clone)] -pub struct ValidatorDirectory { - pub directory: PathBuf, - pub voting_keypair: Option, - pub withdrawal_keypair: Option, - pub deposit_data: Option>, -} - -impl ValidatorDirectory { - /// Attempts to load a validator from the given directory, requiring only components necessary - /// for signing messages. - pub fn load_for_signing(directory: PathBuf) -> Result { - if !directory.exists() { - return Err(format!( - "Validator directory does not exist: {:?}", - directory - )); - } - - Ok(Self { - voting_keypair: Some( - load_keypair(directory.clone(), VOTING_KEY_PREFIX) - .map_err(|e| format!("Unable to get voting keypair: {}", e))?, - ), - withdrawal_keypair: load_keypair(directory.clone(), WITHDRAWAL_KEY_PREFIX).ok(), - deposit_data: load_eth1_deposit_data(directory.clone()).ok(), - directory, - }) - } -} - -/// Load a `Keypair` from a file. -fn load_keypair(base_path: PathBuf, file_prefix: &str) -> Result { - let path = base_path.join(keypair_file(file_prefix)); - - if !path.exists() { - return Err(format!("Keypair file does not exist: {:?}", path)); - } - - let mut bytes = vec![]; - - File::open(&path) - .map_err(|e| format!("Unable to open keypair file: {}", e))? 
- .read_to_end(&mut bytes) - .map_err(|e| format!("Unable to read keypair file: {}", e))?; - - SszEncodableKeypair::from_ssz_bytes(&bytes) - .map(Into::into) - .map_err(|e| format!("Unable to decode keypair: {:?}", e)) -} - -/// Load eth1_deposit_data from file. -fn load_eth1_deposit_data(base_path: PathBuf) -> Result, String> { - let path = base_path.join(ETH1_DEPOSIT_DATA_FILE); - - if !path.exists() { - return Err(format!("Eth1 deposit data file does not exist: {:?}", path)); - } - - let mut bytes = vec![]; - - File::open(&path) - .map_err(|e| format!("Unable to open eth1 deposit data file: {}", e))? - .read_to_end(&mut bytes) - .map_err(|e| format!("Unable to read eth1 deposit data file: {}", e))?; - - let string = String::from_utf8_lossy(&bytes); - if string.starts_with("0x") { - hex::decode(&string[2..]) - .map_err(|e| format!("Unable to decode eth1 data file as hex: {}", e)) - } else { - Err(format!("String did not start with 0x: {}", string)) - } -} - -/// A helper struct to allow SSZ enc/dec for a `Keypair`. -#[derive(Encode, Decode)] -struct SszEncodableKeypair { - pk: PublicKey, - sk: SecretKey, -} - -impl Into for SszEncodableKeypair { - fn into(self) -> Keypair { - Keypair { - sk: self.sk, - pk: self.pk, - } - } -} - -impl From for SszEncodableKeypair { - fn from(kp: Keypair) -> Self { - Self { - sk: kp.sk, - pk: kp.pk, - } - } -} - -/// Builds a `ValidatorDirectory`, both in-memory and on-disk. -#[derive(Default)] -pub struct ValidatorDirectoryBuilder { - directory: Option, - voting_keypair: Option, - withdrawal_keypair: Option, - amount: Option, - deposit_data: Option>, - spec: Option, -} - -impl ValidatorDirectoryBuilder { - /// Set the specification for this validator. - pub fn spec(mut self, spec: ChainSpec) -> Self { - self.spec = Some(spec); - self - } - - /// Use the `MAX_EFFECTIVE_BALANCE` as this validator's deposit. - pub fn full_deposit_amount(mut self) -> Result { - let spec = self - .spec - .as_ref() - .ok_or_else(|| "full_deposit_amount requires a spec")?; - self.amount = Some(spec.max_effective_balance); - Ok(self) - } - - /// Use a validator deposit of `gwei`. - pub fn custom_deposit_amount(mut self, gwei: u64) -> Self { - self.amount = Some(gwei); - self - } - - /// Generate keypairs using `Keypair::random()`. - pub fn thread_random_keypairs(mut self) -> Self { - self.voting_keypair = Some(Keypair::random()); - self.withdrawal_keypair = Some(Keypair::random()); - self - } - - /// Generate insecure, deterministic keypairs. - /// - /// - /// ## Warning - /// Only for use in testing. Do not store value in these keys. - pub fn insecure_keypairs(mut self, index: usize) -> Self { - let keypair = generate_deterministic_keypair(index); - self.voting_keypair = Some(keypair.clone()); - self.withdrawal_keypair = Some(keypair); - self - } - - /// Creates a validator directory in the given `base_path` (e.g., `~/.lighthouse/validators/`). - pub fn create_directory(mut self, base_path: PathBuf) -> Result { - let voting_keypair = self - .voting_keypair - .as_ref() - .ok_or_else(|| "directory requires a voting_keypair")?; - - let directory = base_path.join(dir_name(&voting_keypair.pk)); - - if directory.exists() { - return Err(format!( - "Validator directory already exists: {:?}", - directory - )); - } - - fs::create_dir_all(&directory) - .map_err(|e| format!("Unable to create validator directory: {}", e))?; - - self.directory = Some(directory); - - Ok(self) - } - - /// Write the validators keypairs to disk. 
- pub fn write_keypair_files(self) -> Result { - let voting_keypair = self - .voting_keypair - .clone() - .ok_or_else(|| "write_keypair_files requires a voting_keypair")?; - let withdrawal_keypair = self - .withdrawal_keypair - .clone() - .ok_or_else(|| "write_keypair_files requires a withdrawal_keypair")?; - - self.save_keypair(voting_keypair, VOTING_KEY_PREFIX)?; - self.save_keypair(withdrawal_keypair, WITHDRAWAL_KEY_PREFIX)?; - Ok(self) - } - - fn save_keypair(&self, keypair: Keypair, file_prefix: &str) -> Result<(), String> { - let path = self - .directory - .as_ref() - .map(|directory| directory.join(keypair_file(file_prefix))) - .ok_or_else(|| "save_keypair requires a directory")?; - - if path.exists() { - return Err(format!("Keypair file already exists at: {:?}", path)); - } - - let mut file = File::create(&path).map_err(|e| format!("Unable to create file: {}", e))?; - - // Ensure file has correct permissions. - let mut perm = file - .metadata() - .map_err(|e| format!("Unable to get file metadata: {}", e))? - .permissions(); - perm.set_mode((libc::S_IWUSR | libc::S_IRUSR) as u32); - file.set_permissions(perm) - .map_err(|e| format!("Unable to set file permissions: {}", e))?; - - file.write_all(&SszEncodableKeypair::from(keypair).as_ssz_bytes()) - .map_err(|e| format!("Unable to write keypair to file: {}", e))?; - - Ok(()) - } - - fn get_deposit_data(&self) -> Result<(Vec, u64), String> { - let voting_keypair = self - .voting_keypair - .as_ref() - .ok_or_else(|| "write_eth1_data_file requires a voting_keypair")?; - let withdrawal_keypair = self - .withdrawal_keypair - .as_ref() - .ok_or_else(|| "write_eth1_data_file requires a withdrawal_keypair")?; - let amount = self - .amount - .ok_or_else(|| "write_eth1_data_file requires an amount")?; - let spec = self.spec.as_ref().ok_or_else(|| "build requires a spec")?; - - let withdrawal_credentials = Hash256::from_slice(&get_withdrawal_credentials( - &withdrawal_keypair.pk, - spec.bls_withdrawal_prefix_byte, - )); - - let mut deposit_data = DepositData { - pubkey: voting_keypair.pk.clone().into(), - withdrawal_credentials, - amount, - signature: Signature::empty_signature().into(), - }; - - deposit_data.signature = deposit_data.create_signature(&voting_keypair.sk, &spec); - - let deposit_data = encode_eth1_tx_data(&deposit_data) - .map_err(|e| format!("Unable to encode eth1 deposit tx data: {:?}", e))?; - - Ok((deposit_data, amount)) - } - - pub fn write_eth1_data_file(mut self) -> Result { - let path = self - .directory - .as_ref() - .map(|directory| directory.join(ETH1_DEPOSIT_DATA_FILE)) - .ok_or_else(|| "write_eth1_data_file requires a directory")?; - - let (deposit_data, _) = self.get_deposit_data()?; - - if path.exists() { - return Err(format!("Eth1 data file already exists at: {:?}", path)); - } - - File::create(&path) - .map_err(|e| format!("Unable to create file: {}", e))? 
- .write_all(&format!("0x{}", hex::encode(&deposit_data)).as_bytes()) - .map_err(|e| format!("Unable to write eth1 data file: {}", e))?; - - self.deposit_data = Some(deposit_data); - - Ok(self) - } - - pub async fn submit_eth1_deposit( - self, - web3: Web3, - from: Address, - deposit_contract: Address, - ) -> Result<(Self, Hash256), String> { - let (deposit_data, deposit_amount) = self.get_deposit_data()?; - web3.eth() - .send_transaction(TransactionRequest { - from, - to: Some(deposit_contract), - gas: Some(DEPOSIT_GAS.into()), - gas_price: None, - value: Some(from_gwei(deposit_amount)), - data: Some(deposit_data.into()), - nonce: None, - condition: None, - }) - .compat() - .await - .map_err(|e| format!("Failed to send transaction: {:?}", e)) - .map(|tx| (self, tx)) - } - - pub fn build(self) -> Result { - let directory = self.directory.ok_or_else(|| "build requires a directory")?; - - Ok(ValidatorDirectory { - directory, - voting_keypair: self.voting_keypair, - withdrawal_keypair: self.withdrawal_keypair, - deposit_data: self.deposit_data, - }) - } -} - -/// Converts gwei to wei. -fn from_gwei(gwei: u64) -> U256 { - U256::from(gwei) * U256::exp10(9) -} - -#[cfg(test)] -mod tests { - use super::*; - use tempdir::TempDir; - use types::{EthSpec, MinimalEthSpec}; - - type E = MinimalEthSpec; - - #[test] - fn random_keypairs_round_trip() { - let spec = E::default_spec(); - let temp_dir = TempDir::new("acc_manager").expect("should create test dir"); - - let created_dir = ValidatorDirectoryBuilder::default() - .spec(spec) - .full_deposit_amount() - .expect("should set full deposit amount") - .thread_random_keypairs() - .create_directory(temp_dir.path().into()) - .expect("should create directory") - .write_keypair_files() - .expect("should write keypair files") - .write_eth1_data_file() - .expect("should write eth1 data file") - .build() - .expect("should build dir"); - - let loaded_dir = ValidatorDirectory::load_for_signing(created_dir.directory.clone()) - .expect("should load directory"); - - assert_eq!( - created_dir, loaded_dir, - "the directory created should match the one loaded" - ); - } - - #[test] - fn deterministic_keypairs_round_trip() { - let spec = E::default_spec(); - let temp_dir = TempDir::new("acc_manager").expect("should create test dir"); - let index = 42; - - let created_dir = ValidatorDirectoryBuilder::default() - .spec(spec) - .full_deposit_amount() - .expect("should set full deposit amount") - .insecure_keypairs(index) - .create_directory(temp_dir.path().into()) - .expect("should create directory") - .write_keypair_files() - .expect("should write keypair files") - .write_eth1_data_file() - .expect("should write eth1 data file") - .build() - .expect("should build dir"); - - assert!( - created_dir.directory.exists(), - "should have created directory" - ); - - let mut parent = created_dir.directory.clone(); - parent.pop(); - assert_eq!( - parent, - PathBuf::from(temp_dir.path()), - "should have created directory ontop of base dir" - ); - - let expected_keypair = generate_deterministic_keypair(index); - assert_eq!( - created_dir.voting_keypair, - Some(expected_keypair.clone()), - "voting keypair should be as expected" - ); - assert_eq!( - created_dir.withdrawal_keypair, - Some(expected_keypair), - "withdrawal keypair should be as expected" - ); - assert!( - !created_dir - .deposit_data - .clone() - .expect("should have data") - .is_empty(), - "should have some deposit data" - ); - - let loaded_dir = ValidatorDirectory::load_for_signing(created_dir.directory.clone()) - 
.expect("should load directory"); - - assert_eq!( - created_dir, loaded_dir, - "the directory created should match the one loaded" - ); - } -}