From 8c5bcfe53a34bc624f79e11b5038c691d18a9477 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Fri, 6 Mar 2020 16:09:41 +1100 Subject: [PATCH] Optimise beacon chain persistence (#851) * Unfinished progress * Update more persistence code * Start fixing tests * Combine persist head and fork choice * Persist head on reorg * Gracefully handle op pool and eth1 cache missing * Fix test failure * Address Michael's comments --- Cargo.lock | 1 + beacon_node/beacon_chain/src/beacon_chain.rs | 124 +++++++++++----- beacon_node/beacon_chain/src/builder.rs | 132 ++++++++++++------ beacon_node/beacon_chain/src/eth1_chain.rs | 22 ++- beacon_node/beacon_chain/src/fork_choice.rs | 17 ++- beacon_node/beacon_chain/src/metrics.rs | 12 +- .../src/persisted_beacon_chain.rs | 17 +-- beacon_node/beacon_chain/src/test_utils.rs | 8 +- .../beacon_chain/tests/persistence_tests.rs | 13 +- beacon_node/beacon_chain/tests/tests.rs | 24 ++-- beacon_node/client/src/builder.rs | 22 ++- beacon_node/store/src/lib.rs | 7 + eth2/operation_pool/Cargo.toml | 1 + eth2/operation_pool/src/persistence.rs | 16 +++ 14 files changed, 297 insertions(+), 119 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 536f88624..ce0a7e69a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2798,6 +2798,7 @@ dependencies = [ "serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)", "state_processing 0.1.0", + "store 0.1.0", "types 0.1.0", ] diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index fc762adbe..1109dd2ab 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -5,7 +5,7 @@ use crate::events::{EventHandler, EventKind}; use crate::fork_choice::{Error as ForkChoiceError, ForkChoice}; use crate::head_tracker::HeadTracker; use crate::metrics; -use crate::persisted_beacon_chain::{PersistedBeaconChain, BEACON_CHAIN_DB_KEY}; +use crate::persisted_beacon_chain::PersistedBeaconChain; use crate::shuffling_cache::ShufflingCache; use crate::timeout_rw_lock::TimeoutRwLock; use crate::validator_pubkey_cache::ValidatorPubkeyCache; @@ -62,6 +62,11 @@ const ATTESTATION_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(1); /// validator pubkey cache. const VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(1); +pub const BEACON_CHAIN_DB_KEY: [u8; 32] = [0; 32]; +pub const OP_POOL_DB_KEY: [u8; 32] = [0; 32]; +pub const ETH1_CACHE_DB_KEY: [u8; 32] = [0; 32]; +pub const FORK_CHOICE_DB_KEY: [u8; 32] = [0; 32]; + #[derive(Debug, PartialEq)] pub enum BlockProcessingOutcome { /// Block was valid and imported into the block graph. @@ -196,43 +201,76 @@ pub struct BeaconChain { type BeaconBlockAndState = (BeaconBlock, BeaconState); impl BeaconChain { - /// Attempt to save this instance to `self.store`. - pub fn persist(&self) -> Result<(), Error> { - let timer = metrics::start_timer(&metrics::PERSIST_CHAIN); + /// Persists the core `BeaconChain` components (including the head block) and the fork choice. + /// + /// ## Notes: + /// + /// In this function we first obtain the head, persist fork choice, then persist the head. We + /// do it in this order to ensure that the persisted head is always from a time prior to fork + /// choice. + /// + /// We want to ensure that the head never out dates the fork choice to avoid having references + /// to blocks that do not exist in fork choice. + pub fn persist_head_and_fork_choice(&self) -> Result<(), Error> { + let canonical_head_block_root = self + .canonical_head + .try_read_for(HEAD_LOCK_TIMEOUT) + .ok_or_else(|| Error::CanonicalHeadLockTimeout)? + .beacon_block_root; - let canonical_head = self.head()?; - - let finalized_checkpoint = { - let beacon_block_root = canonical_head.beacon_state.finalized_checkpoint.root; - let beacon_block = self - .store - .get_block(&beacon_block_root)? - .ok_or_else(|| Error::MissingBeaconBlock(beacon_block_root))?; - let beacon_state_root = beacon_block.state_root(); - let beacon_state = self - .get_state(&beacon_state_root, Some(beacon_block.slot()))? - .ok_or_else(|| Error::MissingBeaconState(beacon_state_root))?; - - CheckPoint { - beacon_block_root, - beacon_block, - beacon_state_root, - beacon_state, - } - }; - - let p: PersistedBeaconChain = PersistedBeaconChain { - canonical_head, - finalized_checkpoint, - op_pool: PersistedOperationPool::from_operation_pool(&self.op_pool), + let persisted_head = PersistedBeaconChain { + canonical_head_block_root, genesis_block_root: self.genesis_block_root, ssz_head_tracker: self.head_tracker.to_ssz_container(), - fork_choice: self.fork_choice.as_ssz_container(), - eth1_cache: self.eth1_chain.as_ref().map(|x| x.as_ssz_container()), }; - let key = Hash256::from_slice(&BEACON_CHAIN_DB_KEY.as_bytes()); - self.store.put(&key, &p)?; + let fork_choice_timer = metrics::start_timer(&metrics::PERSIST_FORK_CHOICE); + + self.store.put( + &Hash256::from_slice(&FORK_CHOICE_DB_KEY), + &self.fork_choice.as_ssz_container(), + )?; + + metrics::stop_timer(fork_choice_timer); + let head_timer = metrics::start_timer(&metrics::PERSIST_HEAD); + + self.store + .put(&Hash256::from_slice(&BEACON_CHAIN_DB_KEY), &persisted_head)?; + + metrics::stop_timer(head_timer); + + Ok(()) + } + + /// Persists `self.op_pool` to disk. + /// + /// ## Notes + /// + /// This operation is typically slow and causes a lot of allocations. It should be used + /// sparingly. + pub fn persist_op_pool(&self) -> Result<(), Error> { + let timer = metrics::start_timer(&metrics::PERSIST_OP_POOL); + + self.store.put( + &Hash256::from_slice(&OP_POOL_DB_KEY), + &PersistedOperationPool::from_operation_pool(&self.op_pool), + )?; + + metrics::stop_timer(timer); + + Ok(()) + } + + /// Persists `self.eth1_chain` and its caches to disk. + pub fn persist_eth1_cache(&self) -> Result<(), Error> { + let timer = metrics::start_timer(&metrics::PERSIST_OP_POOL); + + if let Some(eth1_chain) = self.eth1_chain.as_ref() { + self.store.put( + &Hash256::from_slice(Ð1_CACHE_DB_KEY), + ð1_chain.as_ssz_container(), + )?; + } metrics::stop_timer(timer); @@ -1654,8 +1692,12 @@ impl BeaconChain { metrics::stop_timer(timer); - // Save `self` to `self.store`. - self.persist()?; + if previous_slot.epoch(T::EthSpec::slots_per_epoch()) + < new_slot.epoch(T::EthSpec::slots_per_epoch()) + || is_reorg + { + self.persist_head_and_fork_choice()?; + } let _ = self.event_handler.register(EventKind::BeaconHeadChanged { reorg: is_reorg, @@ -1793,16 +1835,22 @@ impl BeaconChain { impl Drop for BeaconChain { fn drop(&mut self) { - if let Err(e) = self.persist() { + let drop = || -> Result<(), Error> { + self.persist_head_and_fork_choice()?; + self.persist_op_pool()?; + self.persist_eth1_cache() + }; + + if let Err(e) = drop() { error!( self.log, - "Failed to persist BeaconChain on drop"; + "Failed to persist on BeaconChain drop"; "error" => format!("{:?}", e) ) } else { info!( self.log, - "Saved beacon chain state"; + "Saved beacon chain to disk"; ) } } diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 85527a9f0..9067965f7 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -1,7 +1,11 @@ -use crate::eth1_chain::CachingEth1Backend; +use crate::beacon_chain::{ + BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, FORK_CHOICE_DB_KEY, OP_POOL_DB_KEY, +}; +use crate::eth1_chain::{CachingEth1Backend, SszEth1}; use crate::events::NullEventHandler; +use crate::fork_choice::SszForkChoice; use crate::head_tracker::HeadTracker; -use crate::persisted_beacon_chain::{PersistedBeaconChain, BEACON_CHAIN_DB_KEY}; +use crate::persisted_beacon_chain::PersistedBeaconChain; use crate::shuffling_cache::ShufflingCache; use crate::timeout_rw_lock::TimeoutRwLock; use crate::validator_pubkey_cache::ValidatorPubkeyCache; @@ -10,7 +14,7 @@ use crate::{ ForkChoice, }; use eth1::Config as Eth1Config; -use operation_pool::OperationPool; +use operation_pool::{OperationPool, PersistedOperationPool}; use proto_array_fork_choice::ProtoArrayForkChoice; use slog::{info, Logger}; use slot_clock::{SlotClock, TestingSlotClock}; @@ -67,6 +71,7 @@ where pub struct BeaconChainBuilder { store: Option>, store_migrator: Option, + canonical_head: Option>, /// The finalized checkpoint to anchor the chain. May be genesis or a higher /// checkpoint. pub finalized_checkpoint: Option>, @@ -76,7 +81,6 @@ pub struct BeaconChainBuilder { eth1_chain: Option>, event_handler: Option, slot_clock: Option, - persisted_beacon_chain: Option>, head_tracker: Option, data_dir: Option, pubkey_cache_path: Option, @@ -105,6 +109,7 @@ where Self { store: None, store_migrator: None, + canonical_head: None, finalized_checkpoint: None, genesis_block_root: None, op_pool: None, @@ -112,7 +117,6 @@ where eth1_chain: None, event_handler: None, slot_clock: None, - persisted_beacon_chain: None, head_tracker: None, pubkey_cache_path: None, data_dir: None, @@ -162,10 +166,22 @@ where self } + /// Attempt to load an existing eth1 cache from the builder's `Store`. + pub fn get_persisted_eth1_backend(&self) -> Result, String> { + let store = self + .store + .clone() + .ok_or_else(|| "get_persisted_eth1_backend requires a store.".to_string())?; + + store + .get::(&Hash256::from_slice(Ð1_CACHE_DB_KEY)) + .map_err(|e| format!("DB error whilst reading eth1 cache: {:?}", e)) + } + /// Attempt to load an existing chain from the builder's `Store`. /// /// May initialize several components; including the op_pool and finalized checkpoints. - pub fn resume_from_db(mut self, config: Eth1Config) -> Result { + pub fn resume_from_db(mut self) -> Result { let log = self .log .as_ref() @@ -187,37 +203,63 @@ where .clone() .ok_or_else(|| "load_from_store requires a store.".to_string())?; - let key = Hash256::from_slice(&BEACON_CHAIN_DB_KEY.as_bytes()); - let p: PersistedBeaconChain< - Witness, - > = match store.get(&key) { - Err(e) => { - return Err(format!( - "DB error when reading persisted beacon chain: {:?}", - e - )) - } - Ok(None) => return Err("No persisted beacon chain found in store".into()), - Ok(Some(p)) => p, - }; + let chain = store + .get::(&Hash256::from_slice(&BEACON_CHAIN_DB_KEY)) + .map_err(|e| format!("DB error when reading persisted beacon chain: {:?}", e))? + .ok_or_else(|| { + "No persisted beacon chain found in store. Try deleting the .lighthouse/beacon dir." + .to_string() + })?; - self.op_pool = Some( - p.op_pool - .clone() - .into_operation_pool(&p.canonical_head.beacon_state, &self.spec), - ); - - self.finalized_checkpoint = Some(p.finalized_checkpoint.clone()); - self.genesis_block_root = Some(p.genesis_block_root); + self.genesis_block_root = Some(chain.genesis_block_root); self.head_tracker = Some( - HeadTracker::from_ssz_container(&p.ssz_head_tracker) + HeadTracker::from_ssz_container(&chain.ssz_head_tracker) .map_err(|e| format!("Failed to decode head tracker for database: {:?}", e))?, ); - self.eth1_chain = match &p.eth1_cache { - Some(cache) => Some(Eth1Chain::from_ssz_container(cache, config, store, log)?), - None => None, - }; - self.persisted_beacon_chain = Some(p); + + let head_block_root = chain.canonical_head_block_root; + let head_block = store + .get::>(&head_block_root) + .map_err(|e| format!("DB error when reading head block: {:?}", e))? + .ok_or_else(|| "Head block not found in store".to_string())?; + let head_state_root = head_block.state_root(); + let head_state = store + .get_state(&head_state_root, Some(head_block.slot())) + .map_err(|e| format!("DB error when reading head state: {:?}", e))? + .ok_or_else(|| "Head state not found in store".to_string())?; + + self.op_pool = Some( + store + .get::>(&Hash256::from_slice(&OP_POOL_DB_KEY)) + .map_err(|e| format!("DB error whilst reading persisted op pool: {:?}", e))? + .map(|persisted| persisted.into_operation_pool(&head_state, &self.spec)) + .unwrap_or_else(|| OperationPool::new()), + ); + + let finalized_block_root = head_state.finalized_checkpoint.root; + let finalized_block = store + .get::>(&finalized_block_root) + .map_err(|e| format!("DB error when reading finalized block: {:?}", e))? + .ok_or_else(|| "Finalized block not found in store".to_string())?; + let finalized_state_root = finalized_block.state_root(); + let finalized_state = store + .get_state(&finalized_state_root, Some(finalized_block.slot())) + .map_err(|e| format!("DB error when reading finalized state: {:?}", e))? + .ok_or_else(|| "Finalized state not found in store".to_string())?; + + self.finalized_checkpoint = Some(CheckPoint { + beacon_block_root: finalized_block_root, + beacon_block: finalized_block, + beacon_state_root: finalized_state_root, + beacon_state: finalized_state, + }); + + self.canonical_head = Some(CheckPoint { + beacon_block_root: head_block_root, + beacon_block: head_block, + beacon_state_root: head_state_root, + beacon_state: head_state, + }); let pubkey_cache = ValidatorPubkeyCache::load_from_file(pubkey_cache_path) .map_err(|e| format!("Unable to open persisted pubkey cache: {:?}", e))?; @@ -322,8 +364,8 @@ where // If this beacon chain is being loaded from disk, use the stored head. Otherwise, just use // the finalized checkpoint (which is probably genesis). - let mut canonical_head = if let Some(persisted_beacon_chain) = self.persisted_beacon_chain { - persisted_beacon_chain.canonical_head + let mut canonical_head = if let Some(head) = self.canonical_head { + head } else { self.finalized_checkpoint .ok_or_else(|| "Cannot build without a state".to_string())? @@ -414,9 +456,18 @@ where /// If this builder is being "resumed" from disk, then rebuild the last fork choice stored to /// the database. Otherwise, create a new, empty fork choice. pub fn reduced_tree_fork_choice(mut self) -> Result { - let fork_choice = if let Some(persisted_beacon_chain) = &self.persisted_beacon_chain { - ForkChoice::from_ssz_container(persisted_beacon_chain.fork_choice.clone()) - .map_err(|e| format!("Unable to decode fork choice from db: {:?}", e))? + let store = self + .store + .clone() + .ok_or_else(|| "reduced_tree_fork_choice requires a store.".to_string())?; + + let persisted_fork_choice = store + .get::(&Hash256::from_slice(&FORK_CHOICE_DB_KEY)) + .map_err(|e| format!("DB error when reading persisted fork choice: {:?}", e))?; + + let fork_choice = if let Some(persisted) = persisted_fork_choice { + ForkChoice::from_ssz_container(persisted) + .map_err(|e| format!("Unable to read persisted fork choice from disk: {:?}", e))? } else { let finalized_checkpoint = &self .finalized_checkpoint @@ -469,11 +520,6 @@ where TEthSpec: EthSpec + 'static, TEventHandler: EventHandler + 'static, { - /// Sets the `BeaconChain` eth1 back-end to `CachingEth1Backend`. - pub fn caching_eth1_backend(self, backend: CachingEth1Backend) -> Self { - self.eth1_backend(Some(backend)) - } - /// Do not use any eth1 backend. The client will not be able to produce beacon blocks. pub fn no_eth1_backend(self) -> Self { self.eth1_backend(None) diff --git a/beacon_node/beacon_chain/src/eth1_chain.rs b/beacon_node/beacon_chain/src/eth1_chain.rs index 1ff1a0cfa..148e15630 100644 --- a/beacon_node/beacon_chain/src/eth1_chain.rs +++ b/beacon_node/beacon_chain/src/eth1_chain.rs @@ -4,6 +4,7 @@ use eth2_hashing::hash; use exit_future::Exit; use futures::Future; use slog::{debug, error, trace, Logger}; +use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use state_processing::per_block_processing::get_new_eth1_data; use std::cmp::Ordering; @@ -11,7 +12,7 @@ use std::collections::HashMap; use std::iter::DoubleEndedIterator; use std::marker::PhantomData; use std::sync::Arc; -use store::{Error as StoreError, Store}; +use store::{DBColumn, Error as StoreError, SimpleStoreItem, Store}; use types::{ BeaconState, BeaconStateError, ChainSpec, Deposit, Eth1Data, EthSpec, Hash256, Slot, Unsigned, DEPOSIT_TREE_DEPTH, @@ -52,6 +53,20 @@ pub struct SszEth1 { backend_bytes: Vec, } +impl SimpleStoreItem for SszEth1 { + fn db_column() -> DBColumn { + DBColumn::Eth1Cache + } + + fn as_store_bytes(&self) -> Vec { + self.as_ssz_bytes() + } + + fn from_store_bytes(bytes: &[u8]) -> Result { + Self::from_ssz_bytes(bytes).map_err(Into::into) + } +} + /// Holds an `Eth1ChainBackend` and serves requests from the `BeaconChain`. pub struct Eth1Chain where @@ -142,6 +157,11 @@ where backend_bytes: self.backend.as_bytes(), } } + + /// Consumes `self`, returning the backend. + pub fn into_backend(self) -> T { + self.backend + } } pub trait Eth1ChainBackend>: Sized + Send + Sync { diff --git a/beacon_node/beacon_chain/src/fork_choice.rs b/beacon_node/beacon_chain/src/fork_choice.rs index 9521fa9cf..9f4262ee4 100644 --- a/beacon_node/beacon_chain/src/fork_choice.rs +++ b/beacon_node/beacon_chain/src/fork_choice.rs @@ -4,10 +4,11 @@ use crate::{errors::BeaconChainError, metrics, BeaconChain, BeaconChainTypes}; use checkpoint_manager::{get_effective_balances, CheckpointManager, CheckpointWithBalances}; use parking_lot::{RwLock, RwLockReadGuard}; use proto_array_fork_choice::{core::ProtoArray, ProtoArrayForkChoice}; +use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use state_processing::common::get_indexed_attestation; use std::marker::PhantomData; -use store::Error as StoreError; +use store::{DBColumn, Error as StoreError, SimpleStoreItem}; use types::{BeaconBlock, BeaconState, BeaconStateError, Epoch, Hash256, IndexedAttestation, Slot}; type Result = std::result::Result; @@ -283,3 +284,17 @@ impl From for Error { Error::BackendError(e) } } + +impl SimpleStoreItem for SszForkChoice { + fn db_column() -> DBColumn { + DBColumn::ForkChoice + } + + fn as_store_bytes(&self) -> Vec { + self.as_ssz_bytes() + } + + fn from_store_bytes(bytes: &[u8]) -> std::result::Result { + Self::from_ssz_bytes(bytes).map_err(Into::into) + } +} diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index 54331016b..aee111f47 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -179,10 +179,16 @@ lazy_static! { try_create_int_counter("beacon_balances_cache_misses_total", "Count of times balances cache fulfils request"); /* - * Persisting BeaconChain to disk + * Persisting BeaconChain components to disk */ - pub static ref PERSIST_CHAIN: Result = - try_create_histogram("beacon_persist_chain", "Time taken to update the canonical head"); + pub static ref PERSIST_HEAD: Result = + try_create_histogram("beacon_persist_head", "Time taken to persist the canonical head"); + pub static ref PERSIST_OP_POOL: Result = + try_create_histogram("beacon_persist_op_pool", "Time taken to persist the operations pool"); + pub static ref PERSIST_ETH1_CACHE: Result = + try_create_histogram("beacon_persist_eth1_cache", "Time taken to persist the eth1 caches"); + pub static ref PERSIST_FORK_CHOICE: Result = + try_create_histogram("beacon_persist_fork_choice", "Time taken to persist the fork choice struct"); /* * Eth1 diff --git a/beacon_node/beacon_chain/src/persisted_beacon_chain.rs b/beacon_node/beacon_chain/src/persisted_beacon_chain.rs index e8f619e92..3c310b3f3 100644 --- a/beacon_node/beacon_chain/src/persisted_beacon_chain.rs +++ b/beacon_node/beacon_chain/src/persisted_beacon_chain.rs @@ -1,28 +1,17 @@ -use crate::eth1_chain::SszEth1; -use crate::fork_choice::SszForkChoice; use crate::head_tracker::SszHeadTracker; -use crate::{BeaconChainTypes, CheckPoint}; -use operation_pool::PersistedOperationPool; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use store::{DBColumn, Error as StoreError, SimpleStoreItem}; use types::Hash256; -/// 32-byte key for accessing the `PersistedBeaconChain`. -pub const BEACON_CHAIN_DB_KEY: &str = "PERSISTEDBEACONCHAINPERSISTEDBEA"; - #[derive(Clone, Encode, Decode)] -pub struct PersistedBeaconChain { - pub canonical_head: CheckPoint, - pub finalized_checkpoint: CheckPoint, - pub op_pool: PersistedOperationPool, +pub struct PersistedBeaconChain { + pub canonical_head_block_root: Hash256, pub genesis_block_root: Hash256, pub ssz_head_tracker: SszHeadTracker, - pub fork_choice: SszForkChoice, - pub eth1_cache: Option, } -impl SimpleStoreItem for PersistedBeaconChain { +impl SimpleStoreItem for PersistedBeaconChain { fn db_column() -> DBColumn { DBColumn::BeaconChain } diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 026cc8a32..0edd26ce3 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -1,3 +1,7 @@ +pub use crate::beacon_chain::{ + BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, FORK_CHOICE_DB_KEY, OP_POOL_DB_KEY, +}; +pub use crate::persisted_beacon_chain::PersistedBeaconChain; use crate::{ builder::{BeaconChainBuilder, Witness}, eth1_chain::CachingEth1Backend, @@ -5,7 +9,6 @@ use crate::{ AttestationProcessingOutcome, BeaconChain, BeaconChainTypes, BlockProcessingOutcome, StateSkipConfig, }; -use eth1::Config as Eth1Config; use genesis::interop_genesis_state; use rayon::prelude::*; use sloggers::{null::NullLoggerBuilder, Build}; @@ -24,7 +27,6 @@ use types::{ SecretKey, Signature, SignedBeaconBlock, SignedRoot, Slot, }; -pub use crate::persisted_beacon_chain::{PersistedBeaconChain, BEACON_CHAIN_DB_KEY}; pub use types::test_utils::generate_deterministic_keypairs; // 4th September 2019 @@ -176,7 +178,7 @@ impl BeaconChainHarness> { .store(store.clone()) .store_migrator( as Migrate<_, E>>::new(store)) .data_dir(data_dir.path().to_path_buf()) - .resume_from_db(Eth1Config::default()) + .resume_from_db() .expect("should resume beacon chain from db") .dummy_eth1_backend() .expect("should build dummy backend") diff --git a/beacon_node/beacon_chain/tests/persistence_tests.rs b/beacon_node/beacon_chain/tests/persistence_tests.rs index 385408079..e51815bfe 100644 --- a/beacon_node/beacon_chain/tests/persistence_tests.rs +++ b/beacon_node/beacon_chain/tests/persistence_tests.rs @@ -72,7 +72,18 @@ fn finalizes_after_resuming_from_db() { let latest_slot = harness.chain.slot().expect("should have a slot"); - harness.chain.persist().expect("should persist the chain"); + harness + .chain + .persist_head_and_fork_choice() + .expect("should persist the head and fork choice"); + harness + .chain + .persist_op_pool() + .expect("should persist the op pool"); + harness + .chain + .persist_eth1_cache() + .expect("should persist the eth1 cache"); let data_dir = harness.data_dir; let original_chain = harness.chain; diff --git a/beacon_node/beacon_chain/tests/tests.rs b/beacon_node/beacon_chain/tests/tests.rs index d0711d897..6d4c5244f 100644 --- a/beacon_node/beacon_chain/tests/tests.rs +++ b/beacon_node/beacon_chain/tests/tests.rs @@ -6,11 +6,11 @@ extern crate lazy_static; use beacon_chain::AttestationProcessingOutcome; use beacon_chain::{ test_utils::{ - AttestationStrategy, BeaconChainHarness, BlockStrategy, HarnessType, PersistedBeaconChain, - BEACON_CHAIN_DB_KEY, + AttestationStrategy, BeaconChainHarness, BlockStrategy, HarnessType, OP_POOL_DB_KEY, }, BlockProcessingOutcome, }; +use operation_pool::PersistedOperationPool; use state_processing::{ per_slot_processing, per_slot_processing::Error as SlotProcessingError, EpochProcessingError, }; @@ -344,15 +344,21 @@ fn roundtrip_operation_pool() { assert!(harness.chain.op_pool.num_attestations() > 0); // TODO: could add some other operations - harness.chain.persist().unwrap(); + harness + .chain + .persist_op_pool() + .expect("should persist op pool"); - let key = Hash256::from_slice(&BEACON_CHAIN_DB_KEY.as_bytes()); - let p: PersistedBeaconChain> = - harness.chain.store.get(&key).unwrap().unwrap(); + let head_state = harness.chain.head().expect("should get head").beacon_state; - let restored_op_pool = p - .op_pool - .into_operation_pool(&p.canonical_head.beacon_state, &harness.spec); + let key = Hash256::from_slice(&OP_POOL_DB_KEY); + let restored_op_pool = harness + .chain + .store + .get::>(&key) + .expect("should read db") + .expect("should find op pool") + .into_operation_pool(&head_state, &harness.spec); assert_eq!(harness.chain.op_pool, restored_op_pool); } diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 87a734f0d..42080663b 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -3,7 +3,7 @@ use crate::notifier::spawn_notifier; use crate::Client; use beacon_chain::{ builder::{BeaconChainBuilder, Witness}, - eth1_chain::CachingEth1Backend, + eth1_chain::{CachingEth1Backend, Eth1Chain}, slot_clock::{SlotClock, SystemTimeSlotClock}, store::{ migrate::{BackgroundMigrator, Migrate, NullMigrator}, @@ -234,10 +234,7 @@ where Box::new(future) } ClientGenesis::Resume => { - let future = builder - .resume_from_db(config.eth1) - .into_future() - .map(|v| (v, None)); + let future = builder.resume_from_db().into_future().map(|v| (v, None)); Box::new(future) } @@ -632,7 +629,20 @@ where CachingEth1Backend::from_service(eth1_service_from_genesis, store) } else { - CachingEth1Backend::new(config, context.log, store) + beacon_chain_builder + .get_persisted_eth1_backend()? + .map(|persisted| { + Eth1Chain::from_ssz_container( + &persisted, + config.clone(), + store.clone(), + &context.log, + ) + .map(|chain| chain.into_backend()) + }) + .unwrap_or_else(|| { + Ok(CachingEth1Backend::new(config, context.log.clone(), store)) + })? }; self.eth1_service = None; diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index a17fcd5b5..80da2ced1 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -191,7 +191,11 @@ pub enum DBColumn { BeaconMeta, BeaconBlock, BeaconState, + /// For persisting in-memory state to the database. BeaconChain, + OpPool, + Eth1Cache, + ForkChoice, /// For the table mapping restore point numbers to state roots. BeaconRestorePoint, /// For the mapping from state roots to their slots or summaries. @@ -211,6 +215,9 @@ impl Into<&'static str> for DBColumn { DBColumn::BeaconBlock => "blk", DBColumn::BeaconState => "ste", DBColumn::BeaconChain => "bch", + DBColumn::OpPool => "opo", + DBColumn::Eth1Cache => "etc", + DBColumn::ForkChoice => "frk", DBColumn::BeaconRestorePoint => "brp", DBColumn::BeaconStateSummary => "bss", DBColumn::BeaconBlockRoots => "bbr", diff --git a/eth2/operation_pool/Cargo.toml b/eth2/operation_pool/Cargo.toml index ded1e739e..7bfc86063 100644 --- a/eth2/operation_pool/Cargo.toml +++ b/eth2/operation_pool/Cargo.toml @@ -13,6 +13,7 @@ eth2_ssz = "0.1.2" eth2_ssz_derive = "0.1.0" serde = "1.0.102" serde_derive = "1.0.102" +store = { path = "../../beacon_node/store" } [dev-dependencies] rand = "0.7.2" diff --git a/eth2/operation_pool/src/persistence.rs b/eth2/operation_pool/src/persistence.rs index 0f6b634bc..592d4d18b 100644 --- a/eth2/operation_pool/src/persistence.rs +++ b/eth2/operation_pool/src/persistence.rs @@ -2,7 +2,9 @@ use crate::attestation_id::AttestationId; use crate::OperationPool; use parking_lot::RwLock; use serde_derive::{Deserialize, Serialize}; +use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; +use store::{DBColumn, Error as StoreError, SimpleStoreItem}; use types::*; /// SSZ-serializable version of `OperationPool`. @@ -99,3 +101,17 @@ impl PersistedOperationPool { } } } + +impl SimpleStoreItem for PersistedOperationPool { + fn db_column() -> DBColumn { + DBColumn::OpPool + } + + fn as_store_bytes(&self) -> Vec { + self.as_ssz_bytes() + } + + fn from_store_bytes(bytes: &[u8]) -> Result { + Self::from_ssz_bytes(bytes).map_err(Into::into) + } +}