From 41e7a07c51be12c0a73d325296b138d771e5273a Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Fri, 1 Apr 2022 00:58:59 +0000 Subject: [PATCH] Add `lighthouse db` command (#3129) ## Proposed Changes Add a `lighthouse db` command with three initial subcommands: - `lighthouse db version`: print the database schema version. - `lighthouse db migrate --to N`: manually upgrade (or downgrade!) the database to a different version. - `lighthouse db inspect --column C`: log the key and size in bytes of every value in a given `DBColumn`. This PR lays the groundwork for other changes, namely: - Mark's fast-deposit sync (https://github.com/sigp/lighthouse/pull/2915), for which I think we should implement a database downgrade (from v9 to v8). - My `tree-states` work, which already implements a downgrade (v10 to v8). - Standalone purge commands like `lighthouse db purge-dht` per https://github.com/sigp/lighthouse/issues/2824. ## Additional Info I updated the `strum` crate to 0.24.0, which necessitated some changes in the network code to remove calls to deprecated methods. Thanks to @winksaville for the motivation, and implementation work that I used as a source of inspiration (https://github.com/sigp/lighthouse/pull/2685). --- Cargo.lock | 31 +- Cargo.toml | 4 +- beacon_node/beacon_chain/Cargo.toml | 2 +- beacon_node/client/src/config.rs | 2 +- beacon_node/execution_layer/Cargo.toml | 2 +- beacon_node/lighthouse_network/Cargo.toml | 2 +- .../src/peer_manager/mod.rs | 11 +- .../src/peer_manager/peerdb/client.rs | 4 +- .../lighthouse_network/src/rpc/methods.rs | 4 +- .../lighthouse_network/src/rpc/protocol.rs | 8 +- beacon_node/network/Cargo.toml | 2 +- beacon_node/network/src/metrics.rs | 9 +- .../network/src/sync/block_lookups/mod.rs | 14 +- .../src/sync/block_lookups/parent_lookup.rs | 4 +- .../sync/block_lookups/single_block_lookup.rs | 6 +- beacon_node/src/config.rs | 25 +- beacon_node/src/lib.rs | 2 +- beacon_node/store/Cargo.toml | 1 + beacon_node/store/src/hot_cold_store.rs | 3 +- beacon_node/store/src/leveldb_store.rs | 25 +- beacon_node/store/src/lib.rs | 68 ++--- database_manager/Cargo.toml | 18 ++ database_manager/src/lib.rs | 278 ++++++++++++++++++ lighthouse/Cargo.toml | 1 + lighthouse/src/main.rs | 12 +- 25 files changed, 449 insertions(+), 89 deletions(-) create mode 100644 database_manager/Cargo.toml create mode 100644 database_manager/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 38725b126..9c4710f95 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1114,6 +1114,24 @@ version = "2.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3ee2393c4a91429dffb4bedf19f4d6abf27d8a732c8ce4980305d782e5426d57" +[[package]] +name = "database_manager" +version = "0.1.0" +dependencies = [ + "beacon_chain", + "beacon_node", + "clap", + "clap_utils", + "environment", + "logging", + "slog", + "sloggers", + "store", + "strum", + "tempfile", + "types", +] + [[package]] name = "db-key" version = "0.0.5" @@ -3368,6 +3386,7 @@ dependencies = [ "boot_node", "clap", "clap_utils", + "database_manager", "directory", "env_logger 0.9.0", "environment", @@ -5885,6 +5904,7 @@ dependencies = [ "slog", "sloggers", "state_processing", + "strum", "tempfile", "types", ] @@ -5903,22 +5923,23 @@ checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" [[package]] name = "strum" -version = "0.21.0" +version = "0.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aaf86bbcfd1fa9670b7a129f64fc0c9fcbbfe4f1bc4210e9e98fe71ffc12cde2" +checksum = "e96acfc1b70604b8b2f1ffa4c57e59176c7dbb05d556c71ecd2f5498a1dee7f8" dependencies = [ "strum_macros", ] [[package]] name = "strum_macros" -version = "0.21.1" +version = "0.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d06aaeeee809dbc59eb4556183dd927df67db1540de5be8d3ec0b6636358a5ec" +checksum = "6878079b17446e4d3eba6192bb0a2950d5b14f0ed8424b852310e5a94345d0ef" dependencies = [ - "heck 0.3.3", + "heck 0.4.0", "proc-macro2", "quote", + "rustversion", "syn", ] diff --git a/Cargo.toml b/Cargo.toml index 4b1132ba6..dc07a7cfe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,7 @@ members = [ "beacon_node/store", "beacon_node/timer", - "boot_node", + "boot_node", "common/account_utils", "common/clap_utils", @@ -45,6 +45,8 @@ members = [ "common/fallback", "common/monitoring_api", + "database_manager", + "consensus/cached_tree_hash", "consensus/int_to_bytes", "consensus/fork_choice", diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index 3347d5dec..552adffac 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -55,7 +55,7 @@ derivative = "2.1.1" itertools = "0.10.0" slasher = { path = "../../slasher" } eth2 = { path = "../../common/eth2" } -strum = { version = "0.21.0", features = ["derive"] } +strum = { version = "0.24.0", features = ["derive"] } logging = { path = "../../common/logging" } execution_layer = { path = "../execution_layer" } sensitive_url = { path = "../../common/sensitive_url" } diff --git a/beacon_node/client/src/config.rs b/beacon_node/client/src/config.rs index 15259204a..bb9e196f7 100644 --- a/beacon_node/client/src/config.rs +++ b/beacon_node/client/src/config.rs @@ -169,7 +169,7 @@ impl Config { /// For more information, see: /// /// https://github.com/sigp/lighthouse/pull/2843 - fn get_data_dir(&self) -> PathBuf { + pub fn get_data_dir(&self) -> PathBuf { let existing_legacy_dir = self.get_existing_legacy_data_dir(); if let Some(legacy_dir) = existing_legacy_dir { diff --git a/beacon_node/execution_layer/Cargo.toml b/beacon_node/execution_layer/Cargo.toml index b12d30ea2..d81f83d69 100644 --- a/beacon_node/execution_layer/Cargo.toml +++ b/beacon_node/execution_layer/Cargo.toml @@ -34,4 +34,4 @@ tempfile = "3.1.0" rand = "0.7.3" zeroize = { version = "1.4.2", features = ["zeroize_derive"] } lighthouse_metrics = { path = "../../common/lighthouse_metrics" } -lazy_static = "1.4.0" \ No newline at end of file +lazy_static = "1.4.0" diff --git a/beacon_node/lighthouse_network/Cargo.toml b/beacon_node/lighthouse_network/Cargo.toml index d1407f981..c2d5d859d 100644 --- a/beacon_node/lighthouse_network/Cargo.toml +++ b/beacon_node/lighthouse_network/Cargo.toml @@ -36,7 +36,7 @@ task_executor = { path = "../../common/task_executor" } rand = "0.7.3" directory = { path = "../../common/directory" } regex = "1.5.5" -strum = { version = "0.21.0", features = ["derive"] } +strum = { version = "0.24.0", features = ["derive"] } superstruct = "0.4.1" prometheus-client = "0.15.0" unused_port = { path = "../../common/unused_port" } diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index 437d05d47..cf31cee02 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -388,7 +388,7 @@ impl PeerManager { /// Updates `PeerInfo` with `identify` information. pub fn identify(&mut self, peer_id: &PeerId, info: &IdentifyInfo) { if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) { - let previous_kind = peer_info.client().kind.clone(); + let previous_kind = peer_info.client().kind; let previous_listening_addresses = peer_info.set_listening_addresses(info.listen_addrs.clone()); peer_info.set_client(peerdb::client::Client::from_identify_info(info)); @@ -412,12 +412,9 @@ impl PeerManager { ) { metrics::inc_gauge_vec( &metrics::PEERS_PER_CLIENT, - &[&peer_info.client().kind.to_string()], - ); - metrics::dec_gauge_vec( - &metrics::PEERS_PER_CLIENT, - &[&previous_kind.to_string()], + &[peer_info.client().kind.as_ref()], ); + metrics::dec_gauge_vec(&metrics::PEERS_PER_CLIENT, &[previous_kind.as_ref()]); } } } else { @@ -674,7 +671,7 @@ impl PeerManager { let value = clients_per_peer.get(&client_kind.to_string()).unwrap_or(&0); metrics::set_gauge_vec( &metrics::PEERS_PER_CLIENT, - &[&client_kind.to_string()], + &[client_kind.as_ref()], *value as i64, ); } diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb/client.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb/client.rs index 7cc84516a..f15f38daa 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb/client.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb/client.rs @@ -4,7 +4,7 @@ use libp2p::identify::IdentifyInfo; use serde::Serialize; -use strum::{AsRefStr, AsStaticStr, EnumIter}; +use strum::{AsRefStr, EnumIter, IntoStaticStr}; /// Various client and protocol information related to a node. #[derive(Clone, Debug, Serialize)] @@ -21,7 +21,7 @@ pub struct Client { pub agent_string: Option, } -#[derive(Clone, Debug, Serialize, PartialEq, AsRefStr, AsStaticStr, EnumIter)] +#[derive(Clone, Copy, Debug, Serialize, PartialEq, AsRefStr, IntoStaticStr, EnumIter)] pub enum ClientKind { /// A lighthouse node (the best kind). Lighthouse, diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 087f8e533..f38cde363 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -9,7 +9,7 @@ use ssz_types::{ VariableList, }; use std::ops::Deref; -use strum::AsStaticStr; +use strum::IntoStaticStr; use superstruct::superstruct; use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot}; @@ -263,7 +263,7 @@ pub enum RPCCodedResponse { } /// The code assigned to an erroneous `RPCResponse`. -#[derive(Debug, Clone, Copy, PartialEq, AsStaticStr)] +#[derive(Debug, Clone, Copy, PartialEq, IntoStaticStr)] #[strum(serialize_all = "snake_case")] pub enum RPCResponseErrorCode { RateLimited, diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index 1e6504199..e3ad6a803 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -14,7 +14,7 @@ use std::io; use std::marker::PhantomData; use std::sync::Arc; use std::time::Duration; -use strum::{AsStaticRef, AsStaticStr}; +use strum::IntoStaticStr; use tokio_io_timeout::TimeoutStream; use tokio_util::{ codec::Framed, @@ -510,7 +510,7 @@ impl InboundRequest { } /// Error in RPC Encoding/Decoding. -#[derive(Debug, Clone, PartialEq, AsStaticStr)] +#[derive(Debug, Clone, PartialEq, IntoStaticStr)] #[strum(serialize_all = "snake_case")] pub enum RPCError { /// Error when decoding the raw buffer from ssz. @@ -617,8 +617,8 @@ impl RPCError { /// Used for metrics. pub fn as_static_str(&self) -> &'static str { match self { - RPCError::ErrorResponse(ref code, ..) => code.as_static(), - e => e.as_static(), + RPCError::ErrorResponse(ref code, ..) => code.into(), + e => e.into(), } } } diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index 96458da0a..66f70eb55 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -41,5 +41,5 @@ itertools = "0.10.0" num_cpus = "1.13.0" lru_cache = { path = "../../common/lru_cache" } if-addrs = "0.6.4" -strum = "0.21.0" +strum = "0.24.0" tokio-util = { version = "0.6.3", features = ["time"] } diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index 04aa51472..02c491cb0 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -9,7 +9,6 @@ use lighthouse_network::{ Gossipsub, NetworkGlobals, }; use std::sync::Arc; -use strum::AsStaticRef; use strum::IntoEnumIterator; use types::EthSpec; @@ -357,12 +356,12 @@ pub fn update_gossip_metrics( for client_kind in ClientKind::iter() { set_gauge_vec( &BEACON_BLOCK_MESH_PEERS_PER_CLIENT, - &[&client_kind.to_string()], + &[client_kind.as_ref()], 0_i64, ); set_gauge_vec( &BEACON_AGGREGATE_AND_PROOF_MESH_PEERS_PER_CLIENT, - &[&client_kind.to_string()], + &[client_kind.as_ref()], 0_i64, ); } @@ -377,7 +376,7 @@ pub fn update_gossip_metrics( .peers .read() .peer_info(peer_id) - .map(|peer_info| peer_info.client().kind.as_static()) + .map(|peer_info| peer_info.client().kind.into()) .unwrap_or_else(|| "Unknown"); if let Some(v) = get_int_gauge(&BEACON_BLOCK_MESH_PEERS_PER_CLIENT, &[client]) @@ -392,7 +391,7 @@ pub fn update_gossip_metrics( .peers .read() .peer_info(peer_id) - .map(|peer_info| peer_info.client().kind.as_static()) + .map(|peer_info| peer_info.client().kind.into()) .unwrap_or_else(|| "Unknown"); if let Some(v) = get_int_gauge( &BEACON_AGGREGATE_AND_PROOF_MESH_PEERS_PER_CLIENT, diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index b11dc1c7a..c4c100699 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -8,7 +8,6 @@ use lru_cache::LRUCache; use slog::{crit, debug, error, trace, warn, Logger}; use smallvec::SmallVec; use store::{Hash256, SignedBeaconBlock}; -use strum::AsStaticRef; use tokio::sync::mpsc; use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent}; @@ -176,7 +175,7 @@ impl BlockLookups { // request finished correctly, it will be removed after the block is processed. } Err(error) => { - let msg: &str = error.as_static(); + let msg: &str = error.into(); cx.report_peer(peer_id, PeerAction::LowToleranceError, msg); // Remove the request, if it can be retried it will be added with a new id. let mut req = request.remove(); @@ -243,7 +242,7 @@ impl BlockLookups { VerifyError::RootMismatch | VerifyError::NoBlockReturned | VerifyError::ExtraBlocksReturned => { - let e = e.as_static(); + let e = e.into(); warn!(self.log, "Peer sent invalid response to parent request."; "peer_id" => %peer_id, "reason" => e); @@ -310,8 +309,13 @@ impl BlockLookups { } } Err(e) => { - trace!(self.log, "Single block request failed on peer disconnection"; - "block_root" => %req.hash, "peer_id" => %peer_id, "reason" => e.as_static()); + trace!( + self.log, + "Single block request failed on peer disconnection"; + "block_root" => %req.hash, + "peer_id" => %peer_id, + "reason" => <&str>::from(e), + ); } } } diff --git a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs index eb8d61ab9..777c3e930 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs @@ -1,6 +1,6 @@ use lighthouse_network::PeerId; use store::{EthSpec, Hash256, SignedBeaconBlock}; -use strum::AsStaticStr; +use strum::IntoStaticStr; use crate::sync::{ manager::{Id, SLOT_IMPORT_TOLERANCE}, @@ -28,7 +28,7 @@ pub(crate) struct ParentLookup { current_parent_request_id: Option, } -#[derive(Debug, PartialEq, Eq, AsStaticStr)] +#[derive(Debug, PartialEq, Eq, IntoStaticStr)] pub enum VerifyError { RootMismatch, NoBlockReturned, diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index a4df616cb..347a4ae43 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -4,7 +4,7 @@ use lighthouse_network::{rpc::BlocksByRootRequest, PeerId}; use rand::seq::IteratorRandom; use ssz_types::VariableList; use store::{EthSpec, Hash256, SignedBeaconBlock}; -use strum::AsStaticStr; +use strum::IntoStaticStr; /// Object representing a single block lookup request. #[derive(PartialEq, Eq)] @@ -28,14 +28,14 @@ pub enum State { Processing { peer_id: PeerId }, } -#[derive(Debug, PartialEq, Eq, AsStaticStr)] +#[derive(Debug, PartialEq, Eq, IntoStaticStr)] pub enum VerifyError { RootMismatch, NoBlockReturned, ExtraBlocksReturned, } -#[derive(Debug, PartialEq, Eq, AsStaticStr)] +#[derive(Debug, PartialEq, Eq, IntoStaticStr)] pub enum LookupRequestError { TooManyAttempts, NoPeers, diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 4ccf22e65..7f45ad355 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -284,16 +284,7 @@ pub fn get_config( client_config.freezer_db_path = Some(PathBuf::from(freezer_dir)); } - if let Some(slots_per_restore_point) = cli_args.value_of("slots-per-restore-point") { - client_config.store.slots_per_restore_point = slots_per_restore_point - .parse() - .map_err(|_| "slots-per-restore-point is not a valid integer".to_string())?; - } else { - client_config.store.slots_per_restore_point = std::cmp::min( - E::slots_per_historical_root() as u64, - store::config::DEFAULT_SLOTS_PER_RESTORE_POINT, - ); - } + client_config.store.slots_per_restore_point = get_slots_per_restore_point::(cli_args)?; if let Some(block_cache_size) = cli_args.value_of("block-cache-size") { client_config.store.block_cache_size = block_cache_size @@ -820,3 +811,17 @@ pub fn get_data_dir(cli_args: &ArgMatches) -> PathBuf { }) .unwrap_or_else(|| PathBuf::from(".")) } + +/// Get the `slots_per_restore_point` value to use for the database. +pub fn get_slots_per_restore_point(cli_args: &ArgMatches) -> Result { + if let Some(slots_per_restore_point) = + clap_utils::parse_optional(cli_args, "slots-per-restore-point")? + { + Ok(slots_per_restore_point) + } else { + Ok(std::cmp::min( + E::slots_per_historical_root() as u64, + store::config::DEFAULT_SLOTS_PER_RESTORE_POINT, + )) + } +} diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs index 773a0d2eb..690271022 100644 --- a/beacon_node/src/lib.rs +++ b/beacon_node/src/lib.rs @@ -13,7 +13,7 @@ use beacon_chain::{ use clap::ArgMatches; pub use cli::cli_app; pub use client::{Client, ClientBuilder, ClientConfig, ClientGenesis}; -pub use config::{get_config, get_data_dir, set_network_config}; +pub use config::{get_config, get_data_dir, get_slots_per_restore_point, set_network_config}; use environment::RuntimeContext; pub use eth2_config::Eth2Config; use slasher::Slasher; diff --git a/beacon_node/store/Cargo.toml b/beacon_node/store/Cargo.toml index be98f269f..679a2f95b 100644 --- a/beacon_node/store/Cargo.toml +++ b/beacon_node/store/Cargo.toml @@ -25,3 +25,4 @@ lighthouse_metrics = { path = "../../common/lighthouse_metrics" } lru = "0.7.1" sloggers = { version = "2.1.1", features = ["json"] } directory = { path = "../../common/directory" } +strum = { version = "0.24.0", features = ["derive"] } diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index c413309c9..153226f9a 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -191,9 +191,8 @@ impl HotColdDB, LevelDB> { info!( db.log, "Hot-Cold DB initialized"; - "version" => CURRENT_SCHEMA_VERSION.as_u64(), "split_slot" => split.slot, - "split_state" => format!("{:?}", split.state_root) + "split_state" => ?split.state_root ); } diff --git a/beacon_node/store/src/leveldb_store.rs b/beacon_node/store/src/leveldb_store.rs index 5727351c9..4a47353cb 100644 --- a/beacon_node/store/src/leveldb_store.rs +++ b/beacon_node/store/src/leveldb_store.rs @@ -1,4 +1,5 @@ use super::*; +use crate::hot_cold_store::HotColdDBError; use crate::metrics; use db_key::Key; use leveldb::compaction::Compaction; @@ -6,7 +7,7 @@ use leveldb::database::batch::{Batch, Writebatch}; use leveldb::database::kv::KV; use leveldb::database::Database; use leveldb::error::Error as LevelDBError; -use leveldb::iterator::{Iterable, KeyIterator}; +use leveldb::iterator::{Iterable, KeyIterator, LevelDBIterator}; use leveldb::options::{Options, ReadOptions, WriteOptions}; use parking_lot::{Mutex, MutexGuard}; use std::marker::PhantomData; @@ -174,6 +175,28 @@ impl KeyValueStore for LevelDB { } Ok(()) } + + /// Iterate through all keys and values in a particular column. + fn iter_column(&self, column: DBColumn) -> ColumnIter { + let start_key = + BytesKey::from_vec(get_key_for_col(column.into(), Hash256::zero().as_bytes())); + + let iter = self.db.iter(self.read_options()); + iter.seek(&start_key); + + Box::new( + iter.take_while(move |(key, _)| key.matches_column(column)) + .map(move |(bytes_key, value)| { + let key = + bytes_key + .remove_column(column) + .ok_or(HotColdDBError::IterationError { + unexpected_key: bytes_key, + })?; + Ok((key, value)) + }), + ) + } } impl ItemStore for LevelDB {} diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 8d1993f46..bc8f62dd2 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -39,8 +39,11 @@ pub use impls::beacon_state::StorageContainer as BeaconStateStorageContainer; pub use metadata::AnchorInfo; pub use metrics::scrape_for_metrics; use parking_lot::MutexGuard; +use strum::{EnumString, IntoStaticStr}; pub use types::*; +pub type ColumnIter<'a> = Box), Error>> + 'a>; + pub trait KeyValueStore: Sync + Send + Sized + 'static { /// Retrieve some bytes in `column` with `key`. fn get_bytes(&self, column: &str, key: &[u8]) -> Result>, Error>; @@ -73,6 +76,12 @@ pub trait KeyValueStore: Sync + Send + Sized + 'static { /// Compact the database, freeing space used by deleted items. fn compact(&self) -> Result<(), Error>; + + /// Iterate through all values in a particular column. + fn iter_column(&self, _column: DBColumn) -> ColumnIter { + // Default impl for non LevelDB databases + Box::new(std::iter::empty()) + } } pub fn get_key_for_col(column: &str, key: &[u8]) -> Vec { @@ -146,56 +155,49 @@ pub enum StoreOp<'a, E: EthSpec> { } /// A unique column identifier. -#[derive(Debug, Clone, Copy, PartialEq)] +#[derive(Debug, Clone, Copy, PartialEq, IntoStaticStr, EnumString)] pub enum DBColumn { /// For data related to the database itself. + #[strum(serialize = "bma")] BeaconMeta, + #[strum(serialize = "blk")] BeaconBlock, + /// For full `BeaconState`s in the hot database (finalized or fork-boundary states). + #[strum(serialize = "ste")] BeaconState, - /// For persisting in-memory state to the database. - BeaconChain, - OpPool, - Eth1Cache, - ForkChoice, - PubkeyCache, - /// For the table mapping restore point numbers to state roots. - BeaconRestorePoint, /// For the mapping from state roots to their slots or summaries. + #[strum(serialize = "bss")] BeaconStateSummary, /// For the list of temporary states stored during block import, /// and then made non-temporary by the deletion of their state root from this column. + #[strum(serialize = "bst")] BeaconStateTemporary, + /// For persisting in-memory state to the database. + #[strum(serialize = "bch")] + BeaconChain, + #[strum(serialize = "opo")] + OpPool, + #[strum(serialize = "etc")] + Eth1Cache, + #[strum(serialize = "frk")] + ForkChoice, + #[strum(serialize = "pkc")] + PubkeyCache, + /// For the table mapping restore point numbers to state roots. + #[strum(serialize = "brp")] + BeaconRestorePoint, + #[strum(serialize = "bbr")] BeaconBlockRoots, + #[strum(serialize = "bsr")] BeaconStateRoots, + #[strum(serialize = "bhr")] BeaconHistoricalRoots, + #[strum(serialize = "brm")] BeaconRandaoMixes, + #[strum(serialize = "dht")] DhtEnrs, } -impl Into<&'static str> for DBColumn { - /// Returns a `&str` prefix to be added to keys before they hit the key-value database. - fn into(self) -> &'static str { - match self { - DBColumn::BeaconMeta => "bma", - DBColumn::BeaconBlock => "blk", - DBColumn::BeaconState => "ste", - DBColumn::BeaconChain => "bch", - DBColumn::OpPool => "opo", - DBColumn::Eth1Cache => "etc", - DBColumn::ForkChoice => "frk", - DBColumn::PubkeyCache => "pkc", - DBColumn::BeaconRestorePoint => "brp", - DBColumn::BeaconStateSummary => "bss", - DBColumn::BeaconStateTemporary => "bst", - DBColumn::BeaconBlockRoots => "bbr", - DBColumn::BeaconStateRoots => "bsr", - DBColumn::BeaconHistoricalRoots => "bhr", - DBColumn::BeaconRandaoMixes => "brm", - DBColumn::DhtEnrs => "dht", - } - } -} - impl DBColumn { pub fn as_str(self) -> &'static str { self.into() diff --git a/database_manager/Cargo.toml b/database_manager/Cargo.toml new file mode 100644 index 000000000..f71552813 --- /dev/null +++ b/database_manager/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "database_manager" +version = "0.1.0" +edition = "2021" + +[dependencies] +beacon_chain = { path = "../beacon_node/beacon_chain" } +beacon_node = { path = "../beacon_node" } +clap = "2.33.3" +clap_utils = { path = "../common/clap_utils" } +environment = { path = "../lighthouse/environment" } +logging = { path = "../common/logging" } +sloggers = "2.0.2" +store = { path = "../beacon_node/store" } +tempfile = "3.1.0" +types = { path = "../consensus/types" } +slog = "2.5.2" +strum = { version = "0.24.0", features = ["derive"] } diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs new file mode 100644 index 000000000..eaf94d532 --- /dev/null +++ b/database_manager/src/lib.rs @@ -0,0 +1,278 @@ +use beacon_chain::{ + builder::Witness, eth1_chain::CachingEth1Backend, schema_change::migrate_schema, + slot_clock::SystemTimeSlotClock, +}; +use beacon_node::{get_data_dir, get_slots_per_restore_point, ClientConfig}; +use clap::{App, Arg, ArgMatches}; +use environment::{Environment, RuntimeContext}; +use slog::{info, Logger}; +use store::{ + errors::Error, + metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION}, + DBColumn, HotColdDB, KeyValueStore, LevelDB, +}; +use strum::{EnumString, EnumVariantNames, VariantNames}; +use types::EthSpec; + +pub const CMD: &str = "database_manager"; + +pub fn version_cli_app<'a, 'b>() -> App<'a, 'b> { + App::new("version") + .visible_aliases(&["v"]) + .setting(clap::AppSettings::ColoredHelp) + .about("Display database schema version") +} + +pub fn migrate_cli_app<'a, 'b>() -> App<'a, 'b> { + App::new("migrate") + .setting(clap::AppSettings::ColoredHelp) + .about("Migrate the database to a specific schema version") + .arg( + Arg::with_name("to") + .long("to") + .value_name("VERSION") + .help("Schema version to migrate to") + .takes_value(true) + .required(true), + ) +} + +pub fn inspect_cli_app<'a, 'b>() -> App<'a, 'b> { + App::new("inspect") + .setting(clap::AppSettings::ColoredHelp) + .about("Inspect raw database values") + .arg( + Arg::with_name("column") + .long("column") + .value_name("TAG") + .help("3-byte column ID (see `DBColumn`)") + .takes_value(true) + .required(true), + ) + .arg( + Arg::with_name("output") + .long("output") + .value_name("TARGET") + .help("Select the type of output to show") + .default_value("sizes") + .possible_values(InspectTarget::VARIANTS), + ) +} + +pub fn cli_app<'a, 'b>() -> App<'a, 'b> { + App::new(CMD) + .visible_aliases(&["db"]) + .setting(clap::AppSettings::ColoredHelp) + .about("Manage a beacon node database") + .arg( + Arg::with_name("slots-per-restore-point") + .long("slots-per-restore-point") + .value_name("SLOT_COUNT") + .help( + "Specifies how often a freezer DB restore point should be stored. \ + Cannot be changed after initialization. \ + [default: 2048 (mainnet) or 64 (minimal)]", + ) + .takes_value(true), + ) + .arg( + Arg::with_name("freezer-dir") + .long("freezer-dir") + .value_name("DIR") + .help("Data directory for the freezer database.") + .takes_value(true), + ) + .subcommand(migrate_cli_app()) + .subcommand(version_cli_app()) + .subcommand(inspect_cli_app()) +} + +fn parse_client_config( + cli_args: &ArgMatches, + _env: &Environment, +) -> Result { + let mut client_config = ClientConfig { + data_dir: get_data_dir(cli_args), + ..Default::default() + }; + + if let Some(freezer_dir) = clap_utils::parse_optional(cli_args, "freezer-dir")? { + client_config.freezer_db_path = Some(freezer_dir); + } + + client_config.store.slots_per_restore_point = get_slots_per_restore_point::(cli_args)?; + + Ok(client_config) +} + +pub fn display_db_version( + client_config: ClientConfig, + runtime_context: &RuntimeContext, + log: Logger, +) -> Result<(), Error> { + let spec = runtime_context.eth2_config.spec.clone(); + let hot_path = client_config.get_db_path(); + let cold_path = client_config.get_freezer_db_path(); + + let mut version = CURRENT_SCHEMA_VERSION; + HotColdDB::, LevelDB>::open( + &hot_path, + &cold_path, + |_, from, _| { + version = from; + Ok(()) + }, + client_config.store, + spec, + log.clone(), + )?; + + info!(log, "Database version: {}", version.as_u64()); + + if version != CURRENT_SCHEMA_VERSION { + info!( + log, + "Latest schema version: {}", + CURRENT_SCHEMA_VERSION.as_u64(), + ); + } + + Ok(()) +} + +#[derive(Debug, EnumString, EnumVariantNames)] +pub enum InspectTarget { + #[strum(serialize = "sizes")] + ValueSizes, + #[strum(serialize = "total")] + ValueTotal, +} + +pub struct InspectConfig { + column: DBColumn, + target: InspectTarget, +} + +fn parse_inspect_config(cli_args: &ArgMatches) -> Result { + let column = clap_utils::parse_required(cli_args, "column")?; + let target = clap_utils::parse_required(cli_args, "output")?; + + Ok(InspectConfig { column, target }) +} + +pub fn inspect_db( + inspect_config: InspectConfig, + client_config: ClientConfig, + runtime_context: &RuntimeContext, + log: Logger, +) -> Result<(), Error> { + let spec = runtime_context.eth2_config.spec.clone(); + let hot_path = client_config.get_db_path(); + let cold_path = client_config.get_freezer_db_path(); + + let db = HotColdDB::, LevelDB>::open( + &hot_path, + &cold_path, + |_, _, _| Ok(()), + client_config.store, + spec, + log, + )?; + + let mut total = 0; + + for res in db.hot_db.iter_column(inspect_config.column) { + let (key, value) = res?; + + match inspect_config.target { + InspectTarget::ValueSizes => { + println!("{:?}: {} bytes", key, value.len()); + total += value.len(); + } + InspectTarget::ValueTotal => { + total += value.len(); + } + } + } + + match inspect_config.target { + InspectTarget::ValueSizes | InspectTarget::ValueTotal => { + println!("Total: {} bytes", total); + } + } + + Ok(()) +} + +pub struct MigrateConfig { + to: SchemaVersion, +} + +fn parse_migrate_config(cli_args: &ArgMatches) -> Result { + let to = SchemaVersion(clap_utils::parse_required(cli_args, "to")?); + + Ok(MigrateConfig { to }) +} + +pub fn migrate_db( + migrate_config: MigrateConfig, + client_config: ClientConfig, + runtime_context: &RuntimeContext, + log: Logger, +) -> Result<(), Error> { + let spec = runtime_context.eth2_config.spec.clone(); + let hot_path = client_config.get_db_path(); + let cold_path = client_config.get_freezer_db_path(); + + let mut from = CURRENT_SCHEMA_VERSION; + let to = migrate_config.to; + let db = HotColdDB::, LevelDB>::open( + &hot_path, + &cold_path, + |_, db_initial_version, _| { + from = db_initial_version; + Ok(()) + }, + client_config.store.clone(), + spec, + log.clone(), + )?; + + info!( + log, + "Migrating database schema"; + "from" => from.as_u64(), + "to" => to.as_u64(), + ); + + migrate_schema::, _, _, _>>( + db, + &client_config.get_data_dir(), + from, + to, + log, + ) +} + +/// Run the database manager, returning an error string if the operation did not succeed. +pub fn run(cli_args: &ArgMatches<'_>, mut env: Environment) -> Result<(), String> { + let client_config = parse_client_config(cli_args, &env)?; + let context = env.core_context(); + let log = context.log().clone(); + + match cli_args.subcommand() { + ("version", Some(_)) => display_db_version(client_config, &context, log), + ("migrate", Some(cli_args)) => { + let migrate_config = parse_migrate_config(cli_args)?; + migrate_db(migrate_config, client_config, &context, log) + } + ("inspect", Some(cli_args)) => { + let inspect_config = parse_inspect_config(cli_args)?; + inspect_db(inspect_config, client_config, &context, log) + } + _ => { + return Err("Unknown subcommand, for help `lighthouse database_manager --help`".into()) + } + } + .map_err(|e| format!("Fatal error: {:?}", e)) +} diff --git a/lighthouse/Cargo.toml b/lighthouse/Cargo.toml index 18cc8324c..fa1289eba 100644 --- a/lighthouse/Cargo.toml +++ b/lighthouse/Cargo.toml @@ -47,6 +47,7 @@ task_executor = { path = "../common/task_executor" } malloc_utils = { path = "../common/malloc_utils" } directory = { path = "../common/directory" } unused_port = { path = "../common/unused_port" } +database_manager = { path = "../database_manager" } [dev-dependencies] tempfile = "3.1.0" diff --git a/lighthouse/src/main.rs b/lighthouse/src/main.rs index b60f3404c..be8708376 100644 --- a/lighthouse/src/main.rs +++ b/lighthouse/src/main.rs @@ -275,6 +275,7 @@ fn main() { .subcommand(boot_node::cli_app()) .subcommand(validator_client::cli_app()) .subcommand(account_manager::cli_app()) + .subcommand(database_manager::cli_app()) .get_matches(); // Configure the allocator early in the process, before it has the chance to use the default values for @@ -485,7 +486,16 @@ fn run( // Exit as soon as account manager returns control. return Ok(()); - }; + } + + if let Some(sub_matches) = matches.subcommand_matches(database_manager::CMD) { + info!(log, "Running database manager for {} network", network_name); + // Pass the entire `environment` to the database manager so it can run blocking operations. + database_manager::run(sub_matches, environment)?; + + // Exit as soon as database manager returns control. + return Ok(()); + } info!(log, "Lighthouse started"; "version" => VERSION); info!(