diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index fb1d79e96..156c12863 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -1380,7 +1380,9 @@ impl ExecutionPendingBlock { StoreOp::PutStateTemporaryFlag(state_root), ] }; - chain.store.do_atomically(state_batch)?; + chain + .store + .do_atomically_and_update_cache(state_batch, None)?; drop(txn_lock); confirmed_state_roots.push(state_root); diff --git a/beacon_node/store/src/garbage_collection.rs b/beacon_node/store/src/garbage_collection.rs index 329133632..2affaad63 100644 --- a/beacon_node/store/src/garbage_collection.rs +++ b/beacon_node/store/src/garbage_collection.rs @@ -31,7 +31,7 @@ where "Garbage collecting {} temporary states", delete_ops.len() / 2 ); - self.do_atomically(delete_ops)?; + self.do_atomically_and_update_cache(delete_ops, None)?; } Ok(()) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 845ee89b5..0aace19d5 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -849,11 +849,14 @@ impl, Cold: ItemStore> HotColdDB Ok(key_value_batch) } - pub fn do_atomically(&self, batch: Vec>) -> Result<(), Error> { + pub fn do_atomically_and_update_cache( + &self, + batch: Vec>, + blobs_batch: Option>>, + ) -> Result<(), Error> { // Update the block cache whilst holding a lock, to ensure that the cache updates atomically // with the database. let mut guard = self.block_cache.lock(); - let mut guard_blob = self.blob_cache.lock(); for op in &batch { match op { @@ -861,9 +864,7 @@ impl, Cold: ItemStore> HotColdDB guard.put(*block_root, (**block).clone()); } - StoreOp::PutBlobs(block_root, blobs) => { - guard_blob.put(*block_root, (**blobs).clone()); - } + StoreOp::PutBlobs(_, _) => (), StoreOp::PutState(_, _) => (), @@ -877,9 +878,7 @@ impl, Cold: ItemStore> HotColdDB guard.pop(block_root); } - StoreOp::DeleteBlobs(block_root) => { - guard_blob.pop(block_root); - } + StoreOp::DeleteBlobs(_) => (), StoreOp::DeleteState(_, _) => (), @@ -891,11 +890,30 @@ impl, Cold: ItemStore> HotColdDB } } + if let Some(blob_ops) = blobs_batch { + let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db); + let mut guard_blob = self.blob_cache.lock(); + + for op in &blob_ops { + match op { + StoreOp::PutBlobs(block_root, blobs) => { + guard_blob.put(*block_root, (**blobs).clone()); + } + + StoreOp::DeleteBlobs(block_root) => { + guard_blob.pop(block_root); + } + + _ => (), + } + } + blobs_db.do_atomically(self.convert_to_kv_batch(blob_ops)?)?; + drop(guard_blob); + } + self.hot_db .do_atomically(self.convert_to_kv_batch(batch)?)?; - drop(guard); - drop(guard_blob); Ok(()) } @@ -1232,11 +1250,7 @@ impl, Cold: ItemStore> HotColdDB /// Fetch a blobs sidecar from the store. pub fn get_blobs(&self, block_root: &Hash256) -> Result>, Error> { - let blobs_db = if let Some(ref blobs_db) = self.blobs_db { - blobs_db - } else { - &self.cold_db - }; + let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db); match blobs_db.get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())? { Some(ref blobs_bytes) => { @@ -1751,7 +1765,7 @@ impl, Cold: ItemStore> HotColdDB } } let payloads_pruned = ops.len(); - self.do_atomically(ops)?; + self.do_atomically_and_update_cache(ops, None)?; info!( self.log, "Execution payload pruning complete"; @@ -2050,7 +2064,7 @@ pub fn migrate_database, Cold: ItemStore>( } // Delete the states from the hot database if we got this far. - store.do_atomically(hot_db_ops)?; + store.do_atomically_and_update_cache(hot_db_ops, None)?; debug!( store.log,