diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 0023386a2..c1b4dc6b5 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3021,7 +3021,6 @@ impl BeaconChain { ops.push(StoreOp::PutBlock(block_root, signed_block.clone())); ops.push(StoreOp::PutState(block.state_root(), &state)); -<<<<<<< HEAD // Only consider blobs if the eip4844 fork is enabled. if let Some(data_availability_boundary) = self.data_availability_boundary() { let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch()); @@ -3032,7 +3031,7 @@ impl BeaconChain { // margin, or younger (of higher epoch number). if block_epoch >= import_boundary { if let Some(blobs) = blobs { - if blobs.blobs.len() > 0 { + if !blobs.blobs.is_empty() { //FIXME(sean) using this for debugging for now info!( self.log, "Writing blobs to store"; @@ -3041,16 +3040,8 @@ impl BeaconChain { ops.push(StoreOp::PutBlobs(block_root, blobs)); } } -======= - if let Some(blobs) = blobs { - if blobs.blobs.len() > 0 { - //FIXME(sean) using this for debugging for now - info!(self.log, "Writing blobs to store"; "block_root" => ?block_root); - self.store.put_blobs(&block_root, (&*blobs).clone())?; ->>>>>>> 43dc3a9a4 (Fix rebase conflicts) } } - let txn_lock = self.store.hot_db.begin_rw_transaction(); if let Err(e) = self.store.do_atomically(ops) { @@ -3089,6 +3080,17 @@ impl BeaconChain { return Err(e.into()); } + + if let Some(blobs) = blobs? { + if blobs.blobs.len() > 0 { + //FIXME(sean) using this for debugging for now + info!(self.log, "Writing blobs to store"; "block_root" => ?block_root); + // WARNING! Deadlocks if the alternative to a separate blobs db is + // changed from the cold db to the hot db. + self.store.put_blobs(&block_root, (&*blobs).clone())?; + } + }; + drop(txn_lock); // The fork choice write-lock is dropped *after* the on-disk database has been updated. diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 156c12863..70eec4ecc 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -1382,7 +1382,7 @@ impl ExecutionPendingBlock { }; chain .store - .do_atomically_and_update_cache(state_batch, None)?; + .do_atomically_with_block_and_blobs_cache(state_batch)?; drop(txn_lock); confirmed_state_roots.push(state_root); diff --git a/beacon_node/store/src/garbage_collection.rs b/beacon_node/store/src/garbage_collection.rs index 2affaad63..c70ef8986 100644 --- a/beacon_node/store/src/garbage_collection.rs +++ b/beacon_node/store/src/garbage_collection.rs @@ -31,7 +31,7 @@ where "Garbage collecting {} temporary states", delete_ops.len() / 2 ); - self.do_atomically_and_update_cache(delete_ops, None)?; + self.do_atomically_with_block_and_blobs_cache(delete_ops)?; } Ok(()) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index f47dc57ee..eaef8550e 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -544,7 +544,8 @@ impl, Cold: ItemStore> HotColdDB } pub fn put_blobs(&self, block_root: &Hash256, blobs: BlobsSidecar) -> Result<(), Error> { - self.hot_db.put_bytes( + let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db); + blobs_db.put_bytes( DBColumn::BeaconBlob.into(), block_root.as_bytes(), &blobs.as_ssz_bytes(), @@ -849,19 +850,38 @@ impl, Cold: ItemStore> HotColdDB Ok(key_value_batch) } - pub fn do_atomically_and_update_cache( + pub fn do_atomically_with_block_and_blobs_cache( &self, batch: Vec>, - blobs_batch: Option>>, ) -> Result<(), Error> { - // Update the block cache whilst holding a lock, to ensure that the cache updates atomically - // with the database. - let mut guard = self.block_cache.lock(); + let mut hot_db_cache_ops = Vec::new(); - for op in &batch { + let (blobs_ops, hot_db_ops) = batch.into_iter().partition(|store_op| match store_op { + StoreOp::PutBlobs(_, _) | StoreOp::DeleteBlobs(_) => true, + StoreOp::PutBlock(_, _) | StoreOp::DeleteBlock(_) => { + hot_db_cache_ops.push(store_op.clone()); + false + } + _ => false, + }); + + // Update database whilst holding a lock on cache, to ensure that the cache updates + // atomically with the database. + let mut guard = self.block_cache.lock(); + let mut guard_blob = self.blob_cache.lock(); + + self.hot_db + .do_atomically(self.convert_to_kv_batch(hot_db_ops)?)?; + + let blob_cache_ops = blobs_ops.clone(); + let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db); + // todo(emhane): do we want to restore the hot db writes if this fails? + blobs_db.do_atomically(self.convert_to_kv_batch(blobs_ops)?)?; + + for op in hot_db_cache_ops { match op { StoreOp::PutBlock(block_root, block) => { - guard.put(*block_root, (**block).clone()); + guard.put(block_root, (*block).clone()); } StoreOp::PutBlobs(_, _) => (), @@ -875,7 +895,7 @@ impl, Cold: ItemStore> HotColdDB StoreOp::DeleteStateTemporaryFlag(_) => (), StoreOp::DeleteBlock(block_root) => { - guard.pop(block_root); + guard.pop(&block_root); } StoreOp::DeleteBlobs(_) => (), @@ -890,39 +910,22 @@ impl, Cold: ItemStore> HotColdDB } } - let guard_blob = match blobs_batch { - Some(blob_ops) => { - let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db); - // Update the blob cache whilst holding a lock, while holding a lock on the block - // cache, to ensure they and their databases all update atomically. - let mut guard_blob = self.blob_cache.lock(); - - for op in &blob_ops { - match op { - StoreOp::PutBlobs(block_root, blobs) => { - guard_blob.put(*block_root, (**blobs).clone()); - } - - StoreOp::DeleteBlobs(block_root) => { - guard_blob.pop(block_root); - } - - _ => (), - } + for op in blob_cache_ops { + match op { + StoreOp::PutBlobs(block_root, blobs) => { + guard_blob.put(block_root, (*blobs).clone()); } - blobs_db.do_atomically(self.convert_to_kv_batch(blob_ops)?)?; - Some(guard_blob) - } - None => None, - }; - self.hot_db - .do_atomically(self.convert_to_kv_batch(batch)?)?; + StoreOp::DeleteBlobs(block_root) => { + guard_blob.pop(&block_root); + } + + _ => (), + } + } drop(guard); - if let Some(guard_blob) = guard_blob { - drop(guard_blob); - } + drop(guard_blob); Ok(()) } @@ -1774,7 +1777,7 @@ impl, Cold: ItemStore> HotColdDB } } let payloads_pruned = ops.len(); - self.do_atomically_and_update_cache(ops, None)?; + self.do_atomically_with_block_and_blobs_cache(ops)?; info!( self.log, "Execution payload pruning complete"; @@ -2073,7 +2076,7 @@ pub fn migrate_database, Cold: ItemStore>( } // Delete the states from the hot database if we got this far. - store.do_atomically_and_update_cache(hot_db_ops, None)?; + store.do_atomically_with_block_and_blobs_cache(hot_db_ops)?; debug!( store.log, diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 1d7e92b80..9305b3da0 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -154,6 +154,7 @@ pub trait ItemStore: KeyValueStore + Sync + Send + Sized + 'stati /// Reified key-value storage operation. Helps in modifying the storage atomically. /// See also https://github.com/sigp/lighthouse/issues/692 +#[derive(Clone)] pub enum StoreOp<'a, E: EthSpec> { PutBlock(Hash256, Arc>), PutState(Hash256, &'a BeaconState), diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs index d0cb27931..d9b480836 100644 --- a/database_manager/src/lib.rs +++ b/database_manager/src/lib.rs @@ -95,7 +95,6 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .takes_value(true), ) .arg( -<<<<<<< HEAD Arg::with_name("blob-prune-margin-epochs") .long("blob-prune-margin-epochs") .help( @@ -104,12 +103,10 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { ) .takes_value(true) .default_value("0"), - Arg::with_name("blobs-freezer-dir") - .long("blobs-freezer-dir") -======= + ) + .arg( Arg::with_name("blobs-dir") .long("blobs-dir") ->>>>>>> 43dc3a9a4 (Fix rebase conflicts) .value_name("DIR") .help("Data directory for the blobs database.") .takes_value(true),