Fix bug in block root storage (#4663)

## Issue Addressed

Fix a bug in the storage of the linear block roots array in the freezer DB. Previously this array was always written as part of state storage (or block backfill). With state pruning enabled by #4610, these states were no longer being written and as a result neither were the block roots.

The impact is quite low, we would just log an error when trying to forwards-iterate the block roots, which for validating nodes only happens when they try to look up blocks for peers:

> Aug 25 03:42:36.980 ERRO Missing chunk in forwards iterator      chunk index: 49726, service: freezer_db

Any node checkpoint synced off `unstable` is affected and has a corrupt database. If you see the log above, you need to re-sync with the fix. Nodes that haven't checkpoint synced recently should _not_ be corrupted, even if they ran the buggy version.

## Proposed Changes

- Use a `ChunkWriter` to write the block roots when states are not being stored.
- Tweak the usage of `get_latest_restore_point` so that it doesn't return a nonsense value when state pruning is enabled.
- Tweak the guarantee on the block roots array so that block roots are assumed available up to the split slot (exclusive). This is a bit nicer than relying on anything to do with the latest restore point, which is a nonsensical concept when there aren't any restore points.

## Additional Info

I'm looking forward to deleting the chunked vector code for good when we merge tree-states 😁
This commit is contained in:
Michael Sproul 2023-08-28 05:34:28 +00:00
parent d61f507184
commit f284e0e264
8 changed files with 145 additions and 43 deletions

1
Cargo.lock generated
View File

@ -9051,6 +9051,7 @@ dependencies = [
"http_api", "http_api",
"hyper", "hyper",
"log", "log",
"logging",
"network", "network",
"r2d2", "r2d2",
"rand 0.7.3", "rand 0.7.3",

View File

@ -481,6 +481,21 @@ where
let (_, updated_builder) = self.set_genesis_state(genesis_state)?; let (_, updated_builder) = self.set_genesis_state(genesis_state)?;
self = updated_builder; self = updated_builder;
// Fill in the linear block roots between the checkpoint block's slot and the aligned
// state's slot. All slots less than the block's slot will be handled by block backfill,
// while states greater or equal to the checkpoint state will be handled by `migrate_db`.
let block_root_batch = store
.store_frozen_block_root_at_skip_slots(
weak_subj_block.slot(),
weak_subj_state.slot(),
weak_subj_block_root,
)
.map_err(|e| format!("Error writing frozen block roots: {e:?}"))?;
store
.cold_db
.do_atomically(block_root_batch)
.map_err(|e| format!("Error writing frozen block roots: {e:?}"))?;
// Write the state and block non-atomically, it doesn't matter if they're forgotten // Write the state and block non-atomically, it doesn't matter if they're forgotten
// about on a crash restart. // about on a crash restart.
store store

View File

@ -421,7 +421,7 @@ async fn forwards_iter_block_and_state_roots_until() {
// The last restore point slot is the point at which the hybrid forwards iterator behaviour // The last restore point slot is the point at which the hybrid forwards iterator behaviour
// changes. // changes.
let last_restore_point_slot = store.get_latest_restore_point_slot(); let last_restore_point_slot = store.get_latest_restore_point_slot().unwrap();
assert!(last_restore_point_slot > 0); assert!(last_restore_point_slot > 0);
let chain = &harness.chain; let chain = &harness.chain;

View File

@ -30,16 +30,16 @@ where
/// Create a new iterator which can yield elements from `start_vindex` up to the last /// Create a new iterator which can yield elements from `start_vindex` up to the last
/// index stored by the restore point at `last_restore_point_slot`. /// index stored by the restore point at `last_restore_point_slot`.
/// ///
/// The `last_restore_point` slot should be the slot of a recent restore point as obtained from /// The `freezer_upper_limit` slot should be the slot of a recent restore point as obtained from
/// `HotColdDB::get_latest_restore_point_slot`. We pass it as a parameter so that the caller can /// `Root::freezer_upper_limit`. We pass it as a parameter so that the caller can
/// maintain a stable view of the database (see `HybridForwardsBlockRootsIterator`). /// maintain a stable view of the database (see `HybridForwardsBlockRootsIterator`).
pub fn new( pub fn new(
store: &'a HotColdDB<E, Hot, Cold>, store: &'a HotColdDB<E, Hot, Cold>,
start_vindex: usize, start_vindex: usize,
last_restore_point_slot: Slot, freezer_upper_limit: Slot,
spec: &ChainSpec, spec: &ChainSpec,
) -> Self { ) -> Self {
let (_, end_vindex) = F::start_and_end_vindex(last_restore_point_slot, spec); let (_, end_vindex) = F::start_and_end_vindex(freezer_upper_limit, spec);
// Set the next chunk to the one containing `start_vindex`. // Set the next chunk to the one containing `start_vindex`.
let next_cindex = start_vindex / F::chunk_size(); let next_cindex = start_vindex / F::chunk_size();

View File

@ -19,6 +19,14 @@ pub trait Root<E: EthSpec>: Field<E, Value = Hash256> {
end_state: BeaconState<E>, end_state: BeaconState<E>,
end_root: Hash256, end_root: Hash256,
) -> Result<SimpleForwardsIterator>; ) -> Result<SimpleForwardsIterator>;
/// The first slot for which this field is *no longer* stored in the freezer database.
///
/// If `None`, then this field is not stored in the freezer database at all due to pruning
/// configuration.
fn freezer_upper_limit<Hot: ItemStore<E>, Cold: ItemStore<E>>(
store: &HotColdDB<E, Hot, Cold>,
) -> Option<Slot>;
} }
impl<E: EthSpec> Root<E> for BlockRoots { impl<E: EthSpec> Root<E> for BlockRoots {
@ -39,6 +47,13 @@ impl<E: EthSpec> Root<E> for BlockRoots {
)?; )?;
Ok(SimpleForwardsIterator { values }) Ok(SimpleForwardsIterator { values })
} }
fn freezer_upper_limit<Hot: ItemStore<E>, Cold: ItemStore<E>>(
store: &HotColdDB<E, Hot, Cold>,
) -> Option<Slot> {
// Block roots are stored for all slots up to the split slot (exclusive).
Some(store.get_split_slot())
}
} }
impl<E: EthSpec> Root<E> for StateRoots { impl<E: EthSpec> Root<E> for StateRoots {
@ -59,6 +74,15 @@ impl<E: EthSpec> Root<E> for StateRoots {
)?; )?;
Ok(SimpleForwardsIterator { values }) Ok(SimpleForwardsIterator { values })
} }
fn freezer_upper_limit<Hot: ItemStore<E>, Cold: ItemStore<E>>(
store: &HotColdDB<E, Hot, Cold>,
) -> Option<Slot> {
// State roots are stored for all slots up to the latest restore point (exclusive).
// There may not be a latest restore point if state pruning is enabled, in which
// case this function will return `None`.
store.get_latest_restore_point_slot()
}
} }
/// Forwards root iterator that makes use of a flat field table in the freezer DB. /// Forwards root iterator that makes use of a flat field table in the freezer DB.
@ -118,6 +142,7 @@ impl Iterator for SimpleForwardsIterator {
pub enum HybridForwardsIterator<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>> { pub enum HybridForwardsIterator<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>> {
PreFinalization { PreFinalization {
iter: Box<FrozenForwardsIterator<'a, E, F, Hot, Cold>>, iter: Box<FrozenForwardsIterator<'a, E, F, Hot, Cold>>,
end_slot: Option<Slot>,
/// Data required by the `PostFinalization` iterator when we get to it. /// Data required by the `PostFinalization` iterator when we get to it.
continuation_data: Option<Box<(BeaconState<E>, Hash256)>>, continuation_data: Option<Box<(BeaconState<E>, Hash256)>>,
}, },
@ -129,6 +154,7 @@ pub enum HybridForwardsIterator<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, C
PostFinalization { PostFinalization {
iter: SimpleForwardsIterator, iter: SimpleForwardsIterator,
}, },
Finished,
} }
impl<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>> impl<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>>
@ -138,8 +164,8 @@ impl<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>>
/// ///
/// The `get_state` closure should return a beacon state and final block/state root to backtrack /// The `get_state` closure should return a beacon state and final block/state root to backtrack
/// from in the case where the iterated range does not lie entirely within the frozen portion of /// from in the case where the iterated range does not lie entirely within the frozen portion of
/// the database. If an `end_slot` is provided and it is before the database's latest restore /// the database. If an `end_slot` is provided and it is before the database's freezer upper
/// point slot then the `get_state` closure will not be called at all. /// limit for the field then the `get_state` closure will not be called at all.
/// ///
/// It is OK for `get_state` to hold a lock while this function is evaluated, as the returned /// It is OK for `get_state` to hold a lock while this function is evaluated, as the returned
/// iterator is as lazy as possible and won't do any work apart from calling `get_state`. /// iterator is as lazy as possible and won't do any work apart from calling `get_state`.
@ -155,13 +181,15 @@ impl<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>>
) -> Result<Self> { ) -> Result<Self> {
use HybridForwardsIterator::*; use HybridForwardsIterator::*;
let latest_restore_point_slot = store.get_latest_restore_point_slot(); // First slot at which this field is *not* available in the freezer. i.e. all slots less
// than this slot have their data available in the freezer.
let freezer_upper_limit = F::freezer_upper_limit(store).unwrap_or(Slot::new(0));
let result = if start_slot < latest_restore_point_slot { let result = if start_slot < freezer_upper_limit {
let iter = Box::new(FrozenForwardsIterator::new( let iter = Box::new(FrozenForwardsIterator::new(
store, store,
start_slot, start_slot,
latest_restore_point_slot, freezer_upper_limit,
spec, spec,
)); ));
@ -169,13 +197,14 @@ impl<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>>
// `end_slot`. If it tries to continue further a `NoContinuationData` error will be // `end_slot`. If it tries to continue further a `NoContinuationData` error will be
// returned. // returned.
let continuation_data = let continuation_data =
if end_slot.map_or(false, |end_slot| end_slot < latest_restore_point_slot) { if end_slot.map_or(false, |end_slot| end_slot < freezer_upper_limit) {
None None
} else { } else {
Some(Box::new(get_state())) Some(Box::new(get_state()))
}; };
PreFinalization { PreFinalization {
iter, iter,
end_slot,
continuation_data, continuation_data,
} }
} else { } else {
@ -195,6 +224,7 @@ impl<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>>
match self { match self {
PreFinalization { PreFinalization {
iter, iter,
end_slot,
continuation_data, continuation_data,
} => { } => {
match iter.next() { match iter.next() {
@ -203,10 +233,17 @@ impl<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>>
// to a post-finalization iterator beginning from the last slot // to a post-finalization iterator beginning from the last slot
// of the pre iterator. // of the pre iterator.
None => { None => {
// If the iterator has an end slot (inclusive) which has already been
// covered by the (exclusive) frozen forwards iterator, then we're done!
let iter_end_slot = Slot::from(iter.inner.end_vindex);
if end_slot.map_or(false, |end_slot| iter_end_slot == end_slot + 1) {
*self = Finished;
return Ok(None);
}
let continuation_data = continuation_data.take(); let continuation_data = continuation_data.take();
let store = iter.inner.store; let store = iter.inner.store;
let start_slot = Slot::from(iter.inner.end_vindex); let start_slot = iter_end_slot;
*self = PostFinalizationLazy { *self = PostFinalizationLazy {
continuation_data, continuation_data,
store, store,
@ -230,6 +267,7 @@ impl<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>>
self.do_next() self.do_next()
} }
PostFinalization { iter } => iter.next().transpose(), PostFinalization { iter } => iter.next().transpose(),
Finished => Ok(None),
} }
} }
} }

View File

@ -18,7 +18,7 @@ use crate::metadata::{
}; };
use crate::metrics; use crate::metrics;
use crate::{ use crate::{
get_key_for_col, DBColumn, DatabaseBlock, Error, ItemStore, KeyValueStoreOp, get_key_for_col, ChunkWriter, DBColumn, DatabaseBlock, Error, ItemStore, KeyValueStoreOp,
PartialBeaconState, StoreItem, StoreOp, PartialBeaconState, StoreItem, StoreOp,
}; };
use itertools::process_results; use itertools::process_results;
@ -963,6 +963,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
ops.push(op); ops.push(op);
// 2. Store updated vector entries. // 2. Store updated vector entries.
// Block roots need to be written here as well as by the `ChunkWriter` in `migrate_db`
// because states may require older block roots, and the writer only stores block roots
// between the previous split point and the new split point.
let db = &self.cold_db; let db = &self.cold_db;
store_updated_vector(BlockRoots, db, state, &self.spec, ops)?; store_updated_vector(BlockRoots, db, state, &self.spec, ops)?;
store_updated_vector(StateRoots, db, state, &self.spec, ops)?; store_updated_vector(StateRoots, db, state, &self.spec, ops)?;
@ -1243,10 +1246,21 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}; };
} }
/// Fetch the slot of the most recently stored restore point. /// Fetch the slot of the most recently stored restore point (if any).
pub fn get_latest_restore_point_slot(&self) -> Slot { pub fn get_latest_restore_point_slot(&self) -> Option<Slot> {
(self.get_split_slot() - 1) / self.config.slots_per_restore_point let split_slot = self.get_split_slot();
* self.config.slots_per_restore_point let anchor = self.get_anchor_info();
// There are no restore points stored if the state upper limit lies in the hot database.
// It hasn't been reached yet, and may never be.
if anchor.map_or(false, |a| a.state_upper_limit >= split_slot) {
None
} else {
Some(
(split_slot - 1) / self.config.slots_per_restore_point
* self.config.slots_per_restore_point,
)
}
} }
/// Load the database schema version from disk. /// Load the database schema version from disk.
@ -1585,6 +1599,25 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
) )
} }
/// Update the linear array of frozen block roots with the block root for several skipped slots.
///
/// Write the block root at all slots from `start_slot` (inclusive) to `end_slot` (exclusive).
pub fn store_frozen_block_root_at_skip_slots(
&self,
start_slot: Slot,
end_slot: Slot,
block_root: Hash256,
) -> Result<Vec<KeyValueStoreOp>, Error> {
let mut ops = vec![];
let mut block_root_writer =
ChunkWriter::<BlockRoots, _, _>::new(&self.cold_db, start_slot.as_usize())?;
for slot in start_slot.as_usize()..end_slot.as_usize() {
block_root_writer.set(slot, block_root, &mut ops)?;
}
block_root_writer.write(&mut ops)?;
Ok(ops)
}
/// Try to prune all execution payloads, returning early if there is no need to prune. /// Try to prune all execution payloads, returning early if there is no need to prune.
pub fn try_prune_execution_payloads(&self, force: bool) -> Result<(), Error> { pub fn try_prune_execution_payloads(&self, force: bool) -> Result<(), Error> {
let split = self.get_split_info(); let split = self.get_split_info();
@ -1725,7 +1758,14 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
return Err(HotColdDBError::FreezeSlotUnaligned(finalized_state.slot()).into()); return Err(HotColdDBError::FreezeSlotUnaligned(finalized_state.slot()).into());
} }
let mut hot_db_ops: Vec<StoreOp<E>> = Vec::new(); let mut hot_db_ops = vec![];
let mut cold_db_ops = vec![];
// Chunk writer for the linear block roots in the freezer DB.
// Start at the new upper limit because we iterate backwards.
let new_frozen_block_root_upper_limit = finalized_state.slot().as_usize().saturating_sub(1);
let mut block_root_writer =
ChunkWriter::<BlockRoots, _, _>::new(&store.cold_db, new_frozen_block_root_upper_limit)?;
// 1. Copy all of the states between the new finalized state and the split slot, from the hot DB // 1. Copy all of the states between the new finalized state and the split slot, from the hot DB
// to the cold DB. Delete the execution payloads of these now-finalized blocks. // to the cold DB. Delete the execution payloads of these now-finalized blocks.
@ -1750,6 +1790,9 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
// Delete the old summary, and the full state if we lie on an epoch boundary. // Delete the old summary, and the full state if we lie on an epoch boundary.
hot_db_ops.push(StoreOp::DeleteState(state_root, Some(slot))); hot_db_ops.push(StoreOp::DeleteState(state_root, Some(slot)));
// Store the block root for this slot in the linear array of frozen block roots.
block_root_writer.set(slot.as_usize(), block_root, &mut cold_db_ops)?;
// Do not try to store states if a restore point is yet to be stored, or will never be // Do not try to store states if a restore point is yet to be stored, or will never be
// stored (see `STATE_UPPER_LIMIT_NO_RETAIN`). Make an exception for the genesis state // stored (see `STATE_UPPER_LIMIT_NO_RETAIN`). Make an exception for the genesis state
// which always needs to be copied from the hot DB to the freezer and should not be deleted. // which always needs to be copied from the hot DB to the freezer and should not be deleted.
@ -1759,29 +1802,34 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
.map_or(false, |anchor| slot < anchor.state_upper_limit) .map_or(false, |anchor| slot < anchor.state_upper_limit)
{ {
debug!(store.log, "Pruning finalized state"; "slot" => slot); debug!(store.log, "Pruning finalized state"; "slot" => slot);
continue; continue;
} }
let mut cold_db_ops: Vec<KeyValueStoreOp> = Vec::new();
if slot % store.config.slots_per_restore_point == 0 {
let state: BeaconState<E> = get_full_state(&store.hot_db, &state_root, &store.spec)?
.ok_or(HotColdDBError::MissingStateToFreeze(state_root))?;
store.store_cold_state(&state_root, &state, &mut cold_db_ops)?;
}
// Store a pointer from this state root to its slot, so we can later reconstruct states // Store a pointer from this state root to its slot, so we can later reconstruct states
// from their state root alone. // from their state root alone.
let cold_state_summary = ColdStateSummary { slot }; let cold_state_summary = ColdStateSummary { slot };
let op = cold_state_summary.as_kv_store_op(state_root); let op = cold_state_summary.as_kv_store_op(state_root);
cold_db_ops.push(op); cold_db_ops.push(op);
// There are data dependencies between calls to `store_cold_state()` that prevent us from if slot % store.config.slots_per_restore_point == 0 {
// doing one big call to `store.cold_db.do_atomically()` at end of the loop. let state: BeaconState<E> = get_full_state(&store.hot_db, &state_root, &store.spec)?
store.cold_db.do_atomically(cold_db_ops)?; .ok_or(HotColdDBError::MissingStateToFreeze(state_root))?;
store.store_cold_state(&state_root, &state, &mut cold_db_ops)?;
// Commit the batch of cold DB ops whenever a full state is written. Each state stored
// may read the linear fields of previous states stored.
store
.cold_db
.do_atomically(std::mem::take(&mut cold_db_ops))?;
}
} }
// Finish writing the block roots and commit the remaining cold DB ops.
block_root_writer.write(&mut cold_db_ops)?;
store.cold_db.do_atomically(cold_db_ops)?;
// Warning: Critical section. We have to take care not to put any of the two databases in an // Warning: Critical section. We have to take care not to put any of the two databases in an
// inconsistent state if the OS process dies at any point during the freezeing // inconsistent state if the OS process dies at any point during the freezeing
// procedure. // procedure.

