diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index d1e33d51d..562a6bc51 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -860,30 +860,67 @@ impl, Cold: ItemStore> HotColdDB &self, batch: Vec>, ) -> Result<(), Error> { - let mut hot_db_cache_ops = Vec::new(); - - let (blobs_ops, hot_db_ops) = batch.into_iter().partition(|store_op| match store_op { - StoreOp::PutBlobs(_, _) | StoreOp::DeleteBlobs(_) => true, - StoreOp::PutBlock(_, _) | StoreOp::DeleteBlock(_) => { - hot_db_cache_ops.push(store_op.clone()); - false - } - _ => false, - }); + let mut blobs_to_delete = Vec::new(); + let (blobs_ops, hot_db_ops): (Vec>, Vec>) = + batch.into_iter().partition(|store_op| match store_op { + StoreOp::PutBlobs(_, _) => true, + StoreOp::DeleteBlobs(block_root) => { + match self.get_blobs(block_root) { + Ok(Some(blobs_sidecar)) => { + blobs_to_delete.push(blobs_sidecar); + } + Err(e) => { + error!( + self.log, "Error getting blobs"; + "block_root" => %block_root, + "error" => ?e + ); + } + _ => (), + } + true + } + StoreOp::PutBlock(_, _) | StoreOp::DeleteBlock(_) => false, + _ => false, + }); // Update database whilst holding a lock on cache, to ensure that the cache updates // atomically with the database. let mut guard = self.block_cache.lock(); let mut guard_blob = self.blob_cache.lock(); - self.hot_db - .do_atomically(self.convert_to_kv_batch(hot_db_ops)?)?; - let blob_cache_ops = blobs_ops.clone(); let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db); - // todo(emhane): do we want to restore the hot db writes if this fails? + // Try to execute blobs store ops. blobs_db.do_atomically(self.convert_to_kv_batch(blobs_ops)?)?; + let hot_db_cache_ops = hot_db_ops.clone(); + // Try to execute hot db store ops. + let tx_res = match self.convert_to_kv_batch(hot_db_ops) { + Ok(kv_store_ops) => self.hot_db.do_atomically(kv_store_ops), + Err(e) => Err(e), + }; + // Rollback on failure + if let Err(e) = tx_res { + let mut rollback_blob_ops: Vec> = Vec::with_capacity(blob_cache_ops.len()); + for blob_op in blob_cache_ops { + match blob_op { + StoreOp::PutBlobs(block_root, _) => { + rollback_blob_ops.push(StoreOp::DeleteBlobs(block_root)); + } + StoreOp::DeleteBlobs(_) => { + if let Some(blobs) = blobs_to_delete.pop() { + rollback_blob_ops + .push(StoreOp::PutBlobs(blobs.beacon_block_root, Arc::new(blobs))); + } + } + _ => (), + } + } + blobs_db.do_atomically(self.convert_to_kv_batch(rollback_blob_ops)?)?; + return Err(e); + } + for op in hot_db_cache_ops { match op { StoreOp::PutBlock(block_root, block) => {