Merge pull request #2 from realbigsean/sean-debug-ci

Debug CI Sean's PR feedback
This commit is contained in:
Emilia Hane 2023-02-14 12:24:15 +01:00 committed by GitHub
commit 8200d37045
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 466 additions and 159 deletions

View File

@ -958,7 +958,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
block_root: &Hash256, block_root: &Hash256,
) -> Result<Option<SignedBeaconBlockAndBlobsSidecar<T::EthSpec>>, Error> { ) -> Result<Option<SignedBeaconBlockAndBlobsSidecar<T::EthSpec>>, Error> {
// If there is no data availability boundary, the Eip4844 fork is disabled. // If there is no data availability boundary, the Eip4844 fork is disabled.
if self.finalized_data_availability_boundary().is_some() { if let Some(finalized_data_availability_boundary) =
self.finalized_data_availability_boundary()
{
// Only use the attester cache if we can find both the block and blob // Only use the attester cache if we can find both the block and blob
if let (Some(block), Some(blobs)) = ( if let (Some(block), Some(blobs)) = (
self.early_attester_cache.get_block(*block_root), self.early_attester_cache.get_block(*block_root),
@ -970,7 +972,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
})) }))
// Attempt to get the block and blobs from the database // Attempt to get the block and blobs from the database
} else if let Some(block) = self.get_block(block_root).await?.map(Arc::new) { } else if let Some(block) = self.get_block(block_root).await?.map(Arc::new) {
let blobs = self.get_blobs(block_root)?.map(Arc::new); let blobs = self
.get_blobs(block_root, finalized_data_availability_boundary)?
.map(Arc::new);
Ok(blobs.map(|blobs| SignedBeaconBlockAndBlobsSidecar { Ok(blobs.map(|blobs| SignedBeaconBlockAndBlobsSidecar {
beacon_block: block, beacon_block: block,
blobs_sidecar: blobs, blobs_sidecar: blobs,
@ -1066,27 +1070,32 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub fn get_blobs( pub fn get_blobs(
&self, &self,
block_root: &Hash256, block_root: &Hash256,
data_availability_boundary: Epoch,
) -> Result<Option<BlobsSidecar<T::EthSpec>>, Error> { ) -> Result<Option<BlobsSidecar<T::EthSpec>>, Error> {
match self.store.get_blobs(block_root)? { match self.store.get_blobs(block_root)? {
Some(blobs) => Ok(Some(blobs)), Some(blobs) => Ok(Some(blobs)),
None => { None => {
if let Ok(Some(block)) = self.get_blinded_block(block_root) { // Check for the corresponding block to understand whether we *should* have blobs.
let expected_kzg_commitments = block.message().body().blob_kzg_commitments()?; self.get_blinded_block(block_root)?
.map(|block| {
if !expected_kzg_commitments.is_empty() { // If there are no KZG commitments in the block, we know the sidecar should
Err(Error::DBInconsistent(format!( // be empty.
"Expected kzg commitments but no blobs stored for block root {}", let expected_kzg_commitments =
block_root match block.message().body().blob_kzg_commitments() {
))) Ok(kzg_commitments) => kzg_commitments,
Err(_) => return Err(Error::NoKzgCommitmentsFieldOnBlock),
};
if expected_kzg_commitments.is_empty() {
Ok(BlobsSidecar::empty_from_parts(*block_root, block.slot()))
} else if data_availability_boundary <= block.epoch() {
// We should have blobs for all blocks younger than the boundary.
Err(Error::BlobsUnavailable)
} else { } else {
Ok(Some(BlobsSidecar::empty_from_parts( // We shouldn't have blobs for blocks older than the boundary.
*block_root, Err(Error::BlobsOlderThanDataAvailabilityBoundary(block.epoch()))
block.slot(),
)))
}
} else {
Ok(None)
} }
})
.transpose()
} }
} }
} }
@ -3033,10 +3042,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
} }
} }
} }
let txn_lock = self.store.hot_db.begin_rw_transaction(); let txn_lock = self.store.hot_db.begin_rw_transaction();
if let Err(e) = self.store.do_atomically(ops) { if let Err(e) = self.store.do_atomically_with_block_and_blobs_cache(ops) {
error!( error!(
self.log, self.log,
"Database write failed!"; "Database write failed!";
@ -4641,7 +4649,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
debug!( debug!(
self.log, self.log,
"Produced block on state"; "Produced block on state";
"block_size" => %block_size, "block_size" => block_size,
); );
metrics::observe(&metrics::BLOCK_SIZE, block_size as f64); metrics::observe(&metrics::BLOCK_SIZE, block_size as f64);

View File

@ -1380,7 +1380,9 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
StoreOp::PutStateTemporaryFlag(state_root), StoreOp::PutStateTemporaryFlag(state_root),
] ]
}; };
chain.store.do_atomically(state_batch)?; chain
.store
.do_atomically_with_block_and_blobs_cache(state_batch)?;
drop(txn_lock); drop(txn_lock);
confirmed_state_roots.push(state_root); confirmed_state_roots.push(state_root);

View File

@ -38,7 +38,7 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
}; };
let store_ops = cache.import_new_pubkeys(state)?; let store_ops = cache.import_new_pubkeys(state)?;
store.do_atomically(store_ops)?; store.do_atomically_with_block_and_blobs_cache(store_ops)?;
Ok(cache) Ok(cache)
} }
@ -299,7 +299,7 @@ mod test {
let ops = cache let ops = cache
.import_new_pubkeys(&state) .import_new_pubkeys(&state)
.expect("should import pubkeys"); .expect("should import pubkeys");
store.do_atomically(ops).unwrap(); store.do_atomically_with_block_and_blobs_cache(ops).unwrap();
check_cache_get(&cache, &keypairs[..]); check_cache_get(&cache, &keypairs[..]);
drop(cache); drop(cache);

View File

@ -32,7 +32,15 @@ fn get_store(db_path: &TempDir) -> Arc<HotColdDB> {
let cold_path = db_path.path().join("cold_db"); let cold_path = db_path.path().join("cold_db");
let config = StoreConfig::default(); let config = StoreConfig::default();
let log = NullLoggerBuilder.build().expect("logger should build"); let log = NullLoggerBuilder.build().expect("logger should build");
HotColdDB::open(&hot_path, &cold_path, |_, _, _| Ok(()), config, spec, log) HotColdDB::open(
&hot_path,
&cold_path,
None,
|_, _, _| Ok(()),
config,
spec,
log,
)
.expect("disk store should initialize") .expect("disk store should initialize")
} }

View File

@ -60,7 +60,15 @@ fn get_store_with_spec(
let config = StoreConfig::default(); let config = StoreConfig::default();
let log = test_logger(); let log = test_logger();
HotColdDB::open(&hot_path, &cold_path, |_, _, _| Ok(()), config, spec, log) HotColdDB::open(
&hot_path,
&cold_path,
None,
|_, _, _| Ok(()),
config,
spec,
log,
)
.expect("disk store should initialize") .expect("disk store should initialize")
} }

View File

