diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 721460397..0023386a2 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1072,7 +1072,7 @@ impl BeaconChain { block_root: &Hash256, data_availability_boundary: Epoch, ) -> Result>, Error> { - match self.store.get_blobs(block_root, slot)? { + match self.store.get_blobs(block_root)? { Some(blobs) => Ok(Some(blobs)), None => { // Check for the corresponding block to understand whether we *should* have blobs. @@ -3021,6 +3021,7 @@ impl BeaconChain { ops.push(StoreOp::PutBlock(block_root, signed_block.clone())); ops.push(StoreOp::PutState(block.state_root(), &state)); +<<<<<<< HEAD // Only consider blobs if the eip4844 fork is enabled. if let Some(data_availability_boundary) = self.data_availability_boundary() { let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch()); @@ -3040,6 +3041,13 @@ impl BeaconChain { ops.push(StoreOp::PutBlobs(block_root, blobs)); } } +======= + if let Some(blobs) = blobs { + if blobs.blobs.len() > 0 { + //FIXME(sean) using this for debugging for now + info!(self.log, "Writing blobs to store"; "block_root" => ?block_root); + self.store.put_blobs(&block_root, (&*blobs).clone())?; +>>>>>>> 43dc3a9a4 (Fix rebase conflicts) } } diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index b91e29355..e80b6fd18 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -68,7 +68,7 @@ pub struct ClientBuilder { gossipsub_registry: Option, db_path: Option, freezer_db_path: Option, - blobs_freezer_db_path: Option, + blobs_db_path: Option, http_api_config: http_api::Config, http_metrics_config: http_metrics::Config, slasher: Option>>, @@ -101,7 +101,7 @@ where gossipsub_registry: None, db_path: None, freezer_db_path: None, - blobs_freezer_db_path: None, + blobs_db_path: None, http_api_config: <_>::default(), http_metrics_config: <_>::default(), slasher: None, @@ -894,7 +894,7 @@ where mut self, hot_path: &Path, cold_path: &Path, - cold_blobs_path: Option, + blobs_path: Option, config: StoreConfig, log: Logger, ) -> Result { @@ -910,7 +910,7 @@ where self.db_path = Some(hot_path.into()); self.freezer_db_path = Some(cold_path.into()); - self.blobs_freezer_db_path = cold_blobs_path.clone(); + self.blobs_db_path = blobs_path.clone(); let inner_spec = spec.clone(); let deposit_contract_deploy_block = context @@ -933,7 +933,7 @@ where let store = HotColdDB::open( hot_path, cold_path, - cold_blobs_path, + blobs_path, schema_upgrade, config, spec, diff --git a/beacon_node/client/src/config.rs b/beacon_node/client/src/config.rs index 10bca9471..4ea59a9da 100644 --- a/beacon_node/client/src/config.rs +++ b/beacon_node/client/src/config.rs @@ -49,9 +49,8 @@ pub struct Config { pub db_name: String, /// Path where the freezer database will be located. pub freezer_db_path: Option, - /// Path where the blobs freezer database will be located if it should be separate from the - /// historical state freezer. - pub blobs_freezer_db_path: Option, + /// Path where the blobs database will be located if blobs should be in a separate database. + pub blobs_db_path: Option, pub log_file: PathBuf, /// If true, the node will use co-ordinated junk for eth1 values. /// @@ -92,7 +91,7 @@ impl Default for Config { data_dir: PathBuf::from(DEFAULT_ROOT_DIR), db_name: "chain_db".to_string(), freezer_db_path: None, - blobs_freezer_db_path: None, + blobs_db_path: None, log_file: PathBuf::from(""), genesis: <_>::default(), store: <_>::default(), @@ -153,12 +152,12 @@ impl Config { .unwrap_or_else(|| self.default_freezer_db_path()) } - /// Returns the path to which the client may initialize the on-disk blobs freezer database. + /// Returns the path to which the client may initialize the on-disk blobs database. /// /// Will attempt to use the user-supplied path from e.g. the CLI, or will default /// to None. - pub fn get_blobs_freezer_db_path(&self) -> Option { - self.blobs_freezer_db_path.clone() + pub fn get_blobs_db_path(&self) -> Option { + self.blobs_db_path.clone() } /// Get the freezer DB path, creating it if necessary. @@ -166,10 +165,10 @@ impl Config { ensure_dir_exists(self.get_freezer_db_path()) } - /// Get the blobs freezer DB path, creating it if necessary. - pub fn create_blobs_freezer_db_path(&self) -> Result, String> { - match self.get_blobs_freezer_db_path() { - Some(blobs_freezer_path) => Ok(Some(ensure_dir_exists(blobs_freezer_path)?)), + /// Get the blobs DB path, creating it if necessary. + pub fn create_blobs_db_path(&self) -> Result, String> { + match self.get_blobs_db_path() { + Some(blobs_db_path) => Ok(Some(ensure_dir_exists(blobs_db_path)?)), None => Ok(None), } } diff --git a/beacon_node/http_api/src/block_id.rs b/beacon_node/http_api/src/block_id.rs index 96e3d6fc8..9e152dc61 100644 --- a/beacon_node/http_api/src/block_id.rs +++ b/beacon_node/http_api/src/block_id.rs @@ -218,7 +218,7 @@ impl BlockId { chain: &BeaconChain, ) -> Result>, warp::Rejection> { let root = self.root(chain)?.0; - match chain.get_blobs(&root, None) { + match chain.get_blobs(&root) { Ok(Some(blob)) => Ok(Arc::new(blob)), Ok(None) => Err(warp_utils::reject::custom_not_found(format!( "Blob with block root {} is not in the store", diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 0c903ec33..01b7cb43b 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -795,14 +795,12 @@ impl Worker { // remove all skip slots let block_roots = block_roots.into_iter().flatten().collect::>(); - let mut slot_hint: Option = None; let mut blobs_sent = 0; let mut send_response = true; for root in block_roots { match self.chain.get_blobs(&root, data_availability_boundary) { Ok(Some(blobs)) => { - slot_hint = Some(blobs.beacon_block_slot + 1); blobs_sent += 1; self.send_network_message(NetworkMessage::SendResponse { peer_id, diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index a3e4dbcc8..eb6754aa9 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -29,10 +29,10 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .takes_value(true) ) .arg( - Arg::with_name("blobs-freezer-dir") - .long("blobs-freezer-dir") + Arg::with_name("blobs-dir") + .long("blobs-dir") .value_name("DIR") - .help("Data directory for the blobs freezer database.") + .help("Data directory for the blobs database.") .takes_value(true) ) /* diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 4fa916dc3..e8128cb79 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -390,8 +390,8 @@ pub fn get_config( client_config.freezer_db_path = Some(PathBuf::from(freezer_dir)); } - if let Some(blobs_freezer_dir) = cli_args.value_of("blobs-freezer-dir") { - client_config.blobs_freezer_db_path = Some(PathBuf::from(blobs_freezer_dir)); + if let Some(blobs_db_dir) = cli_args.value_of("blobs-dir") { + client_config.blobs_db_path = Some(PathBuf::from(blobs_db_dir)); } let (sprp, sprp_explicit) = get_slots_per_restore_point::(cli_args)?; diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs index 532f1bdfe..b098f57c7 100644 --- a/beacon_node/src/lib.rs +++ b/beacon_node/src/lib.rs @@ -64,7 +64,7 @@ impl ProductionBeaconNode { let _datadir = client_config.create_data_dir()?; let db_path = client_config.create_db_path()?; let freezer_db_path = client_config.create_freezer_db_path()?; - let blobs_freezer_db_path = client_config.create_blobs_freezer_db_path()?; + let blobs_db_path = client_config.create_blobs_db_path()?; let executor = context.executor.clone(); if let Some(legacy_dir) = client_config.get_existing_legacy_data_dir() { @@ -88,7 +88,7 @@ impl ProductionBeaconNode { .disk_store( &db_path, &freezer_db_path, - blobs_freezer_db_path, + blobs_db_path, store_config, log.clone(), )?; diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 01971e2b3..845ee89b5 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -59,8 +59,8 @@ pub struct HotColdDB, Cold: ItemStore> { pub(crate) config: StoreConfig, /// Cold database containing compact historical data. pub cold_db: Cold, - /// Cold database containing blob data with slots less than `split.slot`. - pub cold_blobs_db: Option, + /// Database containing blobs. + pub blobs_db: Option, /// Hot database containing duplicated but quick-to-access recent data. /// /// The hot database also contains all blocks. @@ -101,7 +101,7 @@ pub enum HotColdDBError { MissingExecutionPayload(Hash256), MissingFullBlockExecutionPayloadPruned(Hash256, Slot), MissingAnchorInfo, - MissingPathToBlobsFreezer, + MissingPathToBlobsDatabase, HotStateSummaryError(BeaconStateError), RestorePointDecodeError(ssz::DecodeError), BlockReplayBeaconError(BeaconStateError), @@ -138,7 +138,7 @@ impl HotColdDB, MemoryStore> { anchor_info: RwLock::new(None), blob_info: RwLock::new(BlobInfo::default()), cold_db: MemoryStore::open(), - cold_blobs_db: Some(MemoryStore::open()), + blobs_db: Some(MemoryStore::open()), hot_db: MemoryStore::open(), block_cache: Mutex::new(LruCache::new(config.block_cache_size)), blob_cache: Mutex::new(LruCache::new(config.blob_cache_size)), @@ -162,7 +162,7 @@ impl HotColdDB, LevelDB> { pub fn open( hot_path: &Path, cold_path: &Path, - cold_blobs_path: Option, + blobs_db_path: Option, migrate_schema: impl FnOnce(Arc, SchemaVersion, SchemaVersion) -> Result<(), Error>, config: StoreConfig, spec: ChainSpec, @@ -175,7 +175,7 @@ impl HotColdDB, LevelDB> { anchor_info: RwLock::new(None), blob_info: RwLock::new(BlobInfo::default()), cold_db: LevelDB::open(cold_path)?, - cold_blobs_db: None, + blobs_db: None, hot_db: LevelDB::open(hot_path)?, block_cache: Mutex::new(LruCache::new(config.block_cache_size)), blob_cache: Mutex::new(LruCache::new(config.blob_cache_size)), @@ -220,23 +220,29 @@ impl HotColdDB, LevelDB> { ); } - if db.spec.eip4844_fork_epoch.is_some() { - let blob_info = match db.load_blob_info()? { - Some(mut blob_info) => { - if blob_info.blobs_freezer { - cold_blobs_path - .as_ref() - .ok_or(HotColdDBError::MissingPathToBlobsFreezer)?; - } - if let Some(path) = cold_blobs_path { - db.cold_blobs_db = Some(LevelDB::open(path.as_path())?); - blob_info.blobs_freezer = true; - } - Some(blob_info) - } - None => Some(BlobInfo::default()), - }; - *db.blob_info.write() = blob_info; + let blob_info_on_disk = db.load_blob_info()?; + + if let Some(ref blob_info) = blob_info_on_disk { + let prev_blobs_db = blob_info.blobs_db; + if prev_blobs_db { + blobs_db_path + .as_ref() + .ok_or(HotColdDBError::MissingPathToBlobsDatabase)?; + } + } + + if let Some(path) = blobs_db_path { + if db.spec.eip4844_fork_epoch.is_some() { + db.blobs_db = Some(LevelDB::open(path.as_path())?); + db.compare_and_set_blob_info_with_write( + blob_info_on_disk, + Some(BlobInfo { blobs_db: true }), + )?; + info!( + db.log, + "Blobs DB initialized"; + ); + } } // Ensure that the schema version of the on-disk database matches the software. @@ -547,29 +553,6 @@ impl, Cold: ItemStore> HotColdDB Ok(()) } - /// Fetch a blobs sidecar from the store. - /// - /// If `slot` is provided then it will be used as a hint as to which database should - /// be checked first. - pub fn get_blobs( - &self, - block_root: &Hash256, - slot: Option, - ) -> Result>, Error> { - if let Some(slot) = slot { - if slot < self.get_split_slot() { - return match self.load_cold_blobs(block_root)? { - Some(blobs) => Ok(Some(blobs)), - None => self.load_hot_blobs(block_root), - }; - } - } - match self.load_hot_blobs(block_root)? { - Some(blobs) => Ok(Some(blobs)), - None => self.load_cold_blobs(block_root), - } - } - pub fn blobs_as_kv_store_ops( &self, key: &Hash256, @@ -910,6 +893,7 @@ impl, Cold: ItemStore> HotColdDB self.hot_db .do_atomically(self.convert_to_kv_batch(batch)?)?; + drop(guard); drop(guard_blob); @@ -1246,35 +1230,22 @@ impl, Cold: ItemStore> HotColdDB }) } - /// Load a blobs sidecar from the hot database. - pub fn load_hot_blobs(&self, block_root: &Hash256) -> Result>, Error> { - // FIXME(sean) I was attempting to use a blob cache here but was getting deadlocks, - // may want to attempt to use one again - match self - .hot_db - .get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())? - { - Some(bytes) => { - let ret = BlobsSidecar::from_ssz_bytes(&bytes)?; - self.blob_cache.lock().put(*block_root, ret.clone()); - Ok(Some(ret)) - } - None => Ok(None), - } - } - - /// Try to load a blobs from the freezer database. - /// - /// Return `None` if no blobs sidecar with `block_root` lies in the freezer. - pub fn load_cold_blobs(&self, block_root: &Hash256) -> Result>, Error> { - let blobs_freezer = if let Some(ref cold_blobs_db) = self.cold_blobs_db { - cold_blobs_db + /// Fetch a blobs sidecar from the store. + pub fn get_blobs(&self, block_root: &Hash256) -> Result>, Error> { + let blobs_db = if let Some(ref blobs_db) = self.blobs_db { + blobs_db } else { &self.cold_db }; - match blobs_freezer.get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())? { - Some(ref blobs_bytes) => Ok(Some(BlobsSidecar::from_ssz_bytes(blobs_bytes)?)), + match blobs_db.get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())? { + Some(ref blobs_bytes) => { + let blobs = BlobsSidecar::from_ssz_bytes(blobs_bytes)?; + // FIXME(sean) I was attempting to use a blob cache here but was getting deadlocks, + // may want to attempt to use one again + self.blob_cache.lock().put(*block_root, blobs.clone()); + Ok(Some(blobs)) + } None => Ok(None), } } @@ -1985,12 +1956,6 @@ pub fn migrate_database, Cold: ItemStore>( } let mut hot_db_ops: Vec> = Vec::new(); - let mut cold_blobs_db_ops: Vec> = Vec::new(); - - let blobs_freezer = match store.cold_blobs_db { - Some(ref cold_blobs_db) => cold_blobs_db, - None => &store.cold_db, - }; // 1. Copy all of the states between the head and the split slot, from the hot DB // to the cold DB. Delete the execution payloads of these now-finalized blocks. @@ -2034,17 +1999,8 @@ pub fn migrate_database, Cold: ItemStore>( if store.config.prune_payloads { hot_db_ops.push(StoreOp::DeleteExecutionPayload(block_root)); } - - // Prepare migration of blobs to freezer. - if let Some(blobs) = store.get_blobs(&block_root, Some(slot))? { - hot_db_ops.push(StoreOp::DeleteBlobs(block_root)); - cold_blobs_db_ops.push(StoreOp::PutBlobs(block_root, Arc::new(blobs))); - } } - // Migrate blobs to freezer. - blobs_freezer.do_atomically(store.convert_to_kv_batch(cold_blobs_db_ops)?)?; - // Warning: Critical section. We have to take care not to put any of the two databases in an // inconsistent state if the OS process dies at any point during the freezing // procedure. @@ -2057,9 +2013,6 @@ pub fn migrate_database, Cold: ItemStore>( // Flush to disk all the states that have just been migrated to the cold store. store.cold_db.sync()?; - if let Some(ref cold_blobs_db) = store.cold_blobs_db { - cold_blobs_db.sync()?; - } { let mut split_guard = store.split.write(); diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs index b5de0048f..92117254f 100644 --- a/beacon_node/store/src/metadata.rs +++ b/beacon_node/store/src/metadata.rs @@ -124,6 +124,8 @@ impl StoreItem for AnchorInfo { pub struct BlobInfo { /// The slot after which blobs are available (>=). pub oldest_blob_slot: Option, + /// A separate blobs database is in use. + pub blobs_db: bool, } impl StoreItem for BlobInfo { diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs index 93377c60e..837ad0aef 100644 --- a/database_manager/src/lib.rs +++ b/database_manager/src/lib.rs @@ -95,6 +95,7 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .takes_value(true), ) .arg( +<<<<<<< HEAD Arg::with_name("blob-prune-margin-epochs") .long("blob-prune-margin-epochs") .help( @@ -105,8 +106,12 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .default_value("0"), Arg::with_name("blobs-freezer-dir") .long("blobs-freezer-dir") +======= + Arg::with_name("blobs-dir") + .long("blobs-dir") +>>>>>>> 43dc3a9a4 (Fix rebase conflicts) .value_name("DIR") - .help("Data directory for the blobs freezer database.") + .help("Data directory for the blobs database.") .takes_value(true), ) .subcommand(migrate_cli_app()) @@ -128,8 +133,8 @@ fn parse_client_config( client_config.freezer_db_path = Some(freezer_dir); } - if let Some(blobs_freezer_dir) = clap_utils::parse_optional(cli_args, "blobs-freezer-dir")? { - client_config.blobs_freezer_db_path = Some(blobs_freezer_dir); + if let Some(blobs_db_dir) = clap_utils::parse_optional(cli_args, "blobs-dir")? { + client_config.blobs_db_path = Some(blobs_db_dir); } let (sprp, sprp_explicit) = get_slots_per_restore_point::(cli_args)?; @@ -153,13 +158,13 @@ pub fn display_db_version( let spec = runtime_context.eth2_config.spec.clone(); let hot_path = client_config.get_db_path(); let cold_path = client_config.get_freezer_db_path(); - let cold_blobs_path = client_config.get_blobs_freezer_db_path(); + let blobs_path = client_config.get_blobs_db_path(); let mut version = CURRENT_SCHEMA_VERSION; HotColdDB::, LevelDB>::open( &hot_path, &cold_path, - &cold_blobs_path, + &blobs_path, |_, from, _| { version = from; Ok(()) @@ -211,12 +216,12 @@ pub fn inspect_db( let spec = runtime_context.eth2_config.spec.clone(); let hot_path = client_config.get_db_path(); let cold_path = client_config.get_freezer_db_path(); - let cold_blobs_path = client_config.get_blobs_freezer_db_path(); + let blobs_path = client_config.get_blobs_db_path(); let db = HotColdDB::, LevelDB>::open( &hot_path, &cold_path, - &cold_blobs_path, + &blobs_path, |_, _, _| Ok(()), client_config.store, spec, @@ -267,14 +272,14 @@ pub fn migrate_db( let spec = &runtime_context.eth2_config.spec; let hot_path = client_config.get_db_path(); let cold_path = client_config.get_freezer_db_path(); - let cold_blobs_path = client_config.get_blobs_freezer_db_path(); + let blobs_path = client_config.get_blobs_db_path(); let mut from = CURRENT_SCHEMA_VERSION; let to = migrate_config.to; let db = HotColdDB::, LevelDB>::open( &hot_path, &cold_path, - &cold_blobs_path, + &blobs_path, |_, db_initial_version, _| { from = db_initial_version; Ok(()) @@ -309,12 +314,12 @@ pub fn prune_payloads( let spec = &runtime_context.eth2_config.spec; let hot_path = client_config.get_db_path(); let cold_path = client_config.get_freezer_db_path(); - let cold_blobs_path = client_config.get_blobs_freezer_db_path(); + let blobs_path = client_config.get_blobs_db_path(); let db = HotColdDB::, LevelDB>::open( &hot_path, &cold_path, - &cold_blobs_path, + &blobs_path, |_, _, _| Ok(()), client_config.store, spec.clone(),