add blob info
This commit is contained in:
parent
3c9e1abcb7
commit
92cae14409
@ -25,6 +25,8 @@ pub enum Error {
|
|||||||
SchemaMigrationError(String),
|
SchemaMigrationError(String),
|
||||||
/// The store's `anchor_info` was mutated concurrently, the latest modification wasn't applied.
|
/// The store's `anchor_info` was mutated concurrently, the latest modification wasn't applied.
|
||||||
AnchorInfoConcurrentMutation,
|
AnchorInfoConcurrentMutation,
|
||||||
|
/// The store's `blob_info` was mutated concurrently, the latest modification wasn't applied.
|
||||||
|
BlobInfoConcurrentMutation,
|
||||||
/// The block or state is unavailable due to weak subjectivity sync.
|
/// The block or state is unavailable due to weak subjectivity sync.
|
||||||
HistoryUnavailable,
|
HistoryUnavailable,
|
||||||
/// State reconstruction cannot commence because not all historic blocks are known.
|
/// State reconstruction cannot commence because not all historic blocks are known.
|
||||||
|
@ -12,9 +12,9 @@ use crate::leveldb_store::BytesKey;
|
|||||||
use crate::leveldb_store::LevelDB;
|
use crate::leveldb_store::LevelDB;
|
||||||
use crate::memory_store::MemoryStore;
|
use crate::memory_store::MemoryStore;
|
||||||
use crate::metadata::{
|
use crate::metadata::{
|
||||||
AnchorInfo, CompactionTimestamp, PruningCheckpoint, SchemaVersion, ANCHOR_INFO_KEY,
|
AnchorInfo, BlobInfo, CompactionTimestamp, PruningCheckpoint, SchemaVersion, ANCHOR_INFO_KEY,
|
||||||
COMPACTION_TIMESTAMP_KEY, CONFIG_KEY, CURRENT_SCHEMA_VERSION, PRUNING_CHECKPOINT_KEY,
|
BLOB_INFO_KEY, COMPACTION_TIMESTAMP_KEY, CONFIG_KEY, CURRENT_SCHEMA_VERSION,
|
||||||
SCHEMA_VERSION_KEY, SPLIT_KEY,
|
PRUNING_CHECKPOINT_KEY, SCHEMA_VERSION_KEY, SPLIT_KEY,
|
||||||
};
|
};
|
||||||
use crate::metrics;
|
use crate::metrics;
|
||||||
use crate::{
|
use crate::{
|
||||||
@ -53,6 +53,8 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
|
|||||||
pub(crate) split: RwLock<Split>,
|
pub(crate) split: RwLock<Split>,
|
||||||
/// The starting slots for the range of blocks & states stored in the database.
|
/// The starting slots for the range of blocks & states stored in the database.
|
||||||
anchor_info: RwLock<Option<AnchorInfo>>,
|
anchor_info: RwLock<Option<AnchorInfo>>,
|
||||||
|
/// The starting slots for the range of blobs stored in the database.
|
||||||
|
blob_info: RwLock<Option<BlobInfo>>,
|
||||||
pub(crate) config: StoreConfig,
|
pub(crate) config: StoreConfig,
|
||||||
/// Cold database containing compact historical data.
|
/// Cold database containing compact historical data.
|
||||||
pub cold_db: Cold,
|
pub cold_db: Cold,
|
||||||
@ -1293,6 +1295,65 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
|||||||
.map(|a| a.anchor_slot)
|
.map(|a| a.anchor_slot)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get a clone of the store's blob info.
|
||||||
|
///
|
||||||
|
/// To do mutations, use `compare_and_set_blob_info`.
|
||||||
|
pub fn get_blob_info(&self) -> Option<BlobInfo> {
|
||||||
|
self.blob_info.read_recursive().clone()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Atomically update the blob info from `prev_value` to `new_value`.
|
||||||
|
///
|
||||||
|
/// Return a `KeyValueStoreOp` which should be written to disk, possibly atomically with other
|
||||||
|
/// values.
|
||||||
|
///
|
||||||
|
/// Return an `BlobInfoConcurrentMutation` error if the `prev_value` provided
|
||||||
|
/// is not correct.
|
||||||
|
pub fn compare_and_set_blob_info(
|
||||||
|
&self,
|
||||||
|
prev_value: Option<BlobInfo>,
|
||||||
|
new_value: Option<BlobInfo>,
|
||||||
|
) -> Result<KeyValueStoreOp, Error> {
|
||||||
|
let mut blob_info = self.blob_info.write();
|
||||||
|
if *blob_info == prev_value {
|
||||||
|
let kv_op = self.store_blob_info_in_batch(&new_value);
|
||||||
|
*blob_info = new_value;
|
||||||
|
Ok(kv_op)
|
||||||
|
} else {
|
||||||
|
Err(Error::AnchorInfoConcurrentMutation)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// As for `compare_and_set_blob_info`, but also writes the blob info to disk immediately.
|
||||||
|
pub fn compare_and_set_blob_info_with_write(
|
||||||
|
&self,
|
||||||
|
prev_value: Option<BlobInfo>,
|
||||||
|
new_value: Option<BlobInfo>,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let kv_store_op = self.compare_and_set_blob_info(prev_value, new_value)?;
|
||||||
|
self.hot_db.do_atomically(vec![kv_store_op])
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Load the blob info from disk, but do not set `self.blob_info`.
|
||||||
|
fn load_blob_info(&self) -> Result<Option<BlobInfo>, Error> {
|
||||||
|
self.hot_db.get(&BLOB_INFO_KEY)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Store the given `blob_info` to disk.
|
||||||
|
///
|
||||||
|
/// The argument is intended to be `self.blob_info`, but is passed manually to avoid issues
|
||||||
|
/// with recursive locking.
|
||||||
|
fn store_blob_info_in_batch(&self, blob_info: &Option<BlobInfo>) -> KeyValueStoreOp {
|
||||||
|
if let Some(ref blob_info) = blob_info {
|
||||||
|
blob_info.as_kv_store_op(BLOB_INFO_KEY)
|
||||||
|
} else {
|
||||||
|
KeyValueStoreOp::DeleteKey(get_key_for_col(
|
||||||
|
DBColumn::BeaconMeta.into(),
|
||||||
|
BLOB_INFO_KEY.as_bytes(),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Return the slot-window describing the available historic states.
|
/// Return the slot-window describing the available historic states.
|
||||||
///
|
///
|
||||||
/// Returns `(lower_limit, upper_limit)`.
|
/// Returns `(lower_limit, upper_limit)`.
|
||||||
|
@ -15,6 +15,7 @@ pub const SPLIT_KEY: Hash256 = Hash256::repeat_byte(2);
|
|||||||
pub const PRUNING_CHECKPOINT_KEY: Hash256 = Hash256::repeat_byte(3);
|
pub const PRUNING_CHECKPOINT_KEY: Hash256 = Hash256::repeat_byte(3);
|
||||||
pub const COMPACTION_TIMESTAMP_KEY: Hash256 = Hash256::repeat_byte(4);
|
pub const COMPACTION_TIMESTAMP_KEY: Hash256 = Hash256::repeat_byte(4);
|
||||||
pub const ANCHOR_INFO_KEY: Hash256 = Hash256::repeat_byte(5);
|
pub const ANCHOR_INFO_KEY: Hash256 = Hash256::repeat_byte(5);
|
||||||
|
pub const BLOB_INFO_KEY: Hash256 = Hash256::repeat_byte(6);
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
|
||||||
pub struct SchemaVersion(pub u64);
|
pub struct SchemaVersion(pub u64);
|
||||||
@ -117,3 +118,28 @@ impl StoreItem for AnchorInfo {
|
|||||||
Ok(Self::from_ssz_bytes(bytes)?)
|
Ok(Self::from_ssz_bytes(bytes)?)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Database parameters relevant to blob sync.
|
||||||
|
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize)]
|
||||||
|
pub struct BlobInfo {
|
||||||
|
/// The block root of the next blob that needs to be added to fill in the history.
|
||||||
|
pub oldest_blob_parent: Hash256,
|
||||||
|
/// The slot before which blobs are available.
|
||||||
|
pub oldest_blob_slot: Slot,
|
||||||
|
/// The slot from which blobs are available.
|
||||||
|
pub latest_blob_slot: Slot,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl StoreItem for AnchorInfo {
|
||||||
|
fn db_column() -> DBColumn {
|
||||||
|
DBColumn::BeaconMeta
|
||||||
|
}
|
||||||
|
|
||||||
|
fn as_store_bytes(&self) -> Vec<u8> {
|
||||||
|
self.as_ssz_bytes()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
|
||||||
|
Ok(Self::from_ssz_bytes(bytes)?)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user