View File

@ -45,3 +45,4 @@ network = { path = "../beacon_node/network" }
testcontainers = { git = "https://github.com/testcontainers/testcontainers-rs/", rev = "0f2c9851" } testcontainers = { git = "https://github.com/testcontainers/testcontainers-rs/", rev = "0f2c9851" }
unused_port = { path = "../common/unused_port" } unused_port = { path = "../common/unused_port" }
task_executor = { path = "../common/task_executor" } task_executor = { path = "../common/task_executor" }
logging = { path = "../common/logging" }

View File

@ -7,12 +7,21 @@ use beacon_chain::{
}; };
use eth2::{types::BlockId, BeaconNodeHttpClient, SensitiveUrl, Timeouts}; use eth2::{types::BlockId, BeaconNodeHttpClient, SensitiveUrl, Timeouts};
use http_api::test_utils::{create_api_server, ApiServer}; use http_api::test_utils::{create_api_server, ApiServer};
use log::error;
use logging::test_logger;
use network::NetworkReceivers; use network::NetworkReceivers;
use rand::distributions::Alphanumeric; use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
use std::collections::HashMap;
use std::env;
use std::net::SocketAddr;
use std::time::Duration;
use testcontainers::{clients::Cli, core::WaitFor, Image, RunnableImage};
use tokio::sync::oneshot; use tokio::sync::oneshot;
use tokio::{runtime, task::JoinHandle};
use tokio_postgres::{config::Config as PostgresConfig, Client, NoTls};
use types::{Hash256, MainnetEthSpec, Slot}; use types::{Hash256, MainnetEthSpec, Slot};
use unused_port::unused_tcp4_port;
use url::Url; use url::Url;
use watch::{ use watch::{
client::WatchHttpClient, client::WatchHttpClient,
@ -22,17 +31,6 @@ use watch::{
updater::{handler::*, run_updater, Config as UpdaterConfig, WatchSpec}, updater::{handler::*, run_updater, Config as UpdaterConfig, WatchSpec},
}; };
use log::error;
use std::collections::HashMap;
use std::env;
use std::net::SocketAddr;
use std::time::Duration;
use tokio::{runtime, task::JoinHandle};
use tokio_postgres::{config::Config as PostgresConfig, Client, NoTls};
use unused_port::unused_tcp4_port;
use testcontainers::{clients::Cli, core::WaitFor, Image, RunnableImage};
#[derive(Debug)] #[derive(Debug)]
pub struct Postgres(HashMap<String, String>); pub struct Postgres(HashMap<String, String>);
@ -132,6 +130,7 @@ impl TesterBuilder {
reconstruct_historic_states: true, reconstruct_historic_states: true,
..ChainConfig::default() ..ChainConfig::default()
}) })
.logger(test_logger())
.deterministic_keypairs(VALIDATOR_COUNT) .deterministic_keypairs(VALIDATOR_COUNT)
.fresh_ephemeral_store() .fresh_ephemeral_store()
.build(); .build();