Store blobs in correct db for atomic ops

This commit is contained in:
Emilia Hane 2023-01-17 20:22:36 +01:00
parent 625980e484
commit dcb5495745
No known key found for this signature in database
GPG Key ID: E73394F9C09206FA
3 changed files with 35 additions and 19 deletions

View File

@ -1380,7 +1380,9 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
StoreOp::PutStateTemporaryFlag(state_root), StoreOp::PutStateTemporaryFlag(state_root),
] ]
}; };
chain.store.do_atomically(state_batch)?; chain
.store
.do_atomically_and_update_cache(state_batch, None)?;
drop(txn_lock); drop(txn_lock);
confirmed_state_roots.push(state_root); confirmed_state_roots.push(state_root);

View File

@ -31,7 +31,7 @@ where
"Garbage collecting {} temporary states", "Garbage collecting {} temporary states",
delete_ops.len() / 2 delete_ops.len() / 2
); );
self.do_atomically(delete_ops)?; self.do_atomically_and_update_cache(delete_ops, None)?;
} }
Ok(()) Ok(())

View File

@ -849,11 +849,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
Ok(key_value_batch) Ok(key_value_batch)
} }
pub fn do_atomically(&self, batch: Vec<StoreOp<E>>) -> Result<(), Error> { pub fn do_atomically_and_update_cache(
&self,
batch: Vec<StoreOp<E>>,
blobs_batch: Option<Vec<StoreOp<E>>>,
) -> Result<(), Error> {
// Update the block cache whilst holding a lock, to ensure that the cache updates atomically // Update the block cache whilst holding a lock, to ensure that the cache updates atomically
// with the database. // with the database.
let mut guard = self.block_cache.lock(); let mut guard = self.block_cache.lock();
let mut guard_blob = self.blob_cache.lock();
for op in &batch { for op in &batch {
match op { match op {
@ -861,9 +864,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
guard.put(*block_root, (**block).clone()); guard.put(*block_root, (**block).clone());
} }
StoreOp::PutBlobs(block_root, blobs) => { StoreOp::PutBlobs(_, _) => (),
guard_blob.put(*block_root, (**blobs).clone());
}
StoreOp::PutState(_, _) => (), StoreOp::PutState(_, _) => (),
@ -877,9 +878,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
guard.pop(block_root); guard.pop(block_root);
} }
StoreOp::DeleteBlobs(block_root) => { StoreOp::DeleteBlobs(_) => (),
guard_blob.pop(block_root);
}
StoreOp::DeleteState(_, _) => (), StoreOp::DeleteState(_, _) => (),
@ -891,11 +890,30 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
} }
} }
if let Some(blob_ops) = blobs_batch {
let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db);
let mut guard_blob = self.blob_cache.lock();
for op in &blob_ops {
match op {
StoreOp::PutBlobs(block_root, blobs) => {
guard_blob.put(*block_root, (**blobs).clone());
}
StoreOp::DeleteBlobs(block_root) => {
guard_blob.pop(block_root);
}
_ => (),
}
}
blobs_db.do_atomically(self.convert_to_kv_batch(blob_ops)?)?;
drop(guard_blob);
}
self.hot_db self.hot_db
.do_atomically(self.convert_to_kv_batch(batch)?)?; .do_atomically(self.convert_to_kv_batch(batch)?)?;
drop(guard); drop(guard);
drop(guard_blob);
Ok(()) Ok(())
} }
@ -1232,11 +1250,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
/// Fetch a blobs sidecar from the store. /// Fetch a blobs sidecar from the store.
pub fn get_blobs(&self, block_root: &Hash256) -> Result<Option<BlobsSidecar<E>>, Error> { pub fn get_blobs(&self, block_root: &Hash256) -> Result<Option<BlobsSidecar<E>>, Error> {
let blobs_db = if let Some(ref blobs_db) = self.blobs_db { let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db);
blobs_db
} else {
&self.cold_db
};
match blobs_db.get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())? { match blobs_db.get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())? {
Some(ref blobs_bytes) => { Some(ref blobs_bytes) => {
@ -1751,7 +1765,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
} }
} }
let payloads_pruned = ops.len(); let payloads_pruned = ops.len();
self.do_atomically(ops)?; self.do_atomically_and_update_cache(ops, None)?;
info!( info!(
self.log, self.log,
"Execution payload pruning complete"; "Execution payload pruning complete";
@ -2050,7 +2064,7 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
} }
// Delete the states from the hot database if we got this far. // Delete the states from the hot database if we got this far.
store.do_atomically(hot_db_ops)?; store.do_atomically_and_update_cache(hot_db_ops, None)?;
debug!( debug!(
store.log, store.log,