Complete making blocks and blobs db atomic

This commit is contained in:
Emilia Hane 2023-02-01 17:38:48 +01:00
parent 89cccfc397
commit 72cd68c0a4
No known key found for this signature in database
GPG Key ID: E73394F9C09206FA

View File

@ -860,30 +860,67 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
&self,
batch: Vec<StoreOp<E>>,
) -> Result<(), Error> {
let mut hot_db_cache_ops = Vec::new();
let (blobs_ops, hot_db_ops) = batch.into_iter().partition(|store_op| match store_op {
StoreOp::PutBlobs(_, _) | StoreOp::DeleteBlobs(_) => true,
StoreOp::PutBlock(_, _) | StoreOp::DeleteBlock(_) => {
hot_db_cache_ops.push(store_op.clone());
false
}
_ => false,
});
let mut blobs_to_delete = Vec::new();
let (blobs_ops, hot_db_ops): (Vec<StoreOp<E>>, Vec<StoreOp<E>>) =
batch.into_iter().partition(|store_op| match store_op {
StoreOp::PutBlobs(_, _) => true,
StoreOp::DeleteBlobs(block_root) => {
match self.get_blobs(block_root) {
Ok(Some(blobs_sidecar)) => {
blobs_to_delete.push(blobs_sidecar);
}
Err(e) => {
error!(
self.log, "Error getting blobs";
"block_root" => %block_root,
"error" => ?e
);
}
_ => (),
}
true
}
StoreOp::PutBlock(_, _) | StoreOp::DeleteBlock(_) => false,
_ => false,
});
// Update database whilst holding a lock on cache, to ensure that the cache updates
// atomically with the database.
let mut guard = self.block_cache.lock();
let mut guard_blob = self.blob_cache.lock();
self.hot_db
.do_atomically(self.convert_to_kv_batch(hot_db_ops)?)?;
let blob_cache_ops = blobs_ops.clone();
let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db);
// todo(emhane): do we want to restore the hot db writes if this fails?
// Try to execute blobs store ops.
blobs_db.do_atomically(self.convert_to_kv_batch(blobs_ops)?)?;
let hot_db_cache_ops = hot_db_ops.clone();
// Try to execute hot db store ops.
let tx_res = match self.convert_to_kv_batch(hot_db_ops) {
Ok(kv_store_ops) => self.hot_db.do_atomically(kv_store_ops),
Err(e) => Err(e),
};
// Rollback on failure
if let Err(e) = tx_res {
let mut rollback_blob_ops: Vec<StoreOp<E>> = Vec::with_capacity(blob_cache_ops.len());
for blob_op in blob_cache_ops {
match blob_op {
StoreOp::PutBlobs(block_root, _) => {
rollback_blob_ops.push(StoreOp::DeleteBlobs(block_root));
}
StoreOp::DeleteBlobs(_) => {
if let Some(blobs) = blobs_to_delete.pop() {
rollback_blob_ops
.push(StoreOp::PutBlobs(blobs.beacon_block_root, Arc::new(blobs)));
}
}
_ => (),
}
}
blobs_db.do_atomically(self.convert_to_kv_batch(rollback_blob_ops)?)?;
return Err(e);
}
for op in hot_db_cache_ops {
match op {
StoreOp::PutBlock(block_root, block) => {