@ -68,6 +68,7 @@ pub struct ClientBuilder<T: BeaconChainTypes> {
gossipsub_registry: Option<Registry>, gossipsub_registry: Option<Registry>,
db_path: Option<PathBuf>, db_path: Option<PathBuf>,
freezer_db_path: Option<PathBuf>, freezer_db_path: Option<PathBuf>,
blobs_db_path: Option<PathBuf>,
http_api_config: http_api::Config, http_api_config: http_api::Config,
http_metrics_config: http_metrics::Config, http_metrics_config: http_metrics::Config,
slasher: Option<Arc<Slasher<T::EthSpec>>>, slasher: Option<Arc<Slasher<T::EthSpec>>>,
@ -100,6 +101,7 @@ where
gossipsub_registry: None, gossipsub_registry: None,
db_path: None, db_path: None,
freezer_db_path: None, freezer_db_path: None,
blobs_db_path: None,
http_api_config: <_>::default(), http_api_config: <_>::default(),
http_metrics_config: <_>::default(), http_metrics_config: <_>::default(),
slasher: None, slasher: None,
@ -892,6 +894,7 @@ where
mut self, mut self,
hot_path: &Path, hot_path: &Path,
cold_path: &Path, cold_path: &Path,
blobs_path: Option<PathBuf>,
config: StoreConfig, config: StoreConfig,
log: Logger, log: Logger,
) -> Result<Self, String> { ) -> Result<Self, String> {
@ -907,6 +910,7 @@ where
self.db_path = Some(hot_path.into()); self.db_path = Some(hot_path.into());
self.freezer_db_path = Some(cold_path.into()); self.freezer_db_path = Some(cold_path.into());
self.blobs_db_path = blobs_path.clone();
let inner_spec = spec.clone(); let inner_spec = spec.clone();
let deposit_contract_deploy_block = context let deposit_contract_deploy_block = context
@ -929,6 +933,7 @@ where
let store = HotColdDB::open( let store = HotColdDB::open(
hot_path, hot_path,
cold_path, cold_path,
blobs_path,
schema_upgrade, schema_upgrade,
config, config,
spec, spec,

View File

@ -49,6 +49,13 @@ pub struct Config {
pub db_name: String, pub db_name: String,
/// Path where the freezer database will be located. /// Path where the freezer database will be located.
pub freezer_db_path: Option<PathBuf>, pub freezer_db_path: Option<PathBuf>,
/// Path where the blobs database will be located if blobs should be in a separate database.
///
/// The capacity this location should hold varies with the data availability boundary. It
/// should be able to store < 69 GB when [MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS](types::consts::eip4844::MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS) is 4096
/// epochs of 32 slots (up to 131072 bytes data per blob and up to 4 blobs per block, 88 bytes
/// of [BlobsSidecar](types::BlobsSidecar) metadata per block).
pub blobs_db_path: Option<PathBuf>,
pub log_file: PathBuf, pub log_file: PathBuf,
/// If true, the node will use co-ordinated junk for eth1 values. /// If true, the node will use co-ordinated junk for eth1 values.
/// ///
@ -89,6 +96,7 @@ impl Default for Config {
data_dir: PathBuf::from(DEFAULT_ROOT_DIR), data_dir: PathBuf::from(DEFAULT_ROOT_DIR),
db_name: "chain_db".to_string(), db_name: "chain_db".to_string(),
freezer_db_path: None, freezer_db_path: None,
blobs_db_path: None,
log_file: PathBuf::from(""), log_file: PathBuf::from(""),
genesis: <_>::default(), genesis: <_>::default(),
store: <_>::default(), store: <_>::default(),
@ -149,11 +157,27 @@ impl Config {
.unwrap_or_else(|| self.default_freezer_db_path()) .unwrap_or_else(|| self.default_freezer_db_path())
} }
/// Returns the path to which the client may initialize the on-disk blobs database.
///
/// Will attempt to use the user-supplied path from e.g. the CLI, or will default
/// to None.
pub fn get_blobs_db_path(&self) -> Option<PathBuf> {
self.blobs_db_path.clone()
}
/// Get the freezer DB path, creating it if necessary. /// Get the freezer DB path, creating it if necessary.
pub fn create_freezer_db_path(&self) -> Result<PathBuf, String> { pub fn create_freezer_db_path(&self) -> Result<PathBuf, String> {
ensure_dir_exists(self.get_freezer_db_path()) ensure_dir_exists(self.get_freezer_db_path())
} }
/// Get the blobs DB path, creating it if necessary.
pub fn create_blobs_db_path(&self) -> Result<Option<PathBuf>, String> {
match self.get_blobs_db_path() {
Some(blobs_db_path) => Ok(Some(ensure_dir_exists(blobs_db_path)?)),
None => Ok(None),
}
}
/// Returns the "modern" path to the data_dir. /// Returns the "modern" path to the data_dir.
/// ///
/// See `Self::get_data_dir` documentation for more info. /// See `Self::get_data_dir` documentation for more info.

View File

@ -14,7 +14,9 @@ pub use engine_api::{http, http::deposit_methods, http::HttpJsonRpc};
use engines::{Engine, EngineError}; use engines::{Engine, EngineError};
pub use engines::{EngineState, ForkchoiceState}; pub use engines::{EngineState, ForkchoiceState};
use eth2::types::{builder_bid::SignedBuilderBid, ForkVersionedResponse}; use eth2::types::{builder_bid::SignedBuilderBid, ForkVersionedResponse};
use ethers_core::types::Transaction as EthersTransaction; use ethers_core::abi::ethereum_types::FromStrRadixErr;
use ethers_core::types::transaction::eip2930::AccessListItem;
use ethers_core::types::{Transaction as EthersTransaction, U64};
use fork_choice::ForkchoiceUpdateParameters; use fork_choice::ForkchoiceUpdateParameters;
use lru::LruCache; use lru::LruCache;
use payload_status::process_payload_status; use payload_status::process_payload_status;
@ -39,12 +41,12 @@ use tokio::{
}; };
use tokio_stream::wrappers::WatchStream; use tokio_stream::wrappers::WatchStream;
use types::consts::eip4844::BLOB_TX_TYPE; use types::consts::eip4844::BLOB_TX_TYPE;
use types::transaction::{AccessTuple, BlobTransaction}; use types::transaction::{AccessTuple, BlobTransaction, EcdsaSignature, SignedBlobTransaction};
use types::{ use types::{
blobs_sidecar::{Blobs, KzgCommitments}, blobs_sidecar::{Blobs, KzgCommitments},
ExecutionPayload, ExecutionPayloadCapella, ExecutionPayloadEip4844, ExecutionPayloadMerge, ExecutionPayload, ExecutionPayloadCapella, ExecutionPayloadEip4844, ExecutionPayloadMerge,
}; };
use types::{AbstractExecPayload, BeaconStateError, ExecPayload}; use types::{AbstractExecPayload, BeaconStateError, ExecPayload, VersionedHash};
use types::{ use types::{
BlindedPayload, BlockType, ChainSpec, Epoch, ExecutionBlockHash, ForkName, BlindedPayload, BlockType, ChainSpec, Epoch, ExecutionBlockHash, ForkName,
ProposerPreparationData, PublicKeyBytes, Signature, SignedBeaconBlock, Slot, Transaction, ProposerPreparationData, PublicKeyBytes, Signature, SignedBeaconBlock, Slot, Transaction,
@ -137,16 +139,6 @@ pub enum BlockProposalContents<T: EthSpec, Payload: AbstractExecPayload<T>> {
}, },
} }
pub struct BlockProposalBlobsContents<T: EthSpec> {
pub kzg_commitments: KzgCommitments<T>,
pub blobs: Blobs<T>,
}
pub struct BlockProposalContentsDeconstructed<T: EthSpec, Payload: AbstractExecPayload<T>> {
pub payload: Payload,
pub blobs_content: Option<BlockProposalBlobsContents<T>>,
}
impl<T: EthSpec, Payload: AbstractExecPayload<T>> BlockProposalContents<T, Payload> { impl<T: EthSpec, Payload: AbstractExecPayload<T>> BlockProposalContents<T, Payload> {
pub fn deconstruct(self) -> (Payload, Option<KzgCommitments<T>>, Option<Blobs<T>>) { pub fn deconstruct(self) -> (Payload, Option<KzgCommitments<T>>, Option<Blobs<T>>) {
match self { match self {
@ -1692,13 +1684,15 @@ impl<T: EthSpec> ExecutionLayer<T> {
return Ok(None); return Ok(None);
}; };
let transactions = VariableList::from( let convert_transactions = |transactions: Vec<EthersTransaction>| {
block VariableList::new(
.transactions() transactions
.iter() .into_iter()
.map(ethers_tx_to_bytes::<T>) .map(ethers_tx_to_bytes::<T>)
.collect::<Result<Vec<_>, BlobTxConversionError>>()?, .collect::<Result<Vec<_>, BlobTxConversionError>>()?,
); )
.map_err(BlobTxConversionError::SszError)
};
let payload = match block { let payload = match block {
ExecutionBlockWithTransactions::Merge(merge_block) => { ExecutionBlockWithTransactions::Merge(merge_block) => {
@ -1716,7 +1710,7 @@ impl<T: EthSpec> ExecutionLayer<T> {
extra_data: merge_block.extra_data, extra_data: merge_block.extra_data,
base_fee_per_gas: merge_block.base_fee_per_gas, base_fee_per_gas: merge_block.base_fee_per_gas,
block_hash: merge_block.block_hash, block_hash: merge_block.block_hash,
transactions, transactions: convert_transactions(merge_block.transactions)?,
}) })
} }
ExecutionBlockWithTransactions::Capella(capella_block) => { ExecutionBlockWithTransactions::Capella(capella_block) => {
@ -1742,7 +1736,7 @@ impl<T: EthSpec> ExecutionLayer<T> {
extra_data: capella_block.extra_data, extra_data: capella_block.extra_data,
base_fee_per_gas: capella_block.base_fee_per_gas, base_fee_per_gas: capella_block.base_fee_per_gas,
block_hash: capella_block.block_hash, block_hash: capella_block.block_hash,
transactions, transactions: convert_transactions(capella_block.transactions)?,
withdrawals, withdrawals,
}) })
} }
@ -1770,7 +1764,7 @@ impl<T: EthSpec> ExecutionLayer<T> {
base_fee_per_gas: eip4844_block.base_fee_per_gas, base_fee_per_gas: eip4844_block.base_fee_per_gas,
excess_data_gas: eip4844_block.excess_data_gas, excess_data_gas: eip4844_block.excess_data_gas,
block_hash: eip4844_block.block_hash, block_hash: eip4844_block.block_hash,
transactions, transactions: convert_transactions(eip4844_block.transactions)?,
withdrawals, withdrawals,
}) })
} }
@ -2035,10 +2029,18 @@ pub enum BlobTxConversionError {
MaxFeePerDataGasMissing, MaxFeePerDataGasMissing,
/// Missing the `blob_versioned_hashes` field. /// Missing the `blob_versioned_hashes` field.
BlobVersionedHashesMissing, BlobVersionedHashesMissing,
/// `y_parity` field was greater than one.
InvalidYParity,
/// There was an error converting the transaction to SSZ. /// There was an error converting the transaction to SSZ.
SszError(ssz_types::Error), SszError(ssz_types::Error),
/// There was an error converting the transaction from JSON. /// There was an error converting the transaction from JSON.
SerdeJson(serde_json::Error), SerdeJson(serde_json::Error),
/// There was an error converting the transaction from hex.
FromHex(String),
/// There was an error converting the transaction from hex.
FromStrRadix(FromStrRadixErr),
/// A `versioned_hash` did not contain 32 bytes.
InvalidVersionedHashBytesLen,
} }
impl From<ssz_types::Error> for BlobTxConversionError { impl From<ssz_types::Error> for BlobTxConversionError {
@ -2057,67 +2059,123 @@ impl From<serde_json::Error> for BlobTxConversionError {
/// on transaction type. That means RLP encoding if this is a transaction other than a /// on transaction type. That means RLP encoding if this is a transaction other than a
/// `BLOB_TX_TYPE` transaction in which case, SSZ encoding will be used. /// `BLOB_TX_TYPE` transaction in which case, SSZ encoding will be used.
fn ethers_tx_to_bytes<T: EthSpec>( fn ethers_tx_to_bytes<T: EthSpec>(
transaction: &EthersTransaction, transaction: EthersTransaction,
) -> Result<Transaction<T::MaxBytesPerTransaction>, BlobTxConversionError> { ) -> Result<Transaction<T::MaxBytesPerTransaction>, BlobTxConversionError> {
let tx_type = transaction let tx_type = transaction
.transaction_type .transaction_type
.ok_or(BlobTxConversionError::NoTransactionType)? .ok_or(BlobTxConversionError::NoTransactionType)?
.as_u64(); .as_u64();
let tx = if BLOB_TX_TYPE as u64 == tx_type { let tx = if BLOB_TX_TYPE as u64 == tx_type {
let chain_id = transaction let EthersTransaction {
.chain_id hash: _,
.ok_or(BlobTxConversionError::NoChainId)?; nonce,
let nonce = if transaction.nonce > Uint256::from(u64::MAX) { block_hash: _,
block_number: _,
transaction_index: _,
from: _,
to,
value,
gas_price: _,
gas,
input,
v,
r,
s,
transaction_type: _,
access_list,
max_priority_fee_per_gas,
max_fee_per_gas,
chain_id,
other,
} = transaction;
// ******************** BlobTransaction fields ********************
// chainId
let chain_id = chain_id.ok_or(BlobTxConversionError::NoChainId)?;
// nonce
let nonce = if nonce > Uint256::from(u64::MAX) {
return Err(BlobTxConversionError::NonceTooLarge); return Err(BlobTxConversionError::NonceTooLarge);
} else { } else {
transaction.nonce.as_u64() nonce.as_u64()
}; };
let max_priority_fee_per_gas = transaction
.max_priority_fee_per_gas // maxPriorityFeePerGas
.ok_or(BlobTxConversionError::MaxPriorityFeePerGasMissing)?; let max_priority_fee_per_gas =
let max_fee_per_gas = transaction max_priority_fee_per_gas.ok_or(BlobTxConversionError::MaxPriorityFeePerGasMissing)?;
.max_fee_per_gas
.ok_or(BlobTxConversionError::MaxFeePerGasMissing)?; // maxFeePerGas
let gas = if transaction.gas > Uint256::from(u64::MAX) { let max_fee_per_gas = max_fee_per_gas.ok_or(BlobTxConversionError::MaxFeePerGasMissing)?;
// gas
let gas = if gas > Uint256::from(u64::MAX) {
return Err(BlobTxConversionError::GasTooHigh); return Err(BlobTxConversionError::GasTooHigh);
} else { } else {
transaction.gas.as_u64() gas.as_u64()
}; };
let to = transaction.to;
let value = transaction.value; // data (a.k.a input)
let data = VariableList::new(transaction.input.to_vec())?; let data = VariableList::new(input.to_vec())?;
// accessList
let access_list = VariableList::new( let access_list = VariableList::new(
transaction access_list
.access_list
.as_ref()
.ok_or(BlobTxConversionError::AccessListMissing)? .ok_or(BlobTxConversionError::AccessListMissing)?
.0 .0
.iter() .into_iter()
.map(|access_tuple| { .map(|access_tuple| {
let AccessListItem {
address,
storage_keys,
} = access_tuple;
Ok(AccessTuple { Ok(AccessTuple {
address: access_tuple.address, address,
storage_keys: VariableList::new(access_tuple.storage_keys.clone())?, storage_keys: VariableList::new(storage_keys)?,
}) })
}) })
.collect::<Result<Vec<AccessTuple>, BlobTxConversionError>>()?, .collect::<Result<Vec<AccessTuple>, BlobTxConversionError>>()?,
)?; )?;
let max_fee_per_data_gas = transaction
.other // ******************** BlobTransaction `other` fields ********************
//
// Here we use the `other` field in the `ethers-rs` `Transaction` type because
// `ethers-rs` does not yet support SSZ and therefore the blobs transaction type.
// maxFeePerDataGas
let max_fee_per_data_gas = Uint256::from_str_radix(
other
.get("maxFeePerDataGas") .get("maxFeePerDataGas")
.ok_or(BlobTxConversionError::MaxFeePerDataGasMissing)? .ok_or(BlobTxConversionError::MaxFeePerDataGasMissing)?
.as_str() .as_str()
.ok_or(BlobTxConversionError::MaxFeePerDataGasMissing)? .ok_or(BlobTxConversionError::MaxFeePerDataGasMissing)?,
.parse() 16,
.map_err(|_| BlobTxConversionError::MaxFeePerDataGasMissing)?; )
let blob_versioned_hashes = serde_json::from_str( .map_err(BlobTxConversionError::FromStrRadix)?;
transaction
.other // blobVersionedHashes
let blob_versioned_hashes = other
.get("blobVersionedHashes") .get("blobVersionedHashes")
.ok_or(BlobTxConversionError::BlobVersionedHashesMissing)? .ok_or(BlobTxConversionError::BlobVersionedHashesMissing)?
.as_array()
.ok_or(BlobTxConversionError::BlobVersionedHashesMissing)?
.iter()
.map(|versioned_hash| {
let hash_bytes = eth2_serde_utils::hex::decode(
versioned_hash
.as_str() .as_str()
.ok_or(BlobTxConversionError::BlobVersionedHashesMissing)?, .ok_or(BlobTxConversionError::BlobVersionedHashesMissing)?,
)?; )
BlobTransaction { .map_err(BlobTxConversionError::FromHex)?;
if hash_bytes.len() != Hash256::ssz_fixed_len() {
Err(BlobTxConversionError::InvalidVersionedHashBytesLen)
} else {
Ok(Hash256::from_slice(&hash_bytes))
}
})
.collect::<Result<Vec<VersionedHash>, BlobTxConversionError>>()?;
let message = BlobTransaction {
chain_id, chain_id,
nonce, nonce,
max_priority_fee_per_gas, max_priority_fee_per_gas,
@ -2128,9 +2186,24 @@ fn ethers_tx_to_bytes<T: EthSpec>(
data, data,
access_list, access_list,
max_fee_per_data_gas, max_fee_per_data_gas,
blob_versioned_hashes, blob_versioned_hashes: VariableList::new(blob_versioned_hashes)?,
} };
.as_ssz_bytes()
// ******************** EcdsaSignature fields ********************
let y_parity = if v == U64::zero() {
false
} else if v == U64::one() {
true
} else {
return Err(BlobTxConversionError::InvalidYParity);
};
let signature = EcdsaSignature { y_parity, r, s };
// The `BLOB_TX_TYPE` should prepend the SSZ encoded `SignedBlobTransaction`.
let mut signed_tx = SignedBlobTransaction { message, signature }.as_ssz_bytes();
signed_tx.insert(0, BLOB_TX_TYPE);
signed_tx
} else { } else {
transaction.rlp().to_vec() transaction.rlp().to_vec()
}; };

View File

@ -218,13 +218,16 @@ impl BlockId {
chain: &BeaconChain<T>, chain: &BeaconChain<T>,
) -> Result<Arc<BlobsSidecar<T::EthSpec>>, warp::Rejection> { ) -> Result<Arc<BlobsSidecar<T::EthSpec>>, warp::Rejection> {
let root = self.root(chain)?.0; let root = self.root(chain)?.0;
match chain.store.get_blobs(&root) { let Some(data_availability_boundary) = chain.data_availability_boundary() else {
return Err(warp_utils::reject::custom_not_found("Eip4844 fork disabled".into()));
};
match chain.get_blobs(&root, data_availability_boundary) {
Ok(Some(blob)) => Ok(Arc::new(blob)), Ok(Some(blob)) => Ok(Arc::new(blob)),
Ok(None) => Err(warp_utils::reject::custom_not_found(format!( Ok(None) => Err(warp_utils::reject::custom_not_found(format!(
"Blob with block root {} is not in the store", "Blob with block root {} is not in the store",
root root
))), ))),
Err(e) => Err(warp_utils::reject::beacon_chain_error(e.into())), Err(e) => Err(warp_utils::reject::beacon_chain_error(e)),
} }
} }
} }

View File

@ -408,15 +408,12 @@ impl ProtocolId {
/// Returns `true` if the given `ProtocolId` should expect `context_bytes` in the /// Returns `true` if the given `ProtocolId` should expect `context_bytes` in the
/// beginning of the stream, else returns `false`. /// beginning of the stream, else returns `false`.
pub fn has_context_bytes(&self) -> bool { pub fn has_context_bytes(&self) -> bool {
match self.version { match self.message_name {
Version::V2 => matches!( Protocol::BlocksByRange | Protocol::BlocksByRoot => {
self.message_name, !matches!(self.version, Version::V1)
Protocol::BlocksByRange | Protocol::BlocksByRoot }
), Protocol::BlobsByRange | Protocol::BlobsByRoot | Protocol::LightClientBootstrap => true,
Version::V1 => matches!( Protocol::Goodbye | Protocol::Ping | Protocol::Status | Protocol::MetaData => false,
self.message_name,
Protocol::BlobsByRange | Protocol::BlobsByRoot
),
} }
} }
} }

View File

@ -799,7 +799,7 @@ impl<T: BeaconChainTypes> Worker<T> {
let mut send_response = true; let mut send_response = true;
for root in block_roots { for root in block_roots {
match self.chain.get_blobs(&root) { match self.chain.get_blobs(&root, data_availability_boundary) {
Ok(Some(blobs)) => { Ok(Some(blobs)) => {
blobs_sent += 1; blobs_sent += 1;
self.send_network_message(NetworkMessage::SendResponse { self.send_network_message(NetworkMessage::SendResponse {

View File

@ -560,14 +560,16 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
/// Check whether a batch for this epoch (and only this epoch) should request just blocks or /// Check whether a batch for this epoch (and only this epoch) should request just blocks or
/// blocks and blobs. /// blocks and blobs.
pub fn batch_type(&self, _epoch: types::Epoch) -> ByRangeRequestType { #[allow(unused)]
if super::backfill_sync::BACKFILL_EPOCHS_PER_BATCH * super::range_sync::EPOCHS_PER_BATCH pub fn batch_type(&self, epoch: types::Epoch) -> ByRangeRequestType {
!= 1 // Induces a compile time panic if this doesn't hold true.
{ #[allow(clippy::assertions_on_constants)]
panic!( const _: () = assert!(
super::backfill_sync::BACKFILL_EPOCHS_PER_BATCH == 1
&& super::range_sync::EPOCHS_PER_BATCH == 1,
"To deal with alignment with 4844 boundaries, batches need to be of just one epoch" "To deal with alignment with 4844 boundaries, batches need to be of just one epoch"
); );
}
#[cfg(test)] #[cfg(test)]
{ {
// Keep tests only for blocks. // Keep tests only for blocks.
@ -576,7 +578,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
#[cfg(not(test))] #[cfg(not(test))]
{ {
if let Some(data_availability_boundary) = self.chain.data_availability_boundary() { if let Some(data_availability_boundary) = self.chain.data_availability_boundary() {
if _epoch >= data_availability_boundary { if epoch >= data_availability_boundary {
ByRangeRequestType::BlocksAndBlobs ByRangeRequestType::BlocksAndBlobs
} else { } else {
ByRangeRequestType::Blocks ByRangeRequestType::Blocks

View File

@ -28,6 +28,13 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
.help("Data directory for the freezer database.") .help("Data directory for the freezer database.")
.takes_value(true) .takes_value(true)
) )
.arg(
Arg::with_name("blobs-dir")
.long("blobs-dir")
.value_name("DIR")
.help("Data directory for the blobs database.")
.takes_value(true)
)
/* /*
* Network parameters. * Network parameters.
*/ */

View File

@ -390,6 +390,10 @@ pub fn get_config<E: EthSpec>(
client_config.freezer_db_path = Some(PathBuf::from(freezer_dir)); client_config.freezer_db_path = Some(PathBuf::from(freezer_dir));
} }
if let Some(blobs_db_dir) = cli_args.value_of("blobs-dir") {
client_config.blobs_db_path = Some(PathBuf::from(blobs_db_dir));
}
let (sprp, sprp_explicit) = get_slots_per_restore_point::<E>(cli_args)?; let (sprp, sprp_explicit) = get_slots_per_restore_point::<E>(cli_args)?;
client_config.store.slots_per_restore_point = sprp; client_config.store.slots_per_restore_point = sprp;
client_config.store.slots_per_restore_point_set_explicitly = sprp_explicit; client_config.store.slots_per_restore_point_set_explicitly = sprp_explicit;

View File

@ -64,6 +64,7 @@ impl<E: EthSpec> ProductionBeaconNode<E> {
let _datadir = client_config.create_data_dir()?; let _datadir = client_config.create_data_dir()?;
let db_path = client_config.create_db_path()?; let db_path = client_config.create_db_path()?;
let freezer_db_path = client_config.create_freezer_db_path()?; let freezer_db_path = client_config.create_freezer_db_path()?;
let blobs_db_path = client_config.create_blobs_db_path()?;
let executor = context.executor.clone(); let executor = context.executor.clone();
if let Some(legacy_dir) = client_config.get_existing_legacy_data_dir() { if let Some(legacy_dir) = client_config.get_existing_legacy_data_dir() {
@ -84,7 +85,13 @@ impl<E: EthSpec> ProductionBeaconNode<E> {
.runtime_context(context) .runtime_context(context)
.chain_spec(spec) .chain_spec(spec)
.http_api_config(client_config.http_api.clone()) .http_api_config(client_config.http_api.clone())
.disk_store(&db_path, &freezer_db_path, store_config, log.clone())?; .disk_store(
&db_path,
&freezer_db_path,
blobs_db_path,
store_config,
log.clone(),
)?;
let builder = if let Some(slasher_config) = client_config.slasher.clone() { let builder = if let Some(slasher_config) = client_config.slasher.clone() {
let slasher = Arc::new( let slasher = Arc::new(

View File

@ -31,7 +31,7 @@ where
"Garbage collecting {} temporary states", "Garbage collecting {} temporary states",
delete_ops.len() / 2 delete_ops.len() / 2
); );
self.do_atomically(delete_ops)?; self.do_atomically_with_block_and_blobs_cache(delete_ops)?;
} }
Ok(()) Ok(())

View File

@ -35,7 +35,7 @@ use state_processing::{
use std::cmp::min; use std::cmp::min;
use std::convert::TryInto; use std::convert::TryInto;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::path::Path; use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use types::consts::eip4844::MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS; use types::consts::eip4844::MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS;
@ -59,6 +59,8 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
pub(crate) config: StoreConfig, pub(crate) config: StoreConfig,
/// Cold database containing compact historical data. /// Cold database containing compact historical data.
pub cold_db: Cold, pub cold_db: Cold,
/// Database containing blobs. If None, store falls back to use `cold_db`.
pub blobs_db: Option<Cold>,
/// Hot database containing duplicated but quick-to-access recent data. /// Hot database containing duplicated but quick-to-access recent data.
/// ///
/// The hot database also contains all blocks. /// The hot database also contains all blocks.
@ -98,6 +100,8 @@ pub enum HotColdDBError {
MissingExecutionPayload(Hash256), MissingExecutionPayload(Hash256),
MissingFullBlockExecutionPayloadPruned(Hash256, Slot), MissingFullBlockExecutionPayloadPruned(Hash256, Slot),
MissingAnchorInfo, MissingAnchorInfo,
MissingPathToBlobsDatabase,
BlobsPreviouslyInDefaultStore,
HotStateSummaryError(BeaconStateError), HotStateSummaryError(BeaconStateError),
RestorePointDecodeError(ssz::DecodeError), RestorePointDecodeError(ssz::DecodeError),
BlockReplayBeaconError(BeaconStateError), BlockReplayBeaconError(BeaconStateError),
@ -119,6 +123,7 @@ pub enum HotColdDBError {
request_slot: Option<Slot>, request_slot: Option<Slot>,
state_root: Hash256, state_root: Hash256,
}, },
Rollback,
} }
impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> { impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
@ -134,6 +139,7 @@ impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
anchor_info: RwLock::new(None), anchor_info: RwLock::new(None),
blob_info: RwLock::new(BlobInfo::default()), blob_info: RwLock::new(BlobInfo::default()),
cold_db: MemoryStore::open(), cold_db: MemoryStore::open(),
blobs_db: Some(MemoryStore::open()),
hot_db: MemoryStore::open(), hot_db: MemoryStore::open(),
block_cache: Mutex::new(LruCache::new(config.block_cache_size)), block_cache: Mutex::new(LruCache::new(config.block_cache_size)),
blob_cache: Mutex::new(LruCache::new(config.blob_cache_size)), blob_cache: Mutex::new(LruCache::new(config.blob_cache_size)),
@ -157,6 +163,7 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
pub fn open( pub fn open(
hot_path: &Path, hot_path: &Path,
cold_path: &Path, cold_path: &Path,
blobs_db_path: Option<PathBuf>,
migrate_schema: impl FnOnce(Arc<Self>, SchemaVersion, SchemaVersion) -> Result<(), Error>, migrate_schema: impl FnOnce(Arc<Self>, SchemaVersion, SchemaVersion) -> Result<(), Error>,
config: StoreConfig, config: StoreConfig,
spec: ChainSpec, spec: ChainSpec,
@ -169,6 +176,7 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
anchor_info: RwLock::new(None), anchor_info: RwLock::new(None),
blob_info: RwLock::new(BlobInfo::default()), blob_info: RwLock::new(BlobInfo::default()),
cold_db: LevelDB::open(cold_path)?, cold_db: LevelDB::open(cold_path)?,
blobs_db: None,
hot_db: LevelDB::open(hot_path)?, hot_db: LevelDB::open(hot_path)?,
block_cache: Mutex::new(LruCache::new(config.block_cache_size)), block_cache: Mutex::new(LruCache::new(config.block_cache_size)),
blob_cache: Mutex::new(LruCache::new(config.blob_cache_size)), blob_cache: Mutex::new(LruCache::new(config.blob_cache_size)),
@ -213,6 +221,53 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
); );
} }
// Open separate blobs directory if configured and same configuration was used on previous
// run.
let blob_info = db.load_blob_info()?;
let new_blob_info = {
match (&blob_info, &blobs_db_path) {
(Some(blob_info), Some(_)) => {
if !blob_info.blobs_db {
return Err(HotColdDBError::BlobsPreviouslyInDefaultStore.into());
}
BlobInfo {
oldest_blob_slot: blob_info.oldest_blob_slot,
blobs_db: true,
}
}
(Some(blob_info), None) => {
if blob_info.blobs_db {
return Err(HotColdDBError::MissingPathToBlobsDatabase.into());
}
BlobInfo {
oldest_blob_slot: blob_info.oldest_blob_slot,
blobs_db: false,
}
}
(None, Some(_)) => BlobInfo {
oldest_blob_slot: None,
blobs_db: true,
}, // first time starting up node
(None, None) => BlobInfo {
oldest_blob_slot: None,
blobs_db: false,
}, // first time starting up node
}
};
if new_blob_info.blobs_db {
if let Some(path) = &blobs_db_path {
db.blobs_db = Some(LevelDB::open(path.as_path())?);
}
}
let blob_info = blob_info.unwrap_or(db.get_blob_info());
db.compare_and_set_blob_info_with_write(blob_info, new_blob_info)?;
info!(
db.log,
"Blobs DB initialized";
"use separate blobs db" => db.get_blob_info().blobs_db,
"path" => ?blobs_db_path
);
// Ensure that the schema version of the on-disk database matches the software. // Ensure that the schema version of the on-disk database matches the software.
// If the version is mismatched, an automatic migration will be attempted. // If the version is mismatched, an automatic migration will be attempted.
let db = Arc::new(db); let db = Arc::new(db);
@ -508,11 +563,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
self.hot_db self.hot_db
.key_delete(DBColumn::BeaconBlock.into(), block_root.as_bytes())?; .key_delete(DBColumn::BeaconBlock.into(), block_root.as_bytes())?;
self.hot_db self.hot_db
.key_delete(DBColumn::ExecPayload.into(), block_root.as_bytes()) .key_delete(DBColumn::ExecPayload.into(), block_root.as_bytes())?;
let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db);
blobs_db.key_delete(DBColumn::BeaconBlob.into(), block_root.as_bytes())
} }
pub fn put_blobs(&self, block_root: &Hash256, blobs: BlobsSidecar<E>) -> Result<(), Error> { pub fn put_blobs(&self, block_root: &Hash256, blobs: BlobsSidecar<E>) -> Result<(), Error> {
self.hot_db.put_bytes( let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db);
blobs_db.put_bytes(
DBColumn::BeaconBlob.into(), DBColumn::BeaconBlob.into(),
block_root.as_bytes(), block_root.as_bytes(),
&blobs.as_ssz_bytes(), &blobs.as_ssz_bytes(),
@ -521,21 +579,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
Ok(()) Ok(())
} }
pub fn get_blobs(&self, block_root: &Hash256) -> Result<Option<BlobsSidecar<E>>, Error> {
// FIXME(sean) I was attempting to use a blob cache here but was getting deadlocks,
// may want to attempt to use one again
if let Some(bytes) = self
.hot_db
.get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())?
{
let ret = BlobsSidecar::from_ssz_bytes(&bytes)?;
self.blob_cache.lock().put(*block_root, ret.clone());
Ok(Some(ret))
} else {
Ok(None)
}
}
pub fn blobs_as_kv_store_ops( pub fn blobs_as_kv_store_ops(
&self, &self,
key: &Hash256, key: &Hash256,
@ -832,22 +875,76 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
Ok(key_value_batch) Ok(key_value_batch)
} }
pub fn do_atomically(&self, batch: Vec<StoreOp<E>>) -> Result<(), Error> { pub fn do_atomically_with_block_and_blobs_cache(
// Update the block cache whilst holding a lock, to ensure that the cache updates atomically &self,
// with the database. batch: Vec<StoreOp<E>>,
) -> Result<(), Error> {
let mut blobs_to_delete = Vec::new();
let (blobs_ops, hot_db_ops): (Vec<StoreOp<E>>, Vec<StoreOp<E>>) =
batch.into_iter().partition(|store_op| match store_op {
StoreOp::PutBlobs(_, _) => true,
StoreOp::DeleteBlobs(block_root) => {
match self.get_blobs(block_root) {
Ok(Some(blobs_sidecar)) => {
blobs_to_delete.push(blobs_sidecar);
}
Err(e) => {
error!(
self.log, "Error getting blobs";
"block_root" => %block_root,
"error" => ?e
);
}
_ => (),
}
true
}
StoreOp::PutBlock(_, _) | StoreOp::DeleteBlock(_) => false,
_ => false,
});
// Update database whilst holding a lock on cache, to ensure that the cache updates
// atomically with the database.
let mut guard = self.block_cache.lock(); let mut guard = self.block_cache.lock();
let mut guard_blob = self.blob_cache.lock(); let mut guard_blob = self.blob_cache.lock();
for op in &batch { let blob_cache_ops = blobs_ops.clone();
match op { let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db);
StoreOp::PutBlock(block_root, block) => { // Try to execute blobs store ops.
guard.put(*block_root, (**block).clone()); blobs_db.do_atomically(self.convert_to_kv_batch(blobs_ops)?)?;
let hot_db_cache_ops = hot_db_ops.clone();
// Try to execute hot db store ops.
let tx_res = match self.convert_to_kv_batch(hot_db_ops) {
Ok(kv_store_ops) => self.hot_db.do_atomically(kv_store_ops),
Err(e) => Err(e),
};
// Rollback on failure
if let Err(e) = tx_res {
let mut blob_cache_ops = blob_cache_ops;
for op in blob_cache_ops.iter_mut() {
let reverse_op = match op {
StoreOp::PutBlobs(block_root, _) => StoreOp::DeleteBlobs(*block_root),
StoreOp::DeleteBlobs(_) => match blobs_to_delete.pop() {
Some(blobs) => StoreOp::PutBlobs(blobs.beacon_block_root, Arc::new(blobs)),
None => return Err(HotColdDBError::Rollback.into()),
},
_ => return Err(HotColdDBError::Rollback.into()),
};
*op = reverse_op;
}
blobs_db.do_atomically(self.convert_to_kv_batch(blob_cache_ops)?)?;
return Err(e);
} }
StoreOp::PutBlobs(block_root, blobs) => { for op in hot_db_cache_ops {
guard_blob.put(*block_root, (**blobs).clone()); match op {
StoreOp::PutBlock(block_root, block) => {
guard.put(block_root, (*block).clone());
} }
StoreOp::PutBlobs(_, _) => (),
StoreOp::PutState(_, _) => (), StoreOp::PutState(_, _) => (),
StoreOp::PutStateSummary(_, _) => (), StoreOp::PutStateSummary(_, _) => (),
@ -857,12 +954,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
StoreOp::DeleteStateTemporaryFlag(_) => (), StoreOp::DeleteStateTemporaryFlag(_) => (),
StoreOp::DeleteBlock(block_root) => { StoreOp::DeleteBlock(block_root) => {
guard.pop(block_root); guard.pop(&block_root);
} }
StoreOp::DeleteBlobs(block_root) => { StoreOp::DeleteBlobs(_) => (),
guard_blob.pop(block_root);
}
StoreOp::DeleteState(_, _) => (), StoreOp::DeleteState(_, _) => (),
@ -874,8 +969,20 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
} }
} }
self.hot_db for op in blob_cache_ops {
.do_atomically(self.convert_to_kv_batch(batch)?)?; match op {
StoreOp::PutBlobs(block_root, blobs) => {
guard_blob.put(block_root, (*blobs).clone());
}
StoreOp::DeleteBlobs(block_root) => {
guard_blob.pop(&block_root);
}
_ => (),
}
}
drop(guard); drop(guard);
drop(guard_blob); drop(guard_blob);
@ -1212,6 +1319,22 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}) })
} }
/// Fetch a blobs sidecar from the store.
pub fn get_blobs(&self, block_root: &Hash256) -> Result<Option<BlobsSidecar<E>>, Error> {
let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db);
match blobs_db.get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())? {
Some(ref blobs_bytes) => {
let blobs = BlobsSidecar::from_ssz_bytes(blobs_bytes)?;
// FIXME(sean) I was attempting to use a blob cache here but was getting deadlocks,
// may want to attempt to use one again
self.blob_cache.lock().put(*block_root, blobs.clone());
Ok(Some(blobs))
}
None => Ok(None),
}
}
/// Get a reference to the `ChainSpec` used by the database. /// Get a reference to the `ChainSpec` used by the database.
pub fn get_chain_spec(&self) -> &ChainSpec { pub fn get_chain_spec(&self) -> &ChainSpec {
&self.spec &self.spec
@ -1713,7 +1836,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
} }
} }
let payloads_pruned = ops.len(); let payloads_pruned = ops.len();
self.do_atomically(ops)?; self.do_atomically_with_block_and_blobs_cache(ops)?;
info!( info!(
self.log, self.log,
"Execution payload pruning complete"; "Execution payload pruning complete";
@ -1862,16 +1985,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
} }
} }
let blobs_sidecars_pruned = ops.len(); let blobs_sidecars_pruned = ops.len();
let new_blob_info = BlobInfo {
let update_blob_info = self.compare_and_set_blob_info(
blob_info,
BlobInfo {
oldest_blob_slot: Some(end_slot + 1), oldest_blob_slot: Some(end_slot + 1),
}, blobs_db: blob_info.blobs_db,
)?; };
let update_blob_info = self.compare_and_set_blob_info(blob_info, new_blob_info)?;
ops.push(StoreOp::KeyValueOp(update_blob_info)); ops.push(StoreOp::KeyValueOp(update_blob_info));
self.do_atomically(ops)?; self.do_atomically_with_block_and_blobs_cache(ops)?;
info!( info!(
self.log, self.log,
"Blobs sidecar pruning complete"; "Blobs sidecar pruning complete";
@ -2011,7 +2132,7 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
} }
// Delete the states from the hot database if we got this far. // Delete the states from the hot database if we got this far.
store.do_atomically(hot_db_ops)?; store.do_atomically_with_block_and_blobs_cache(hot_db_ops)?;
debug!( debug!(
store.log, store.log,

View File

@ -101,6 +101,7 @@ pub fn get_key_for_col(column: &str, key: &[u8]) -> Vec<u8> {
} }
#[must_use] #[must_use]
#[derive(Clone)]
pub enum KeyValueStoreOp { pub enum KeyValueStoreOp {
PutKeyValue(Vec<u8>, Vec<u8>), PutKeyValue(Vec<u8>, Vec<u8>),
DeleteKey(Vec<u8>), DeleteKey(Vec<u8>),
@ -154,6 +155,7 @@ pub trait ItemStore<E: EthSpec>: KeyValueStore<E> + Sync + Send + Sized + 'stati
/// Reified key-value storage operation. Helps in modifying the storage atomically. /// Reified key-value storage operation. Helps in modifying the storage atomically.
/// See also https://github.com/sigp/lighthouse/issues/692 /// See also https://github.com/sigp/lighthouse/issues/692
#[derive(Clone)]
pub enum StoreOp<'a, E: EthSpec> { pub enum StoreOp<'a, E: EthSpec> {
PutBlock(Hash256, Arc<SignedBeaconBlock<E>>), PutBlock(Hash256, Arc<SignedBeaconBlock<E>>),
PutState(Hash256, &'a BeaconState<E>), PutState(Hash256, &'a BeaconState<E>),

View File

@ -124,6 +124,8 @@ impl StoreItem for AnchorInfo {
pub struct BlobInfo { pub struct BlobInfo {
/// The slot after which blobs are available (>=). /// The slot after which blobs are available (>=).
pub oldest_blob_slot: Option<Slot>, pub oldest_blob_slot: Option<Slot>,
/// A separate blobs database is in use.
pub blobs_db: bool,
} }
impl StoreItem for BlobInfo { impl StoreItem for BlobInfo {

View File

@ -9,6 +9,12 @@ pub type MaxAccessListSize = U16777216;
pub type MaxVersionedHashesListSize = U16777216; pub type MaxVersionedHashesListSize = U16777216;
pub type MaxAccessListStorageKeys = U16777216; pub type MaxAccessListStorageKeys = U16777216;
#[derive(Debug, Clone, PartialEq, Encode, Decode)]
pub struct SignedBlobTransaction {
pub message: BlobTransaction,
pub signature: EcdsaSignature,
}
#[derive(Debug, Clone, PartialEq, Encode, Decode)] #[derive(Debug, Clone, PartialEq, Encode, Decode)]
pub struct BlobTransaction { pub struct BlobTransaction {
pub chain_id: Uint256, pub chain_id: Uint256,
@ -29,3 +35,10 @@ pub struct AccessTuple {
pub address: Address, pub address: Address,
pub storage_keys: VariableList<Hash256, MaxAccessListStorageKeys>, pub storage_keys: VariableList<Hash256, MaxAccessListStorageKeys>,
} }
#[derive(Debug, Clone, PartialEq, Encode, Decode)]
pub struct EcdsaSignature {
pub y_parity: bool,
pub r: Uint256,
pub s: Uint256,
}

View File

@ -104,6 +104,13 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
.takes_value(true) .takes_value(true)
.default_value("0"), .default_value("0"),
) )
.arg(
Arg::with_name("blobs-dir")
.long("blobs-dir")
.value_name("DIR")
.help("Data directory for the blobs database.")
.takes_value(true),
)
.subcommand(migrate_cli_app()) .subcommand(migrate_cli_app())
.subcommand(version_cli_app()) .subcommand(version_cli_app())
.subcommand(inspect_cli_app()) .subcommand(inspect_cli_app())
@ -123,6 +130,10 @@ fn parse_client_config<E: EthSpec>(
client_config.freezer_db_path = Some(freezer_dir); client_config.freezer_db_path = Some(freezer_dir);
} }
if let Some(blobs_db_dir) = clap_utils::parse_optional(cli_args, "blobs-dir")? {
client_config.blobs_db_path = Some(blobs_db_dir);
}
let (sprp, sprp_explicit) = get_slots_per_restore_point::<E>(cli_args)?; let (sprp, sprp_explicit) = get_slots_per_restore_point::<E>(cli_args)?;
client_config.store.slots_per_restore_point = sprp; client_config.store.slots_per_restore_point = sprp;
client_config.store.slots_per_restore_point_set_explicitly = sprp_explicit; client_config.store.slots_per_restore_point_set_explicitly = sprp_explicit;
@ -144,11 +155,13 @@ pub fn display_db_version<E: EthSpec>(
let spec = runtime_context.eth2_config.spec.clone(); let spec = runtime_context.eth2_config.spec.clone();
let hot_path = client_config.get_db_path(); let hot_path = client_config.get_db_path();
let cold_path = client_config.get_freezer_db_path(); let cold_path = client_config.get_freezer_db_path();
let blobs_path = client_config.get_blobs_db_path();
let mut version = CURRENT_SCHEMA_VERSION; let mut version = CURRENT_SCHEMA_VERSION;
HotColdDB::<E, LevelDB<E>, LevelDB<E>>::open( HotColdDB::<E, LevelDB<E>, LevelDB<E>>::open(
&hot_path, &hot_path,
&cold_path, &cold_path,
blobs_path,
|_, from, _| { |_, from, _| {
version = from; version = from;
Ok(()) Ok(())
@ -200,10 +213,12 @@ pub fn inspect_db<E: EthSpec>(
let spec = runtime_context.eth2_config.spec.clone(); let spec = runtime_context.eth2_config.spec.clone();
let hot_path = client_config.get_db_path(); let hot_path = client_config.get_db_path();
let cold_path = client_config.get_freezer_db_path(); let cold_path = client_config.get_freezer_db_path();
let blobs_path = client_config.get_blobs_db_path();
let db = HotColdDB::<E, LevelDB<E>, LevelDB<E>>::open( let db = HotColdDB::<E, LevelDB<E>, LevelDB<E>>::open(
&hot_path, &hot_path,
&cold_path, &cold_path,
blobs_path,
|_, _, _| Ok(()), |_, _, _| Ok(()),
client_config.store, client_config.store,
spec, spec,
@ -254,12 +269,14 @@ pub fn migrate_db<E: EthSpec>(
let spec = &runtime_context.eth2_config.spec; let spec = &runtime_context.eth2_config.spec;
let hot_path = client_config.get_db_path(); let hot_path = client_config.get_db_path();
let cold_path = client_config.get_freezer_db_path(); let cold_path = client_config.get_freezer_db_path();
let blobs_path = client_config.get_blobs_db_path();
let mut from = CURRENT_SCHEMA_VERSION; let mut from = CURRENT_SCHEMA_VERSION;
let to = migrate_config.to; let to = migrate_config.to;
let db = HotColdDB::<E, LevelDB<E>, LevelDB<E>>::open( let db = HotColdDB::<E, LevelDB<E>, LevelDB<E>>::open(
&hot_path, &hot_path,
&cold_path, &cold_path,
blobs_path,
|_, db_initial_version, _| { |_, db_initial_version, _| {
from = db_initial_version; from = db_initial_version;
Ok(()) Ok(())
@ -294,10 +311,12 @@ pub fn prune_payloads<E: EthSpec>(
let spec = &runtime_context.eth2_config.spec; let spec = &runtime_context.eth2_config.spec;
let hot_path = client_config.get_db_path(); let hot_path = client_config.get_db_path();
let cold_path = client_config.get_freezer_db_path(); let cold_path = client_config.get_freezer_db_path();
let blobs_path = client_config.get_blobs_db_path();
let db = HotColdDB::<E, LevelDB<E>, LevelDB<E>>::open( let db = HotColdDB::<E, LevelDB<E>, LevelDB<E>>::open(
&hot_path, &hot_path,
&cold_path, &cold_path,
blobs_path,
|_, _, _| Ok(()), |_, _, _| Ok(()),
client_config.store, client_config.store,
spec.clone(), spec.clone(),
@ -318,10 +337,12 @@ pub fn prune_blobs<E: EthSpec>(
let spec = &runtime_context.eth2_config.spec; let spec = &runtime_context.eth2_config.spec;
let hot_path = client_config.get_db_path(); let hot_path = client_config.get_db_path();
let cold_path = client_config.get_freezer_db_path(); let cold_path = client_config.get_freezer_db_path();
let blobs_path = client_config.get_blobs_db_path();
let db = HotColdDB::<E, LevelDB<E>, LevelDB<E>>::open( let db = HotColdDB::<E, LevelDB<E>, LevelDB<E>>::open(
&hot_path, &hot_path,
&cold_path, &cold_path,
blobs_path,
|_, _, _| Ok(()), |_, _, _| Ok(()),
client_config.store, client_config.store,
spec.clone(), spec.clone(),