Backfill blob storage fix (#5119)

* store blobs in the correct db in backfill

* add database migration

* add migration file

* remove log info suggesting deneb isn't scheduled

* add batching in blob migration
realbigsean 2024-01-23 17:35:02 -05:00 committed by GitHub
parent 5851027bfd
commit 1cebf41452
5 changed files with 83 additions and 5 deletions
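
The fix routes backfilled blobs into the dedicated blobs database instead of the hot database, and the accompanying v19 schema migration moves any blobs already written to the hot DB over to the blobs DB in fixed-size batches. As a rough, self-contained illustration of that batching pattern (the `KvStore` trait, `copy_in_batches` helper, and in-memory stores below are stand-ins, not Lighthouse's actual storage API):

```rust
use std::collections::HashMap;

// Stand-in key-value store trait; Lighthouse's real stores expose a different API.
trait KvStore {
    fn get(&self, key: &[u8]) -> Option<Vec<u8>>;
    fn write_batch(&mut self, batch: Vec<(Vec<u8>, Vec<u8>)>);
}

impl KvStore for HashMap<Vec<u8>, Vec<u8>> {
    fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
        HashMap::get(self, key).cloned()
    }
    fn write_batch(&mut self, batch: Vec<(Vec<u8>, Vec<u8>)>) {
        self.extend(batch);
    }
}

// Copy `keys` from `src` to `dst`, flushing in fixed-size batches plus a final
// partial batch, mirroring the 500-item batching in the v19 migration below.
fn copy_in_batches(
    src: &impl KvStore,
    dst: &mut impl KvStore,
    keys: &[Vec<u8>],
    batch_size: usize,
) {
    let mut batch = Vec::with_capacity(batch_size);
    for key in keys {
        if let Some(value) = src.get(key) {
            batch.push((key.clone(), value));
            if batch.len() >= batch_size {
                // Each flush is one bounded write to the destination store.
                dst.write_batch(std::mem::take(&mut batch));
            }
        }
    }
    if !batch.is_empty() {
        dst.write_batch(batch);
    }
}

fn main() {
    let src: HashMap<_, _> = (0u16..1200)
        .map(|i| (i.to_be_bytes().to_vec(), vec![0u8; 4]))
        .collect();
    let keys: Vec<_> = src.keys().cloned().collect();
    let mut dst: HashMap<Vec<u8>, Vec<u8>> = HashMap::new();
    copy_in_batches(&src, &mut dst, &keys, 500); // flushes of 500, 500, and 200 entries
    assert_eq!(dst.len(), 1200);
}
```

Copying in bounded batches keeps each atomic write small, so write sizes stay roughly constant even when many blob lists have accumulated in the hot DB.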

View File

@@ -101,8 +101,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             ChunkWriter::<BlockRoots, _, _>::new(&self.store.cold_db, prev_block_slot.as_usize())?;
         let mut new_oldest_blob_slot = blob_info.oldest_blob_slot;
+        let mut blob_batch = Vec::with_capacity(n_blobs_lists_to_import);
         let mut cold_batch = Vec::with_capacity(blocks_to_import.len());
-        let mut hot_batch = Vec::with_capacity(blocks_to_import.len() + n_blobs_lists_to_import);
+        let mut hot_batch = Vec::with_capacity(blocks_to_import.len());
         let mut signed_blocks = Vec::with_capacity(blocks_to_import.len());
         for available_block in blocks_to_import.into_iter().rev() {
@@ -124,7 +125,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             if let Some(blobs) = maybe_blobs {
                 new_oldest_blob_slot = Some(block.slot());
                 self.store
-                    .blobs_as_kv_store_ops(&block_root, blobs, &mut hot_batch);
+                    .blobs_as_kv_store_ops(&block_root, blobs, &mut blob_batch);
             }
             // Store block roots, including at all skip slots in the freezer DB.
@@ -199,6 +200,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         // Write the I/O batches to disk, writing the blocks themselves first, as it's better
         // for the hot DB to contain extra blocks than for the cold DB to point to blocks that
         // do not exist.
+        self.store.blobs_db.do_atomically(blob_batch)?;
         self.store.hot_db.do_atomically(hot_batch)?;
         self.store.cold_db.do_atomically(cold_batch)?;

View File

@@ -1,6 +1,7 @@
 //! Utilities for managing database schema changes.
 mod migration_schema_v17;
 mod migration_schema_v18;
+mod migration_schema_v19;
 use crate::beacon_chain::BeaconChainTypes;
 use crate::types::ChainSpec;
@@ -69,6 +70,14 @@ pub fn migrate_schema<T: BeaconChainTypes>(
             let ops = migration_schema_v18::downgrade_from_v18::<T>(db.clone(), log)?;
             db.store_schema_version_atomically(to, ops)
         }
+        (SchemaVersion(18), SchemaVersion(19)) => {
+            let ops = migration_schema_v19::upgrade_to_v19::<T>(db.clone(), log)?;
+            db.store_schema_version_atomically(to, ops)
+        }
+        (SchemaVersion(19), SchemaVersion(18)) => {
+            let ops = migration_schema_v19::downgrade_from_v19::<T>(db.clone(), log)?;
+            db.store_schema_version_atomically(to, ops)
+        }
         // Anything else is an error.
         (_, _) => Err(HotColdDBError::UnsupportedSchemaVersion {
             target_version: to,
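
The new match arms register the v18↔v19 transitions with the schema migration dispatcher, and each migration returns key-value ops that are handed to `db.store_schema_version_atomically`; the name suggests the ops are committed together with the version bump in one atomic write. A minimal sketch of that shape, using placeholder types and function names rather than Lighthouse's real `KeyValueStoreOp`/`HotColdDB` API:

```rust
// Placeholder ops; Lighthouse's real type is Vec<KeyValueStoreOp>.
type Ops = Vec<String>;

fn upgrade_to_v19_sketch() -> Result<Ops, String> {
    Ok(vec!["copy blobs to blobs db, delete them from hot db".into()])
}

fn downgrade_from_v19_sketch() -> Result<Ops, String> {
    Ok(vec![]) // no data changes on downgrade
}

// Append the version write to the migration's ops and commit everything as one batch.
fn store_schema_version_atomically_sketch(to: u64, mut ops: Ops) -> Result<(), String> {
    ops.push(format!("put schema-version = {to}"));
    println!("atomically committing {} ops", ops.len());
    Ok(())
}

fn migrate_schema_sketch(from: u64, to: u64) -> Result<(), String> {
    let ops = match (from, to) {
        (18, 19) => upgrade_to_v19_sketch()?,
        (19, 18) => downgrade_from_v19_sketch()?,
        // Anything else is an error, as in the match above.
        _ => return Err(format!("unsupported schema migration {from} -> {to}")),
    };
    store_schema_version_atomically_sketch(to, ops)
}

fn main() {
    migrate_schema_sketch(18, 19).unwrap();
    assert!(migrate_schema_sketch(17, 19).is_err());
}
```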

View File

@@ -0,0 +1,65 @@
+use crate::beacon_chain::BeaconChainTypes;
+use slog::{debug, info, Logger};
+use std::sync::Arc;
+use store::{get_key_for_col, DBColumn, Error, HotColdDB, KeyValueStore, KeyValueStoreOp};
+
+pub fn upgrade_to_v19<T: BeaconChainTypes>(
+    db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
+    log: Logger,
+) -> Result<Vec<KeyValueStoreOp>, Error> {
+    let mut hot_delete_ops = vec![];
+    let mut blob_keys = vec![];
+    let column = DBColumn::BeaconBlob;
+
+    debug!(log, "Migrating from v18 to v19");
+    // Iterate through the blobs on disk.
+    for res in db.hot_db.iter_column_keys::<Vec<u8>>(column) {
+        let key = res?;
+        let key_col = get_key_for_col(column.as_str(), &key);
+        hot_delete_ops.push(KeyValueStoreOp::DeleteKey(key_col));
+        blob_keys.push(key);
+    }
+
+    let num_blobs = blob_keys.len();
+    debug!(log, "Collected {} blob lists to migrate", num_blobs);
+
+    let batch_size = 500;
+    let mut batch = Vec::with_capacity(batch_size);
+    for key in blob_keys {
+        let next_blob = db.hot_db.get_bytes(column.as_str(), &key)?;
+        if let Some(next_blob) = next_blob {
+            let key_col = get_key_for_col(column.as_str(), &key);
+            batch.push(KeyValueStoreOp::PutKeyValue(key_col, next_blob));
+            if batch.len() >= batch_size {
+                db.blobs_db.do_atomically(batch.clone())?;
+                batch.clear();
+            }
+        }
+    }
+
+    // Process the remaining batch if it's not empty
+    if !batch.is_empty() {
+        db.blobs_db.do_atomically(batch)?;
+    }
+
+    debug!(log, "Wrote {} blobs to the blobs db", num_blobs);
+
+    // Delete all the blobs
+    info!(log, "Upgrading to v19 schema");
+    Ok(hot_delete_ops)
+}
+
+pub fn downgrade_from_v19<T: BeaconChainTypes>(
+    _db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
+    log: Logger,
+) -> Result<Vec<KeyValueStoreOp>, Error> {
+    // No-op
+    info!(
+        log,
+        "Downgrading to v18 schema";
+    );
+    Ok(vec![])
+}
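
Note that `upgrade_to_v19` does not delete the hot-DB blob entries itself: it returns `hot_delete_ops` so the caller can apply the deletions together with the schema-version update via `store_schema_version_atomically` (see the match arms above). The downgrade is a no-op apart from logging, presumably because downgrades are only supported before Deneb (as the table below states), when there are no blobs to move back.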

View File

@@ -4,7 +4,7 @@ use ssz::{Decode, Encode};
 use ssz_derive::{Decode, Encode};
 use types::{Checkpoint, Hash256, Slot};
-pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(18);
+pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(19);
 // All the keys that get stored under the `BeaconMeta` column.
 //

View File

@@ -16,7 +16,8 @@ validator client or the slasher**.
 | Lighthouse version | Release date | Schema version | Downgrade available? |
 |--------------------|--------------|----------------|----------------------|
-| v4.6.0             | Dec 2023     | v18            | yes before Deneb     |
+| v4.6.0             | Dec 2023     | v19            | yes before Deneb     |
+| v4.6.0-rc.0        | Dec 2023     | v18            | yes before Deneb     |
 | v4.5.0             | Sep 2023     | v17            | yes                  |
 | v4.4.0             | Aug 2023     | v17            | yes                  |
 | v4.3.0             | Jul 2023     | v17            | yes                  |
@@ -192,7 +193,8 @@ Here are the steps to prune historic states:
 | Lighthouse version | Release date | Schema version | Downgrade available?                |
 |--------------------|--------------|----------------|-------------------------------------|
-| v4.6.0             | Dec 2023     | v18            | yes before Deneb                    |
+| v4.6.0             | Dec 2023     | v19            | yes before Deneb                    |
+| v4.6.0-rc.0        | Dec 2023     | v18            | yes before Deneb                    |
 | v4.5.0             | Sep 2023     | v17            | yes                                 |
 | v4.4.0             | Aug 2023     | v17            | yes                                 |
 | v4.3.0             | Jul 2023     | v17            | yes                                 |