From 8661477675b4ff0d704e94c499fea38a0649b3b8 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Tue, 7 Feb 2023 21:32:36 -0500 Subject: [PATCH 01/29] use hex decode instead of parse --- beacon_node/execution_layer/src/lib.rs | 77 ++++++++++++++++++-------- consensus/types/src/transaction.rs | 13 +++++ 2 files changed, 68 insertions(+), 22 deletions(-) diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index 3a1365db8..7f914c50b 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -14,7 +14,7 @@ pub use engine_api::{http, http::deposit_methods, http::HttpJsonRpc}; use engines::{Engine, EngineError}; pub use engines::{EngineState, ForkchoiceState}; use eth2::types::{builder_bid::SignedBuilderBid, ForkVersionedResponse}; -use ethers_core::types::Transaction as EthersTransaction; +use ethers_core::types::{Transaction as EthersTransaction, U64}; use fork_choice::ForkchoiceUpdateParameters; use lru::LruCache; use payload_status::process_payload_status; @@ -39,8 +39,10 @@ use tokio::{ }; use tokio_stream::wrappers::WatchStream; use types::consts::eip4844::BLOB_TX_TYPE; -use types::transaction::{AccessTuple, BlobTransaction}; -use types::{AbstractExecPayload, BeaconStateError, Blob, ExecPayload, KzgCommitment}; +use types::transaction::{AccessTuple, BlobTransaction, EcdsaSignature, SignedBlobTransaction}; +use types::{ + AbstractExecPayload, BeaconStateError, Blob, ExecPayload, KzgCommitment, VersionedHash, +}; use types::{ BlindedPayload, BlockType, ChainSpec, Epoch, ExecutionBlockHash, ForkName, ProposerPreparationData, PublicKeyBytes, Signature, SignedBeaconBlock, Slot, Transaction, @@ -2030,10 +2032,14 @@ pub enum BlobTxConversionError { MaxFeePerDataGasMissing, /// Missing the `blob_versioned_hashes` field. BlobVersionedHashesMissing, + /// `y_parity` field was greater than one. + InvalidYParity, /// There was an error converting the transaction to SSZ. 
SszError(ssz_types::Error), /// There was an error converting the transaction from JSON. SerdeJson(serde_json::Error), + /// There was an error converting the transaction from hex. + FromHexError(String), } impl From for BlobTxConversionError { @@ -2096,23 +2102,36 @@ fn ethers_tx_to_bytes( }) .collect::, BlobTxConversionError>>()?, )?; - let max_fee_per_data_gas = transaction + let max_fee_per_data_gas = Uint256::from_big_endian( + ð2_serde_utils::hex::decode( + transaction + .other + .get("maxFeePerDataGas") + .ok_or(BlobTxConversionError::MaxFeePerDataGasMissing)? + .as_str() + .ok_or(BlobTxConversionError::MaxFeePerDataGasMissing)?, + ) + .map_err(BlobTxConversionError::FromHexError)?, + ); + let blob_versioned_hashes = transaction .other - .get("maxFeePerDataGas") - .ok_or(BlobTxConversionError::MaxFeePerDataGasMissing)? - .as_str() - .ok_or(BlobTxConversionError::MaxFeePerDataGasMissing)? - .parse() - .map_err(|_| BlobTxConversionError::MaxFeePerDataGasMissing)?; - let blob_versioned_hashes = serde_json::from_str( - transaction - .other - .get("blobVersionedHashes") - .ok_or(BlobTxConversionError::BlobVersionedHashesMissing)? - .as_str() - .ok_or(BlobTxConversionError::BlobVersionedHashesMissing)?, - )?; - BlobTransaction { + .get("blobVersionedHashes") + .ok_or(BlobTxConversionError::BlobVersionedHashesMissing)? + .as_array() + .ok_or(BlobTxConversionError::BlobVersionedHashesMissing)? 
+ .into_iter() + .map(|versioned_hash| { + Ok(Hash256::from_slice( + ð2_serde_utils::hex::decode( + versioned_hash + .as_str() + .ok_or(BlobTxConversionError::BlobVersionedHashesMissing)?, + ) + .map_err(BlobTxConversionError::FromHexError)?, + )) + }) + .collect::, BlobTxConversionError>>()?; + let message = BlobTransaction { chain_id, nonce, max_priority_fee_per_gas, @@ -2123,9 +2142,23 @@ fn ethers_tx_to_bytes( data, access_list, max_fee_per_data_gas, - blob_versioned_hashes, - } - .as_ssz_bytes() + blob_versioned_hashes: VariableList::new(blob_versioned_hashes)?, + }; + + let y_parity = if transaction.v == U64::zero() { + false + } else if transaction.v == U64::one() { + true + } else { + return Err(BlobTxConversionError::InvalidYParity); + }; + let r = transaction.r; + let s = transaction.s; + let signature = EcdsaSignature { y_parity, r, s }; + + let mut signed_tx = SignedBlobTransaction { message, signature }.as_ssz_bytes(); + signed_tx.insert(0, BLOB_TX_TYPE); + signed_tx } else { transaction.rlp().to_vec() }; diff --git a/consensus/types/src/transaction.rs b/consensus/types/src/transaction.rs index 797ee1dae..ee0af981b 100644 --- a/consensus/types/src/transaction.rs +++ b/consensus/types/src/transaction.rs @@ -9,6 +9,12 @@ pub type MaxAccessListSize = U16777216; pub type MaxVersionedHashesListSize = U16777216; pub type MaxAccessListStorageKeys = U16777216; +#[derive(Debug, Clone, PartialEq, Encode, Decode)] +pub struct SignedBlobTransaction { + pub message: BlobTransaction, + pub signature: EcdsaSignature, +} + #[derive(Debug, Clone, PartialEq, Encode, Decode)] pub struct BlobTransaction { pub chain_id: Uint256, @@ -29,3 +35,10 @@ pub struct AccessTuple { pub address: Address, pub storage_keys: VariableList, } + +#[derive(Debug, Clone, PartialEq, Encode, Decode)] +pub struct EcdsaSignature { + pub y_parity: bool, + pub r: Uint256, + pub s: Uint256, +} From dd40adc5c07041ad5b4b9ec5a4321bff5c2e49b5 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Wed, 8 Feb 
2023 10:38:45 -0500 Subject: [PATCH 02/29] check byte length when converting to uint256 and hash256 from bytes. Add comments --- beacon_node/execution_layer/src/lib.rs | 80 ++++++++++++++++++++------ 1 file changed, 61 insertions(+), 19 deletions(-) diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index 7f914c50b..bdf74d0a5 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -2040,6 +2040,10 @@ pub enum BlobTxConversionError { SerdeJson(serde_json::Error), /// There was an error converting the transaction from hex. FromHexError(String), + /// The `max_fee_per_data_gas` field did not contains 32 bytes. + InvalidDataGasBytesLen, + /// A `versioned_hash` did not contain 32 bytes. + InvalidVersionedHashBytesLen, } impl From for BlobTxConversionError { @@ -2064,29 +2068,49 @@ fn ethers_tx_to_bytes( .transaction_type .ok_or(BlobTxConversionError::NoTransactionType)? .as_u64(); + let tx = if BLOB_TX_TYPE as u64 == tx_type { + // ******************** BlobTransaction fields ******************** + + // chainId let chain_id = transaction .chain_id .ok_or(BlobTxConversionError::NoChainId)?; + + // nonce let nonce = if transaction.nonce > Uint256::from(u64::MAX) { return Err(BlobTxConversionError::NonceTooLarge); } else { transaction.nonce.as_u64() }; + + // maxPriorityFeePerGas let max_priority_fee_per_gas = transaction .max_priority_fee_per_gas .ok_or(BlobTxConversionError::MaxPriorityFeePerGasMissing)?; + + // maxFeePerGas let max_fee_per_gas = transaction .max_fee_per_gas .ok_or(BlobTxConversionError::MaxFeePerGasMissing)?; + + // gas let gas = if transaction.gas > Uint256::from(u64::MAX) { return Err(BlobTxConversionError::GasTooHigh); } else { transaction.gas.as_u64() }; + + // to let to = transaction.to; + + // value let value = transaction.value; + + // data (a.k.a input) let data = VariableList::new(transaction.input.to_vec())?; + + // accessList let access_list = 
VariableList::new( transaction .access_list @@ -2102,17 +2126,29 @@ fn ethers_tx_to_bytes( }) .collect::, BlobTxConversionError>>()?, )?; - let max_fee_per_data_gas = Uint256::from_big_endian( - ð2_serde_utils::hex::decode( - transaction - .other - .get("maxFeePerDataGas") - .ok_or(BlobTxConversionError::MaxFeePerDataGasMissing)? - .as_str() - .ok_or(BlobTxConversionError::MaxFeePerDataGasMissing)?, - ) - .map_err(BlobTxConversionError::FromHexError)?, - ); + + // ******************** BlobTransaction `other` fields ******************** + // + // Here we use the `other` field in the `ethers-rs` `Transaction` type because + // `ethers-rs` does not yet support SSZ and therefore the blobs transaction type. + + // maxFeePerDataGas + let data_gas_bytes = eth2_serde_utils::hex::decode( + transaction + .other + .get("maxFeePerDataGas") + .ok_or(BlobTxConversionError::MaxFeePerDataGasMissing)? + .as_str() + .ok_or(BlobTxConversionError::MaxFeePerDataGasMissing)?, + ) + .map_err(BlobTxConversionError::FromHexError)?; + let max_fee_per_data_gas = if data_gas_bytes.len() != Uint256::ssz_fixed_len() { + Err(BlobTxConversionError::InvalidDataGasBytesLen) + } else { + Ok(Uint256::from_big_endian(&data_gas_bytes)) + }?; + + // blobVersionedHashes let blob_versioned_hashes = transaction .other .get("blobVersionedHashes") @@ -2121,14 +2157,17 @@ fn ethers_tx_to_bytes( .ok_or(BlobTxConversionError::BlobVersionedHashesMissing)? 
.into_iter() .map(|versioned_hash| { - Ok(Hash256::from_slice( - ð2_serde_utils::hex::decode( - versioned_hash - .as_str() - .ok_or(BlobTxConversionError::BlobVersionedHashesMissing)?, - ) - .map_err(BlobTxConversionError::FromHexError)?, - )) + let hash_bytes = eth2_serde_utils::hex::decode( + versioned_hash + .as_str() + .ok_or(BlobTxConversionError::BlobVersionedHashesMissing)?, + ) + .map_err(BlobTxConversionError::FromHexError)?; + if hash_bytes.len() != Hash256::ssz_fixed_len() { + Err(BlobTxConversionError::InvalidVersionedHashBytesLen) + } else { + Ok(Hash256::from_slice(&hash_bytes)) + } }) .collect::, BlobTxConversionError>>()?; let message = BlobTransaction { @@ -2145,6 +2184,8 @@ fn ethers_tx_to_bytes( blob_versioned_hashes: VariableList::new(blob_versioned_hashes)?, }; + // ******************** EcdsaSignature fields ******************** + let y_parity = if transaction.v == U64::zero() { false } else if transaction.v == U64::one() { @@ -2156,6 +2197,7 @@ fn ethers_tx_to_bytes( let s = transaction.s; let signature = EcdsaSignature { y_parity, r, s }; + // The `BLOB_TX_TYPE` should prepend the SSZ encoded `SignedBlobTransaction`. 
let mut signed_tx = SignedBlobTransaction { message, signature }.as_ssz_bytes(); signed_tx.insert(0, BLOB_TX_TYPE); signed_tx From 902f64a94692d6c87975e27761b2ad46571cb955 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Wed, 8 Feb 2023 10:59:48 -0500 Subject: [PATCH 03/29] remove clone of access lists --- beacon_node/execution_layer/src/lib.rs | 105 ++++++++++++++----------- 1 file changed, 60 insertions(+), 45 deletions(-) diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index bdf74d0a5..50f01893d 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -14,6 +14,7 @@ pub use engine_api::{http, http::deposit_methods, http::HttpJsonRpc}; use engines::{Engine, EngineError}; pub use engines::{EngineState, ForkchoiceState}; use eth2::types::{builder_bid::SignedBuilderBid, ForkVersionedResponse}; +use ethers_core::types::transaction::eip2930::AccessListItem; use ethers_core::types::{Transaction as EthersTransaction, U64}; use fork_choice::ForkchoiceUpdateParameters; use lru::LruCache; @@ -51,6 +52,7 @@ use types::{ use types::{ ExecutionPayload, ExecutionPayloadCapella, ExecutionPayloadEip4844, ExecutionPayloadMerge, }; +use warp::hyper::body::HttpBody; mod block_hash; mod engine_api; @@ -1689,13 +1691,15 @@ impl ExecutionLayer { return Ok(None); }; - let transactions = VariableList::from( - block - .transactions() - .into_iter() - .map(ethers_tx_to_bytes::) - .collect::, BlobTxConversionError>>()?, - ); + let convert_transactions = |transactions: Vec| { + VariableList::new( + transactions + .into_iter() + .map(ethers_tx_to_bytes::) + .collect::, BlobTxConversionError>>()?, + ) + .map_err(BlobTxConversionError::SszError) + }; let payload = match block { ExecutionBlockWithTransactions::Merge(merge_block) => { @@ -1713,7 +1717,7 @@ impl ExecutionLayer { extra_data: merge_block.extra_data, base_fee_per_gas: merge_block.base_fee_per_gas, block_hash: merge_block.block_hash, - 
transactions, + transactions: convert_transactions(merge_block.transactions)?, }) } ExecutionBlockWithTransactions::Capella(capella_block) => { @@ -1739,7 +1743,7 @@ impl ExecutionLayer { extra_data: capella_block.extra_data, base_fee_per_gas: capella_block.base_fee_per_gas, block_hash: capella_block.block_hash, - transactions, + transactions: convert_transactions(capella_block.transactions)?, withdrawals, }) } @@ -1767,7 +1771,7 @@ impl ExecutionLayer { base_fee_per_gas: eip4844_block.base_fee_per_gas, excess_data_gas: eip4844_block.excess_data_gas, block_hash: eip4844_block.block_hash, - transactions, + transactions: convert_transactions(eip4844_block.transactions)?, withdrawals, }) } @@ -2062,7 +2066,7 @@ impl From for BlobTxConversionError { /// on transaction type. That means RLP encoding if this is a transaction other than a /// `BLOB_TX_TYPE` transaction in which case, SSZ encoding will be used. fn ethers_tx_to_bytes( - transaction: &EthersTransaction, + transaction: EthersTransaction, ) -> Result, BlobTxConversionError> { let tx_type = transaction .transaction_type @@ -2070,58 +2074,73 @@ fn ethers_tx_to_bytes( .as_u64(); let tx = if BLOB_TX_TYPE as u64 == tx_type { + + let EthersTransaction { + hash, + nonce, + block_hash, + block_number, + transaction_index, + from, + to, + value, + gas_price, + gas, + input, + v, + r, + s, + transaction_type, + access_list, + max_priority_fee_per_gas, + max_fee_per_gas, + chain_id, + other, + } = transaction; + // ******************** BlobTransaction fields ******************** // chainId - let chain_id = transaction - .chain_id - .ok_or(BlobTxConversionError::NoChainId)?; + let chain_id = chain_id.ok_or(BlobTxConversionError::NoChainId)?; // nonce - let nonce = if transaction.nonce > Uint256::from(u64::MAX) { + let nonce = if nonce > Uint256::from(u64::MAX) { return Err(BlobTxConversionError::NonceTooLarge); } else { - transaction.nonce.as_u64() + nonce.as_u64() }; // maxPriorityFeePerGas - let max_priority_fee_per_gas 
= transaction - .max_priority_fee_per_gas - .ok_or(BlobTxConversionError::MaxPriorityFeePerGasMissing)?; + let max_priority_fee_per_gas = + max_priority_fee_per_gas.ok_or(BlobTxConversionError::MaxPriorityFeePerGasMissing)?; // maxFeePerGas - let max_fee_per_gas = transaction - .max_fee_per_gas - .ok_or(BlobTxConversionError::MaxFeePerGasMissing)?; + let max_fee_per_gas = max_fee_per_gas.ok_or(BlobTxConversionError::MaxFeePerGasMissing)?; // gas - let gas = if transaction.gas > Uint256::from(u64::MAX) { + let gas = if gas > Uint256::from(u64::MAX) { return Err(BlobTxConversionError::GasTooHigh); } else { - transaction.gas.as_u64() + gas.as_u64() }; - // to - let to = transaction.to; - - // value - let value = transaction.value; - // data (a.k.a input) - let data = VariableList::new(transaction.input.to_vec())?; + let data = VariableList::new(input.to_vec())?; // accessList let access_list = VariableList::new( - transaction - .access_list - .as_ref() + access_list .ok_or(BlobTxConversionError::AccessListMissing)? .0 - .iter() + .into_iter() .map(|access_tuple| { + let AccessListItem { + address, + storage_keys, + } = access_tuple; Ok(AccessTuple { - address: access_tuple.address, - storage_keys: VariableList::new(access_tuple.storage_keys.clone())?, + address, + storage_keys: VariableList::new(storage_keys)?, }) }) .collect::, BlobTxConversionError>>()?, @@ -2134,8 +2153,7 @@ fn ethers_tx_to_bytes( // maxFeePerDataGas let data_gas_bytes = eth2_serde_utils::hex::decode( - transaction - .other + other .get("maxFeePerDataGas") .ok_or(BlobTxConversionError::MaxFeePerDataGasMissing)? .as_str() @@ -2149,8 +2167,7 @@ fn ethers_tx_to_bytes( }?; // blobVersionedHashes - let blob_versioned_hashes = transaction - .other + let blob_versioned_hashes = other .get("blobVersionedHashes") .ok_or(BlobTxConversionError::BlobVersionedHashesMissing)? 
.as_array() @@ -2186,15 +2203,13 @@ fn ethers_tx_to_bytes( // ******************** EcdsaSignature fields ******************** - let y_parity = if transaction.v == U64::zero() { + let y_parity = if v == U64::zero() { false - } else if transaction.v == U64::one() { + } else if v == U64::one() { true } else { return Err(BlobTxConversionError::InvalidYParity); }; - let r = transaction.r; - let s = transaction.s; let signature = EcdsaSignature { y_parity, r, s }; // The `BLOB_TX_TYPE` should prepend the SSZ encoded `SignedBlobTransaction`. From 99da11e9f4ca804f2cf993f87355972568cbaaed Mon Sep 17 00:00:00 2001 From: realbigsean Date: Wed, 8 Feb 2023 11:03:34 -0500 Subject: [PATCH 04/29] fix lints --- beacon_node/execution_layer/src/lib.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index 50f01893d..d60082b91 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -52,7 +52,6 @@ use types::{ use types::{ ExecutionPayload, ExecutionPayloadCapella, ExecutionPayloadEip4844, ExecutionPayloadMerge, }; -use warp::hyper::body::HttpBody; mod block_hash; mod engine_api; @@ -2074,23 +2073,22 @@ fn ethers_tx_to_bytes( .as_u64(); let tx = if BLOB_TX_TYPE as u64 == tx_type { - let EthersTransaction { - hash, + hash: _, nonce, - block_hash, - block_number, - transaction_index, - from, + block_hash: _, + block_number: _, + transaction_index: _, + from: _, to, value, - gas_price, + gas_price: _, gas, input, v, r, s, - transaction_type, + transaction_type: _, access_list, max_priority_fee_per_gas, max_fee_per_gas, From f9737628fc092aaa7a0cad12c5feb5e89b06bdb4 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Wed, 11 Jan 2023 00:17:26 +0100 Subject: [PATCH 05/29] Store blobs in separate freezer or historical state freezer --- beacon_node/client/src/builder.rs | 5 +++ beacon_node/client/src/config.rs | 21 +++++++++++ 
beacon_node/src/cli.rs | 7 ++++ beacon_node/src/config.rs | 4 +++ beacon_node/src/lib.rs | 14 +++++++- beacon_node/store/src/hot_cold_store.rs | 47 +++++++++++++++++++++++-- database_manager/src/lib.rs | 17 +++++++++ 7 files changed, 112 insertions(+), 3 deletions(-) diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 23c4977b7..e09d2621f 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -68,6 +68,7 @@ pub struct ClientBuilder { gossipsub_registry: Option, db_path: Option, freezer_db_path: Option, + blobs_freezer_db_path: Option, http_api_config: http_api::Config, http_metrics_config: http_metrics::Config, slasher: Option>>, @@ -100,6 +101,7 @@ where gossipsub_registry: None, db_path: None, freezer_db_path: None, + blobs_freezer_db_path: None, http_api_config: <_>::default(), http_metrics_config: <_>::default(), slasher: None, @@ -892,6 +894,7 @@ where mut self, hot_path: &Path, cold_path: &Path, + cold_blobs_path: Option, config: StoreConfig, log: Logger, ) -> Result { @@ -907,6 +910,7 @@ where self.db_path = Some(hot_path.into()); self.freezer_db_path = Some(cold_path.into()); + self.blobs_freezer_db_path = cold_blobs_path; let inner_spec = spec.clone(); let deposit_contract_deploy_block = context @@ -929,6 +933,7 @@ where let store = HotColdDB::open( hot_path, cold_path, + cold_blobs_path, schema_upgrade, config, spec, diff --git a/beacon_node/client/src/config.rs b/beacon_node/client/src/config.rs index 6c3a98a46..598945923 100644 --- a/beacon_node/client/src/config.rs +++ b/beacon_node/client/src/config.rs @@ -49,6 +49,9 @@ pub struct Config { pub db_name: String, /// Path where the freezer database will be located. pub freezer_db_path: Option, + /// Path where the blobs freezer database will be located if it should be separate from the + /// historical state freezer. 
+ pub blobs_freezer_db_path: Option, pub log_file: PathBuf, /// If true, the node will use co-ordinated junk for eth1 values. /// @@ -89,6 +92,7 @@ impl Default for Config { data_dir: PathBuf::from(DEFAULT_ROOT_DIR), db_name: "chain_db".to_string(), freezer_db_path: None, + blobs_freezer_db_path: None, log_file: PathBuf::from(""), genesis: <_>::default(), store: <_>::default(), @@ -149,11 +153,28 @@ impl Config { .unwrap_or_else(|| self.default_freezer_db_path()) } + /// Returns the path to which the client may initialize the on-disk blobs freezer database. + /// + /// Will attempt to use the user-supplied path from e.g. the CLI, or will default + /// to None. + pub fn get_blobs_freezer_db_path(&self) -> Option { + self.blobs_freezer_db_path.clone() + } + /// Get the freezer DB path, creating it if necessary. pub fn create_freezer_db_path(&self) -> Result { ensure_dir_exists(self.get_freezer_db_path()) } + /// Get the blobs freezer DB path, creating it if necessary. + pub fn create_blobs_freezer_db_path(&self) -> Result, String> { + if let Some(blobs_freezer_path) = self.get_blobs_freezer_db_path() { + Ok(Some(ensure_dir_exists(blobs_freezer_path)?)) + } else { + Ok(None) + } + } + /// Returns the "modern" path to the data_dir. /// /// See `Self::get_data_dir` documentation for more info. diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index e711dfca9..a3e4dbcc8 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -28,6 +28,13 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .help("Data directory for the freezer database.") .takes_value(true) ) + .arg( + Arg::with_name("blobs-freezer-dir") + .long("blobs-freezer-dir") + .value_name("DIR") + .help("Data directory for the blobs freezer database.") + .takes_value(true) + ) /* * Network parameters. 
*/ diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 7ced91274..4fa916dc3 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -390,6 +390,10 @@ pub fn get_config( client_config.freezer_db_path = Some(PathBuf::from(freezer_dir)); } + if let Some(blobs_freezer_dir) = cli_args.value_of("blobs-freezer-dir") { + client_config.blobs_freezer_db_path = Some(PathBuf::from(blobs_freezer_dir)); + } + let (sprp, sprp_explicit) = get_slots_per_restore_point::(cli_args)?; client_config.store.slots_per_restore_point = sprp; client_config.store.slots_per_restore_point_set_explicitly = sprp_explicit; diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs index 650763dca..38e4854a5 100644 --- a/beacon_node/src/lib.rs +++ b/beacon_node/src/lib.rs @@ -64,6 +64,12 @@ impl ProductionBeaconNode { let _datadir = client_config.create_data_dir()?; let db_path = client_config.create_db_path()?; let freezer_db_path = client_config.create_freezer_db_path()?; + let blobs_freezer_db_path = + if let Some(path) = client_config.create_blobs_freezer_db_path()? 
{ + Some(*path.as_path().clone()) + } else { + None + }; let executor = context.executor.clone(); if let Some(legacy_dir) = client_config.get_existing_legacy_data_dir() { @@ -84,7 +90,13 @@ impl ProductionBeaconNode { .runtime_context(context) .chain_spec(spec) .http_api_config(client_config.http_api.clone()) - .disk_store(&db_path, &freezer_db_path, store_config, log.clone())?; + .disk_store( + &db_path, + &freezer_db_path, + blobs_freezer_db_path, + store_config, + log.clone(), + )?; let builder = if let Some(slasher_config) = client_config.slasher.clone() { let slasher = Arc::new( diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 99b516ee9..951dbd2ca 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -35,7 +35,7 @@ use state_processing::{ use std::cmp::min; use std::convert::TryInto; use std::marker::PhantomData; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; use types::consts::eip4844::MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS; @@ -59,6 +59,8 @@ pub struct HotColdDB, Cold: ItemStore> { pub(crate) config: StoreConfig, /// Cold database containing compact historical data. pub cold_db: Cold, + /// Cold database containing blob data with slots less than `split.slot`. + pub cold_blobs_db: Option, /// Hot database containing duplicated but quick-to-access recent data. /// /// The hot database also contains all blocks. 
@@ -92,6 +94,7 @@ pub enum HotColdDBError { MissingRestorePointHash(u64), MissingRestorePoint(Hash256), MissingColdStateSummary(Hash256), + MissingColdBlobs(Hash256), MissingHotStateSummary(Hash256), MissingEpochBoundaryState(Hash256), MissingSplitState(Hash256, Slot), @@ -134,6 +137,7 @@ impl HotColdDB, MemoryStore> { anchor_info: RwLock::new(None), blob_info: RwLock::new(BlobInfo::default()), cold_db: MemoryStore::open(), + cold_blobs_db: Some(MemoryStore::open()), hot_db: MemoryStore::open(), block_cache: Mutex::new(LruCache::new(config.block_cache_size)), blob_cache: Mutex::new(LruCache::new(config.blob_cache_size)), @@ -157,6 +161,7 @@ impl HotColdDB, LevelDB> { pub fn open( hot_path: &Path, cold_path: &Path, + cold_blobs_path: Option, migrate_schema: impl FnOnce(Arc, SchemaVersion, SchemaVersion) -> Result<(), Error>, config: StoreConfig, spec: ChainSpec, @@ -164,11 +169,18 @@ impl HotColdDB, LevelDB> { ) -> Result, Error> { Self::verify_slots_per_restore_point(config.slots_per_restore_point)?; + let cold_blobs_db = if let Some(path) = cold_blobs_path { + Some(LevelDB::open(path.as_path())?) + } else { + None + }; + let mut db = HotColdDB { split: RwLock::new(Split::default()), anchor_info: RwLock::new(None), blob_info: RwLock::new(BlobInfo::default()), cold_db: LevelDB::open(cold_path)?, + cold_blobs_db, hot_db: LevelDB::open(hot_path)?, block_cache: Mutex::new(LruCache::new(config.block_cache_size)), blob_cache: Mutex::new(LruCache::new(config.blob_cache_size)), @@ -532,7 +544,19 @@ impl, Cold: ItemStore> HotColdDB self.blob_cache.lock().put(*block_root, ret.clone()); Ok(Some(ret)) } else { - Ok(None) + let blobs_freezer = if let Some(ref cold_blobs_db) = self.cold_blobs_db { + cold_blobs_db + } else { + &self.cold_db + }; + + if let Some(ref blobs_bytes) = + blobs_freezer.get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())? 
+ { + Ok(Some(BlobsSidecar::from_ssz_bytes(blobs_bytes)?)) + } else { + Ok(None) + } } } @@ -1918,6 +1942,13 @@ pub fn migrate_database, Cold: ItemStore>( } let mut hot_db_ops: Vec> = Vec::new(); + let mut cold_blobs_db_ops: Vec> = Vec::new(); + + let blobs_freezer = if let Some(ref cold_blobs_db) = store.cold_blobs_db { + cold_blobs_db + } else { + &store.cold_db + }; // 1. Copy all of the states between the head and the split slot, from the hot DB // to the cold DB. Delete the execution payloads of these now-finalized blocks. @@ -1961,8 +1992,17 @@ pub fn migrate_database, Cold: ItemStore>( if store.config.prune_payloads { hot_db_ops.push(StoreOp::DeleteExecutionPayload(block_root)); } + + // Prepare migration of blobs to freezer. + if let Some(blobs) = store.get_blobs(&block_root)? { + hot_db_ops.push(StoreOp::DeleteBlobs(block_root)); + cold_blobs_db_ops.push(StoreOp::PutBlobs(block_root, Arc::new(blobs))); + } } + // Migrate blobs to freezer. + blobs_freezer.do_atomically(store.convert_to_kv_batch(cold_blobs_db_ops)?)?; + // Warning: Critical section. We have to take care not to put any of the two databases in an // inconsistent state if the OS process dies at any point during the freezing // procedure. @@ -1975,6 +2015,9 @@ pub fn migrate_database, Cold: ItemStore>( // Flush to disk all the states that have just been migrated to the cold store. 
store.cold_db.sync()?; + if let Some(ref cold_blobs_db) = store.cold_blobs_db { + cold_blobs_db.sync()?; + } { let mut split_guard = store.split.write(); diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs index a33e6c149..93377c60e 100644 --- a/database_manager/src/lib.rs +++ b/database_manager/src/lib.rs @@ -103,6 +103,11 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { ) .takes_value(true) .default_value("0"), + Arg::with_name("blobs-freezer-dir") + .long("blobs-freezer-dir") + .value_name("DIR") + .help("Data directory for the blobs freezer database.") + .takes_value(true), ) .subcommand(migrate_cli_app()) .subcommand(version_cli_app()) @@ -123,6 +128,10 @@ fn parse_client_config( client_config.freezer_db_path = Some(freezer_dir); } + if let Some(blobs_freezer_dir) = clap_utils::parse_optional(cli_args, "blobs-freezer-dir")? { + client_config.blobs_freezer_db_path = Some(blobs_freezer_dir); + } + let (sprp, sprp_explicit) = get_slots_per_restore_point::(cli_args)?; client_config.store.slots_per_restore_point = sprp; client_config.store.slots_per_restore_point_set_explicitly = sprp_explicit; @@ -144,11 +153,13 @@ pub fn display_db_version( let spec = runtime_context.eth2_config.spec.clone(); let hot_path = client_config.get_db_path(); let cold_path = client_config.get_freezer_db_path(); + let cold_blobs_path = client_config.get_blobs_freezer_db_path(); let mut version = CURRENT_SCHEMA_VERSION; HotColdDB::, LevelDB>::open( &hot_path, &cold_path, + &cold_blobs_path, |_, from, _| { version = from; Ok(()) @@ -200,10 +211,12 @@ pub fn inspect_db( let spec = runtime_context.eth2_config.spec.clone(); let hot_path = client_config.get_db_path(); let cold_path = client_config.get_freezer_db_path(); + let cold_blobs_path = client_config.get_blobs_freezer_db_path(); let db = HotColdDB::, LevelDB>::open( &hot_path, &cold_path, + &cold_blobs_path, |_, _, _| Ok(()), client_config.store, spec, @@ -254,12 +267,14 @@ pub fn migrate_db( let spec = 
&runtime_context.eth2_config.spec; let hot_path = client_config.get_db_path(); let cold_path = client_config.get_freezer_db_path(); + let cold_blobs_path = client_config.get_blobs_freezer_db_path(); let mut from = CURRENT_SCHEMA_VERSION; let to = migrate_config.to; let db = HotColdDB::, LevelDB>::open( &hot_path, &cold_path, + &cold_blobs_path, |_, db_initial_version, _| { from = db_initial_version; Ok(()) @@ -294,10 +309,12 @@ pub fn prune_payloads( let spec = &runtime_context.eth2_config.spec; let hot_path = client_config.get_db_path(); let cold_path = client_config.get_freezer_db_path(); + let cold_blobs_path = client_config.get_blobs_freezer_db_path(); let db = HotColdDB::, LevelDB>::open( &hot_path, &cold_path, + &cold_blobs_path, |_, _, _| Ok(()), client_config.store, spec.clone(), From e0b1a0841cffee0c92ea13ff845c9cc9463b6243 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Wed, 11 Jan 2023 00:27:30 +0100 Subject: [PATCH 06/29] fixup! Store blobs in separate freezer or historical state freezer --- beacon_node/src/lib.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs index 38e4854a5..532f1bdfe 100644 --- a/beacon_node/src/lib.rs +++ b/beacon_node/src/lib.rs @@ -64,12 +64,7 @@ impl ProductionBeaconNode { let _datadir = client_config.create_data_dir()?; let db_path = client_config.create_db_path()?; let freezer_db_path = client_config.create_freezer_db_path()?; - let blobs_freezer_db_path = - if let Some(path) = client_config.create_blobs_freezer_db_path()? 
{ - Some(*path.as_path().clone()) - } else { - None - }; + let blobs_freezer_db_path = client_config.create_blobs_freezer_db_path()?; let executor = context.executor.clone(); if let Some(legacy_dir) = client_config.get_existing_legacy_data_dir() { From 05c51b37b19d1390679617134aa8b91d8d8e883c Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Wed, 1 Feb 2023 17:47:57 +0100 Subject: [PATCH 07/29] fix rebase conflicts --- .vscode/settings.json | 5 ++ beacon_node/beacon_chain/src/beacon_chain.rs | 2 +- beacon_node/client/src/builder.rs | 2 +- beacon_node/http_api/src/block_id.rs | 2 +- .../beacon_processor/worker/rpc_methods.rs | 2 + beacon_node/store/src/hot_cold_store.rs | 79 +++++++++++++------ 6 files changed, 65 insertions(+), 27 deletions(-) create mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 000000000..3b125ccb1 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,5 @@ +{ + "cSpell.words": [ + "blbo" + ] +} \ No newline at end of file diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 741d9a95b..721460397 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1072,7 +1072,7 @@ impl BeaconChain { block_root: &Hash256, data_availability_boundary: Epoch, ) -> Result>, Error> { - match self.store.get_blobs(block_root)? { + match self.store.get_blobs(block_root, slot)? { Some(blobs) => Ok(Some(blobs)), None => { // Check for the corresponding block to understand whether we *should* have blobs. 
diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index e09d2621f..b91e29355 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -910,7 +910,7 @@ where self.db_path = Some(hot_path.into()); self.freezer_db_path = Some(cold_path.into()); - self.blobs_freezer_db_path = cold_blobs_path; + self.blobs_freezer_db_path = cold_blobs_path.clone(); let inner_spec = spec.clone(); let deposit_contract_deploy_block = context diff --git a/beacon_node/http_api/src/block_id.rs b/beacon_node/http_api/src/block_id.rs index 45c7bed1f..96e3d6fc8 100644 --- a/beacon_node/http_api/src/block_id.rs +++ b/beacon_node/http_api/src/block_id.rs @@ -218,7 +218,7 @@ impl BlockId { chain: &BeaconChain, ) -> Result>, warp::Rejection> { let root = self.root(chain)?.0; - match chain.store.get_blobs(&root) { + match chain.get_blobs(&root, None) { Ok(Some(blob)) => Ok(Arc::new(blob)), Ok(None) => Err(warp_utils::reject::custom_not_found(format!( "Blob with block root {} is not in the store", diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 01b7cb43b..0c903ec33 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -795,12 +795,14 @@ impl Worker { // remove all skip slots let block_roots = block_roots.into_iter().flatten().collect::>(); + let mut slot_hint: Option = None; let mut blobs_sent = 0; let mut send_response = true; for root in block_roots { match self.chain.get_blobs(&root, data_availability_boundary) { Ok(Some(blobs)) => { + slot_hint = Some(blobs.beacon_block_slot + 1); blobs_sent += 1; self.send_network_message(NetworkMessage::SendResponse { peer_id, diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 951dbd2ca..7d3abebbc 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ 
b/beacon_node/store/src/hot_cold_store.rs @@ -533,31 +533,27 @@ impl, Cold: ItemStore> HotColdDB Ok(()) } - pub fn get_blobs(&self, block_root: &Hash256) -> Result>, Error> { - // FIXME(sean) I was attempting to use a blob cache here but was getting deadlocks, - // may want to attempt to use one again - if let Some(bytes) = self - .hot_db - .get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())? - { - let ret = BlobsSidecar::from_ssz_bytes(&bytes)?; - self.blob_cache.lock().put(*block_root, ret.clone()); - Ok(Some(ret)) - } else { - let blobs_freezer = if let Some(ref cold_blobs_db) = self.cold_blobs_db { - cold_blobs_db - } else { - &self.cold_db - }; - - if let Some(ref blobs_bytes) = - blobs_freezer.get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())? - { - Ok(Some(BlobsSidecar::from_ssz_bytes(blobs_bytes)?)) - } else { - Ok(None) + /// Fetch a blobs sidecar from the store. + /// + /// If `slot` is provided then it will be used as a hint as to which database should + /// be checked first. + pub fn get_blobs( + &self, + block_root: &Hash256, + slot: Option, + ) -> Result>, Error> { + if let Some(slot) = slot { + if slot < self.get_split_slot() { + return match self.load_cold_blobs(block_root)? { + Some(blobs) => Ok(Some(blobs)), + None => self.load_hot_blobs(block_root), + }; } } + match self.load_hot_blobs(block_root)? { + Some(blobs) => Ok(Some(blobs)), + None => self.load_cold_blobs(block_root), + } } pub fn blobs_as_kv_store_ops( @@ -1236,6 +1232,41 @@ impl, Cold: ItemStore> HotColdDB }) } + /// Load a blobs sidecar from the hot database. + pub fn load_hot_blobs(&self, block_root: &Hash256) -> Result>, Error> { + // FIXME(sean) I was attempting to use a blob cache here but was getting deadlocks, + // may want to attempt to use one again + if let Some(bytes) = self + .hot_db + .get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())? 
+ { + let ret = BlobsSidecar::from_ssz_bytes(&bytes)?; + self.blob_cache.lock().put(*block_root, ret.clone()); + Ok(Some(ret)) + } else { + Ok(None) + } + } + + /// Try to load a blobs sidecar from the freezer database. + /// + /// Return `None` if no blobs sidecar with `block_root` lies in the freezer. + pub fn load_cold_blobs(&self, block_root: &Hash256) -> Result>, Error> { + let blobs_freezer = if let Some(ref cold_blobs_db) = self.cold_blobs_db { + cold_blobs_db + } else { + &self.cold_db + }; + + if let Some(ref blobs_bytes) = + blobs_freezer.get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())? + { + Ok(Some(BlobsSidecar::from_ssz_bytes(blobs_bytes)?)) + } else { + Ok(None) + } + } + + /// Get a reference to the `ChainSpec` used by the database. + pub fn get_chain_spec(&self) -> &ChainSpec { + &self.spec @@ -1994,7 +2025,7 @@ pub fn migrate_database, Cold: ItemStore>( } // Prepare migration of blobs to freezer. - if let Some(blobs) = store.get_blobs(&block_root)? { + if let Some(blobs) = store.get_blobs(&block_root, Some(slot))? 
{ hot_db_ops.push(StoreOp::DeleteBlobs(block_root)); cold_blobs_db_ops.push(StoreOp::PutBlobs(block_root, Arc::new(blobs))); } From 0ba0775812b43f31ffd459376f14e82484a1f06c Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Wed, 11 Jan 2023 13:26:30 +0100 Subject: [PATCH 08/29] Help user configure blobs freezer correctly between start ups --- beacon_node/store/src/hot_cold_store.rs | 30 ++++++++++++++++++------- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 7d3abebbc..b446b2c7d 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -94,13 +94,14 @@ pub enum HotColdDBError { MissingRestorePointHash(u64), MissingRestorePoint(Hash256), MissingColdStateSummary(Hash256), - MissingColdBlobs(Hash256), MissingHotStateSummary(Hash256), + MissingColdBlobs(Hash256), MissingEpochBoundaryState(Hash256), MissingSplitState(Hash256, Slot), MissingExecutionPayload(Hash256), MissingFullBlockExecutionPayloadPruned(Hash256, Slot), MissingAnchorInfo, + MissingPathToBlobsFreezer, HotStateSummaryError(BeaconStateError), RestorePointDecodeError(ssz::DecodeError), BlockReplayBeaconError(BeaconStateError), @@ -169,18 +170,12 @@ impl HotColdDB, LevelDB> { ) -> Result, Error> { Self::verify_slots_per_restore_point(config.slots_per_restore_point)?; - let cold_blobs_db = if let Some(path) = cold_blobs_path { - Some(LevelDB::open(path.as_path())?) 
- } else { - None - }; - let mut db = HotColdDB { split: RwLock::new(Split::default()), anchor_info: RwLock::new(None), blob_info: RwLock::new(BlobInfo::default()), cold_db: LevelDB::open(cold_path)?, - cold_blobs_db, + cold_blobs_db: None, hot_db: LevelDB::open(hot_path)?, block_cache: Mutex::new(LruCache::new(config.block_cache_size)), blob_cache: Mutex::new(LruCache::new(config.blob_cache_size)), @@ -225,6 +220,25 @@ impl HotColdDB, LevelDB> { ); } + if db.spec.eip4844_fork_epoch.is_some() { + let blob_info = match db.load_blob_info()? { + Some(mut blob_info) => { + if blob_info.blobs_freezer { + cold_blobs_path + .as_ref() + .ok_or(HotColdDBError::MissingPathToBlobsFreezer)?; + } + if let Some(path) = cold_blobs_path { + db.cold_blobs_db = Some(LevelDB::open(path.as_path())?); + blob_info.blobs_freezer = true; + } + Some(blob_info) + } + None => None, + }; + *db.blob_info.write() = blob_info; + } + // Ensure that the schema version of the on-disk database matches the software. // If the version is mismatched, an automatic migration will be attempted. let db = Arc::new(db); From 3c0aa201e35a29121ec02c8b50e8da8887ee1bd1 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Wed, 11 Jan 2023 13:28:59 +0100 Subject: [PATCH 09/29] fixup! 
Help user configure blobs freezer correctly between start ups --- beacon_node/store/src/hot_cold_store.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index b446b2c7d..ef7175c9f 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -234,7 +234,7 @@ impl HotColdDB, LevelDB> { } Some(blob_info) } - None => None, + None => Some(BlobInfo::default()), }; *db.blob_info.write() = blob_info; } From 3679a0f1cbd520abba3832763e44e8a0e43b0328 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Wed, 11 Jan 2023 13:38:43 +0100 Subject: [PATCH 10/29] Improve syntax --- beacon_node/client/src/config.rs | 7 +++--- beacon_node/store/src/hot_cold_store.rs | 29 +++++++++++-------------- 2 files changed, 16 insertions(+), 20 deletions(-) diff --git a/beacon_node/client/src/config.rs b/beacon_node/client/src/config.rs index 598945923..10bca9471 100644 --- a/beacon_node/client/src/config.rs +++ b/beacon_node/client/src/config.rs @@ -168,10 +168,9 @@ impl Config { /// Get the blobs freezer DB path, creating it if necessary. 
pub fn create_blobs_freezer_db_path(&self) -> Result, String> { - if let Some(blobs_freezer_path) = self.get_blobs_freezer_db_path() { - Ok(Some(ensure_dir_exists(blobs_freezer_path)?)) - } else { - Ok(None) + match self.get_blobs_freezer_db_path() { + Some(blobs_freezer_path) => Ok(Some(ensure_dir_exists(blobs_freezer_path)?)), + None => Ok(None), } } diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index ef7175c9f..01971e2b3 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -1250,15 +1250,16 @@ impl, Cold: ItemStore> HotColdDB pub fn load_hot_blobs(&self, block_root: &Hash256) -> Result>, Error> { // FIXME(sean) I was attempting to use a blob cache here but was getting deadlocks, // may want to attempt to use one again - if let Some(bytes) = self + match self .hot_db .get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())? { - let ret = BlobsSidecar::from_ssz_bytes(&bytes)?; - self.blob_cache.lock().put(*block_root, ret.clone()); - Ok(Some(ret)) - } else { - Ok(None) + Some(bytes) => { + let ret = BlobsSidecar::from_ssz_bytes(&bytes)?; + self.blob_cache.lock().put(*block_root, ret.clone()); + Ok(Some(ret)) + } + None => Ok(None), } } @@ -1272,12 +1273,9 @@ impl, Cold: ItemStore> HotColdDB &self.cold_db }; - if let Some(ref blobs_bytes) = - blobs_freezer.get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())? - { - Ok(Some(BlobsSidecar::from_ssz_bytes(blobs_bytes)?)) - } else { - Ok(None) + match blobs_freezer.get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())? 
{ + Some(ref blobs_bytes) => Ok(Some(BlobsSidecar::from_ssz_bytes(blobs_bytes)?)), + None => Ok(None), } } @@ -1989,10 +1987,9 @@ pub fn migrate_database, Cold: ItemStore>( let mut hot_db_ops: Vec> = Vec::new(); let mut cold_blobs_db_ops: Vec> = Vec::new(); - let blobs_freezer = if let Some(ref cold_blobs_db) = store.cold_blobs_db { - cold_blobs_db - } else { - &store.cold_db + let blobs_freezer = match store.cold_blobs_db { + Some(ref cold_blobs_db) => cold_blobs_db, + None => &store.cold_db, }; // 1. Copy all of the states between the head and the split slot, from the hot DB From 04f635c0ac053c490af92c1def74909d674326ae Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Tue, 17 Jan 2023 15:38:50 +0100 Subject: [PATCH 11/29] Remove IDE file --- .vscode/settings.json | 5 ----- 1 file changed, 5 deletions(-) delete mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index 3b125ccb1..000000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "cSpell.words": [ - "blbo" - ] -} \ No newline at end of file From 625980e484c0075e9106eec09da670240f939d52 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Wed, 1 Feb 2023 17:49:48 +0100 Subject: [PATCH 12/29] Fix rebase conflicts --- beacon_node/beacon_chain/src/beacon_chain.rs | 10 +- beacon_node/client/src/builder.rs | 10 +- beacon_node/client/src/config.rs | 21 ++- beacon_node/http_api/src/block_id.rs | 2 +- .../beacon_processor/worker/rpc_methods.rs | 2 - beacon_node/src/cli.rs | 6 +- beacon_node/src/config.rs | 4 +- beacon_node/src/lib.rs | 4 +- beacon_node/store/src/hot_cold_store.rs | 131 ++++++------------ beacon_node/store/src/metadata.rs | 2 + database_manager/src/lib.rs | 27 ++-- 11 files changed, 92 insertions(+), 127 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 721460397..0023386a2 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ 
b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1072,7 +1072,7 @@ impl BeaconChain { block_root: &Hash256, data_availability_boundary: Epoch, ) -> Result>, Error> { - match self.store.get_blobs(block_root, slot)? { + match self.store.get_blobs(block_root)? { Some(blobs) => Ok(Some(blobs)), None => { // Check for the corresponding block to understand whether we *should* have blobs. @@ -3021,6 +3021,7 @@ impl BeaconChain { ops.push(StoreOp::PutBlock(block_root, signed_block.clone())); ops.push(StoreOp::PutState(block.state_root(), &state)); +<<<<<<< HEAD // Only consider blobs if the eip4844 fork is enabled. if let Some(data_availability_boundary) = self.data_availability_boundary() { let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch()); @@ -3040,6 +3041,13 @@ impl BeaconChain { ops.push(StoreOp::PutBlobs(block_root, blobs)); } } +======= + if let Some(blobs) = blobs { + if blobs.blobs.len() > 0 { + //FIXME(sean) using this for debugging for now + info!(self.log, "Writing blobs to store"; "block_root" => ?block_root); + self.store.put_blobs(&block_root, (&*blobs).clone())?; +>>>>>>> 43dc3a9a4 (Fix rebase conflicts) } } diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index b91e29355..e80b6fd18 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -68,7 +68,7 @@ pub struct ClientBuilder { gossipsub_registry: Option, db_path: Option, freezer_db_path: Option, - blobs_freezer_db_path: Option, + blobs_db_path: Option, http_api_config: http_api::Config, http_metrics_config: http_metrics::Config, slasher: Option>>, @@ -101,7 +101,7 @@ where gossipsub_registry: None, db_path: None, freezer_db_path: None, - blobs_freezer_db_path: None, + blobs_db_path: None, http_api_config: <_>::default(), http_metrics_config: <_>::default(), slasher: None, @@ -894,7 +894,7 @@ where mut self, hot_path: &Path, cold_path: &Path, - cold_blobs_path: Option, + blobs_path: Option, config: StoreConfig, log: 
Logger, ) -> Result { @@ -910,7 +910,7 @@ where self.db_path = Some(hot_path.into()); self.freezer_db_path = Some(cold_path.into()); - self.blobs_freezer_db_path = cold_blobs_path.clone(); + self.blobs_db_path = blobs_path.clone(); let inner_spec = spec.clone(); let deposit_contract_deploy_block = context @@ -933,7 +933,7 @@ where let store = HotColdDB::open( hot_path, cold_path, - cold_blobs_path, + blobs_path, schema_upgrade, config, spec, diff --git a/beacon_node/client/src/config.rs b/beacon_node/client/src/config.rs index 10bca9471..4ea59a9da 100644 --- a/beacon_node/client/src/config.rs +++ b/beacon_node/client/src/config.rs @@ -49,9 +49,8 @@ pub struct Config { pub db_name: String, /// Path where the freezer database will be located. pub freezer_db_path: Option, - /// Path where the blobs freezer database will be located if it should be separate from the - /// historical state freezer. - pub blobs_freezer_db_path: Option, + /// Path where the blobs database will be located if blobs should be in a separate database. + pub blobs_db_path: Option, pub log_file: PathBuf, /// If true, the node will use co-ordinated junk for eth1 values. /// @@ -92,7 +91,7 @@ impl Default for Config { data_dir: PathBuf::from(DEFAULT_ROOT_DIR), db_name: "chain_db".to_string(), freezer_db_path: None, - blobs_freezer_db_path: None, + blobs_db_path: None, log_file: PathBuf::from(""), genesis: <_>::default(), store: <_>::default(), @@ -153,12 +152,12 @@ impl Config { .unwrap_or_else(|| self.default_freezer_db_path()) } - /// Returns the path to which the client may initialize the on-disk blobs freezer database. + /// Returns the path to which the client may initialize the on-disk blobs database. /// /// Will attempt to use the user-supplied path from e.g. the CLI, or will default /// to None. 
- pub fn get_blobs_freezer_db_path(&self) -> Option { - self.blobs_freezer_db_path.clone() + pub fn get_blobs_db_path(&self) -> Option { + self.blobs_db_path.clone() } /// Get the freezer DB path, creating it if necessary. @@ -166,10 +165,10 @@ impl Config { ensure_dir_exists(self.get_freezer_db_path()) } - /// Get the blobs freezer DB path, creating it if necessary. - pub fn create_blobs_freezer_db_path(&self) -> Result, String> { - match self.get_blobs_freezer_db_path() { - Some(blobs_freezer_path) => Ok(Some(ensure_dir_exists(blobs_freezer_path)?)), + /// Get the blobs DB path, creating it if necessary. + pub fn create_blobs_db_path(&self) -> Result, String> { + match self.get_blobs_db_path() { + Some(blobs_db_path) => Ok(Some(ensure_dir_exists(blobs_db_path)?)), None => Ok(None), } } diff --git a/beacon_node/http_api/src/block_id.rs b/beacon_node/http_api/src/block_id.rs index 96e3d6fc8..9e152dc61 100644 --- a/beacon_node/http_api/src/block_id.rs +++ b/beacon_node/http_api/src/block_id.rs @@ -218,7 +218,7 @@ impl BlockId { chain: &BeaconChain, ) -> Result>, warp::Rejection> { let root = self.root(chain)?.0; - match chain.get_blobs(&root, None) { + match chain.get_blobs(&root) { Ok(Some(blob)) => Ok(Arc::new(blob)), Ok(None) => Err(warp_utils::reject::custom_not_found(format!( "Blob with block root {} is not in the store", diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 0c903ec33..01b7cb43b 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -795,14 +795,12 @@ impl Worker { // remove all skip slots let block_roots = block_roots.into_iter().flatten().collect::>(); - let mut slot_hint: Option = None; let mut blobs_sent = 0; let mut send_response = true; for root in block_roots { match self.chain.get_blobs(&root, data_availability_boundary) { Ok(Some(blobs)) => { - 
slot_hint = Some(blobs.beacon_block_slot + 1); blobs_sent += 1; self.send_network_message(NetworkMessage::SendResponse { peer_id, diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index a3e4dbcc8..eb6754aa9 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -29,10 +29,10 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .takes_value(true) ) .arg( - Arg::with_name("blobs-freezer-dir") - .long("blobs-freezer-dir") + Arg::with_name("blobs-dir") + .long("blobs-dir") .value_name("DIR") - .help("Data directory for the blobs freezer database.") + .help("Data directory for the blobs database.") .takes_value(true) ) /* diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 4fa916dc3..e8128cb79 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -390,8 +390,8 @@ pub fn get_config( client_config.freezer_db_path = Some(PathBuf::from(freezer_dir)); } - if let Some(blobs_freezer_dir) = cli_args.value_of("blobs-freezer-dir") { - client_config.blobs_freezer_db_path = Some(PathBuf::from(blobs_freezer_dir)); + if let Some(blobs_db_dir) = cli_args.value_of("blobs-dir") { + client_config.blobs_db_path = Some(PathBuf::from(blobs_db_dir)); } let (sprp, sprp_explicit) = get_slots_per_restore_point::(cli_args)?; diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs index 532f1bdfe..b098f57c7 100644 --- a/beacon_node/src/lib.rs +++ b/beacon_node/src/lib.rs @@ -64,7 +64,7 @@ impl ProductionBeaconNode { let _datadir = client_config.create_data_dir()?; let db_path = client_config.create_db_path()?; let freezer_db_path = client_config.create_freezer_db_path()?; - let blobs_freezer_db_path = client_config.create_blobs_freezer_db_path()?; + let blobs_db_path = client_config.create_blobs_db_path()?; let executor = context.executor.clone(); if let Some(legacy_dir) = client_config.get_existing_legacy_data_dir() { @@ -88,7 +88,7 @@ impl ProductionBeaconNode { .disk_store( &db_path, &freezer_db_path, - blobs_freezer_db_path, + 
blobs_db_path, store_config, log.clone(), )?; diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 01971e2b3..845ee89b5 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -59,8 +59,8 @@ pub struct HotColdDB, Cold: ItemStore> { pub(crate) config: StoreConfig, /// Cold database containing compact historical data. pub cold_db: Cold, - /// Cold database containing blob data with slots less than `split.slot`. - pub cold_blobs_db: Option, + /// Database containing blobs. + pub blobs_db: Option, /// Hot database containing duplicated but quick-to-access recent data. /// /// The hot database also contains all blocks. @@ -101,7 +101,7 @@ pub enum HotColdDBError { MissingExecutionPayload(Hash256), MissingFullBlockExecutionPayloadPruned(Hash256, Slot), MissingAnchorInfo, - MissingPathToBlobsFreezer, + MissingPathToBlobsDatabase, HotStateSummaryError(BeaconStateError), RestorePointDecodeError(ssz::DecodeError), BlockReplayBeaconError(BeaconStateError), @@ -138,7 +138,7 @@ impl HotColdDB, MemoryStore> { anchor_info: RwLock::new(None), blob_info: RwLock::new(BlobInfo::default()), cold_db: MemoryStore::open(), - cold_blobs_db: Some(MemoryStore::open()), + blobs_db: Some(MemoryStore::open()), hot_db: MemoryStore::open(), block_cache: Mutex::new(LruCache::new(config.block_cache_size)), blob_cache: Mutex::new(LruCache::new(config.blob_cache_size)), @@ -162,7 +162,7 @@ impl HotColdDB, LevelDB> { pub fn open( hot_path: &Path, cold_path: &Path, - cold_blobs_path: Option, + blobs_db_path: Option, migrate_schema: impl FnOnce(Arc, SchemaVersion, SchemaVersion) -> Result<(), Error>, config: StoreConfig, spec: ChainSpec, @@ -175,7 +175,7 @@ impl HotColdDB, LevelDB> { anchor_info: RwLock::new(None), blob_info: RwLock::new(BlobInfo::default()), cold_db: LevelDB::open(cold_path)?, - cold_blobs_db: None, + blobs_db: None, hot_db: LevelDB::open(hot_path)?, block_cache: 
Mutex::new(LruCache::new(config.block_cache_size)), blob_cache: Mutex::new(LruCache::new(config.blob_cache_size)), @@ -220,23 +220,29 @@ impl HotColdDB, LevelDB> { ); } - if db.spec.eip4844_fork_epoch.is_some() { - let blob_info = match db.load_blob_info()? { - Some(mut blob_info) => { - if blob_info.blobs_freezer { - cold_blobs_path - .as_ref() - .ok_or(HotColdDBError::MissingPathToBlobsFreezer)?; - } - if let Some(path) = cold_blobs_path { - db.cold_blobs_db = Some(LevelDB::open(path.as_path())?); - blob_info.blobs_freezer = true; - } - Some(blob_info) - } - None => Some(BlobInfo::default()), - }; - *db.blob_info.write() = blob_info; + let blob_info_on_disk = db.load_blob_info()?; + + if let Some(ref blob_info) = blob_info_on_disk { + let prev_blobs_db = blob_info.blobs_db; + if prev_blobs_db { + blobs_db_path + .as_ref() + .ok_or(HotColdDBError::MissingPathToBlobsDatabase)?; + } + } + + if let Some(path) = blobs_db_path { + if db.spec.eip4844_fork_epoch.is_some() { + db.blobs_db = Some(LevelDB::open(path.as_path())?); + db.compare_and_set_blob_info_with_write( + blob_info_on_disk, + Some(BlobInfo { blobs_db: true }), + )?; + info!( + db.log, + "Blobs DB initialized"; + ); + } } // Ensure that the schema version of the on-disk database matches the software. @@ -547,29 +553,6 @@ impl, Cold: ItemStore> HotColdDB Ok(()) } - /// Fetch a blobs sidecar from the store. - /// - /// If `slot` is provided then it will be used as a hint as to which database should - /// be checked first. - pub fn get_blobs( - &self, - block_root: &Hash256, - slot: Option, - ) -> Result>, Error> { - if let Some(slot) = slot { - if slot < self.get_split_slot() { - return match self.load_cold_blobs(block_root)? { - Some(blobs) => Ok(Some(blobs)), - None => self.load_hot_blobs(block_root), - }; - } - } - match self.load_hot_blobs(block_root)? 
{ - Some(blobs) => Ok(Some(blobs)), - None => self.load_cold_blobs(block_root), - } - } - pub fn blobs_as_kv_store_ops( &self, key: &Hash256, @@ -910,6 +893,7 @@ impl, Cold: ItemStore> HotColdDB self.hot_db .do_atomically(self.convert_to_kv_batch(batch)?)?; + drop(guard); drop(guard_blob); @@ -1246,35 +1230,22 @@ impl, Cold: ItemStore> HotColdDB }) } - /// Load a blobs sidecar from the hot database. - pub fn load_hot_blobs(&self, block_root: &Hash256) -> Result>, Error> { - // FIXME(sean) I was attempting to use a blob cache here but was getting deadlocks, - // may want to attempt to use one again - match self - .hot_db - .get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())? - { - Some(bytes) => { - let ret = BlobsSidecar::from_ssz_bytes(&bytes)?; - self.blob_cache.lock().put(*block_root, ret.clone()); - Ok(Some(ret)) - } - None => Ok(None), - } - } - - /// Try to load a blobs from the freezer database. - /// - /// Return `None` if no blobs sidecar with `block_root` lies in the freezer. - pub fn load_cold_blobs(&self, block_root: &Hash256) -> Result>, Error> { - let blobs_freezer = if let Some(ref cold_blobs_db) = self.cold_blobs_db { - cold_blobs_db + /// Fetch a blobs sidecar from the store. + pub fn get_blobs(&self, block_root: &Hash256) -> Result>, Error> { + let blobs_db = if let Some(ref blobs_db) = self.blobs_db { + blobs_db } else { &self.cold_db }; - match blobs_freezer.get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())? { - Some(ref blobs_bytes) => Ok(Some(BlobsSidecar::from_ssz_bytes(blobs_bytes)?)), + match blobs_db.get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())? 
{ + Some(ref blobs_bytes) => { + let blobs = BlobsSidecar::from_ssz_bytes(blobs_bytes)?; + // FIXME(sean) I was attempting to use a blob cache here but was getting deadlocks, + // may want to attempt to use one again + self.blob_cache.lock().put(*block_root, blobs.clone()); + Ok(Some(blobs)) + } None => Ok(None), } } @@ -1985,12 +1956,6 @@ pub fn migrate_database, Cold: ItemStore>( } let mut hot_db_ops: Vec> = Vec::new(); - let mut cold_blobs_db_ops: Vec> = Vec::new(); - - let blobs_freezer = match store.cold_blobs_db { - Some(ref cold_blobs_db) => cold_blobs_db, - None => &store.cold_db, - }; // 1. Copy all of the states between the head and the split slot, from the hot DB // to the cold DB. Delete the execution payloads of these now-finalized blocks. @@ -2034,17 +1999,8 @@ pub fn migrate_database, Cold: ItemStore>( if store.config.prune_payloads { hot_db_ops.push(StoreOp::DeleteExecutionPayload(block_root)); } - - // Prepare migration of blobs to freezer. - if let Some(blobs) = store.get_blobs(&block_root, Some(slot))? { - hot_db_ops.push(StoreOp::DeleteBlobs(block_root)); - cold_blobs_db_ops.push(StoreOp::PutBlobs(block_root, Arc::new(blobs))); - } } - // Migrate blobs to freezer. - blobs_freezer.do_atomically(store.convert_to_kv_batch(cold_blobs_db_ops)?)?; - // Warning: Critical section. We have to take care not to put any of the two databases in an // inconsistent state if the OS process dies at any point during the freezing // procedure. @@ -2057,9 +2013,6 @@ pub fn migrate_database, Cold: ItemStore>( // Flush to disk all the states that have just been migrated to the cold store. 
store.cold_db.sync()?; - if let Some(ref cold_blobs_db) = store.cold_blobs_db { - cold_blobs_db.sync()?; - } { let mut split_guard = store.split.write(); diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs index b5de0048f..92117254f 100644 --- a/beacon_node/store/src/metadata.rs +++ b/beacon_node/store/src/metadata.rs @@ -124,6 +124,8 @@ impl StoreItem for AnchorInfo { pub struct BlobInfo { /// The slot after which blobs are available (>=). pub oldest_blob_slot: Option, + /// A separate blobs database is in use. + pub blobs_db: bool, } impl StoreItem for BlobInfo { diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs index 93377c60e..837ad0aef 100644 --- a/database_manager/src/lib.rs +++ b/database_manager/src/lib.rs @@ -95,6 +95,7 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .takes_value(true), ) .arg( +<<<<<<< HEAD Arg::with_name("blob-prune-margin-epochs") .long("blob-prune-margin-epochs") .help( @@ -105,8 +106,12 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .default_value("0"), Arg::with_name("blobs-freezer-dir") .long("blobs-freezer-dir") +======= + Arg::with_name("blobs-dir") + .long("blobs-dir") +>>>>>>> 43dc3a9a4 (Fix rebase conflicts) .value_name("DIR") - .help("Data directory for the blobs freezer database.") + .help("Data directory for the blobs database.") .takes_value(true), ) .subcommand(migrate_cli_app()) @@ -128,8 +133,8 @@ fn parse_client_config( client_config.freezer_db_path = Some(freezer_dir); } - if let Some(blobs_freezer_dir) = clap_utils::parse_optional(cli_args, "blobs-freezer-dir")? { - client_config.blobs_freezer_db_path = Some(blobs_freezer_dir); + if let Some(blobs_db_dir) = clap_utils::parse_optional(cli_args, "blobs-dir")? 
{ + client_config.blobs_db_path = Some(blobs_db_dir); } let (sprp, sprp_explicit) = get_slots_per_restore_point::(cli_args)?; @@ -153,13 +158,13 @@ pub fn display_db_version( let spec = runtime_context.eth2_config.spec.clone(); let hot_path = client_config.get_db_path(); let cold_path = client_config.get_freezer_db_path(); - let cold_blobs_path = client_config.get_blobs_freezer_db_path(); + let blobs_path = client_config.get_blobs_db_path(); let mut version = CURRENT_SCHEMA_VERSION; HotColdDB::, LevelDB>::open( &hot_path, &cold_path, - &cold_blobs_path, + &blobs_path, |_, from, _| { version = from; Ok(()) @@ -211,12 +216,12 @@ pub fn inspect_db( let spec = runtime_context.eth2_config.spec.clone(); let hot_path = client_config.get_db_path(); let cold_path = client_config.get_freezer_db_path(); - let cold_blobs_path = client_config.get_blobs_freezer_db_path(); + let blobs_path = client_config.get_blobs_db_path(); let db = HotColdDB::, LevelDB>::open( &hot_path, &cold_path, - &cold_blobs_path, + &blobs_path, |_, _, _| Ok(()), client_config.store, spec, @@ -267,14 +272,14 @@ pub fn migrate_db( let spec = &runtime_context.eth2_config.spec; let hot_path = client_config.get_db_path(); let cold_path = client_config.get_freezer_db_path(); - let cold_blobs_path = client_config.get_blobs_freezer_db_path(); + let blobs_path = client_config.get_blobs_db_path(); let mut from = CURRENT_SCHEMA_VERSION; let to = migrate_config.to; let db = HotColdDB::, LevelDB>::open( &hot_path, &cold_path, - &cold_blobs_path, + &blobs_path, |_, db_initial_version, _| { from = db_initial_version; Ok(()) @@ -309,12 +314,12 @@ pub fn prune_payloads( let spec = &runtime_context.eth2_config.spec; let hot_path = client_config.get_db_path(); let cold_path = client_config.get_freezer_db_path(); - let cold_blobs_path = client_config.get_blobs_freezer_db_path(); + let blobs_path = client_config.get_blobs_db_path(); let db = HotColdDB::, LevelDB>::open( &hot_path, &cold_path, - &cold_blobs_path, + 
&blobs_path, |_, _, _| Ok(()), client_config.store, spec.clone(), From dcb5495745014cb5d74cdd8f9763a365ef3f907e Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Tue, 17 Jan 2023 20:22:36 +0100 Subject: [PATCH 13/29] Store blobs in correct db for atomic ops --- .../beacon_chain/src/block_verification.rs | 4 +- beacon_node/store/src/garbage_collection.rs | 2 +- beacon_node/store/src/hot_cold_store.rs | 48 ++++++++++++------- 3 files changed, 35 insertions(+), 19 deletions(-) diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index fb1d79e96..156c12863 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -1380,7 +1380,9 @@ impl ExecutionPendingBlock { StoreOp::PutStateTemporaryFlag(state_root), ] }; - chain.store.do_atomically(state_batch)?; + chain + .store + .do_atomically_and_update_cache(state_batch, None)?; drop(txn_lock); confirmed_state_roots.push(state_root); diff --git a/beacon_node/store/src/garbage_collection.rs b/beacon_node/store/src/garbage_collection.rs index 329133632..2affaad63 100644 --- a/beacon_node/store/src/garbage_collection.rs +++ b/beacon_node/store/src/garbage_collection.rs @@ -31,7 +31,7 @@ where "Garbage collecting {} temporary states", delete_ops.len() / 2 ); - self.do_atomically(delete_ops)?; + self.do_atomically_and_update_cache(delete_ops, None)?; } Ok(()) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 845ee89b5..0aace19d5 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -849,11 +849,14 @@ impl, Cold: ItemStore> HotColdDB Ok(key_value_batch) } - pub fn do_atomically(&self, batch: Vec>) -> Result<(), Error> { + pub fn do_atomically_and_update_cache( + &self, + batch: Vec>, + blobs_batch: Option>>, + ) -> Result<(), Error> { // Update the block cache whilst holding a lock, to ensure that the cache 
updates atomically // with the database. let mut guard = self.block_cache.lock(); - let mut guard_blob = self.blob_cache.lock(); for op in &batch { match op { @@ -861,9 +864,7 @@ impl, Cold: ItemStore> HotColdDB guard.put(*block_root, (**block).clone()); } - StoreOp::PutBlobs(block_root, blobs) => { - guard_blob.put(*block_root, (**blobs).clone()); - } + StoreOp::PutBlobs(_, _) => (), StoreOp::PutState(_, _) => (), @@ -877,9 +878,7 @@ impl, Cold: ItemStore> HotColdDB guard.pop(block_root); } - StoreOp::DeleteBlobs(block_root) => { - guard_blob.pop(block_root); - } + StoreOp::DeleteBlobs(_) => (), StoreOp::DeleteState(_, _) => (), @@ -891,11 +890,30 @@ impl, Cold: ItemStore> HotColdDB } } + if let Some(blob_ops) = blobs_batch { + let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db); + let mut guard_blob = self.blob_cache.lock(); + + for op in &blob_ops { + match op { + StoreOp::PutBlobs(block_root, blobs) => { + guard_blob.put(*block_root, (**blobs).clone()); + } + + StoreOp::DeleteBlobs(block_root) => { + guard_blob.pop(block_root); + } + + _ => (), + } + } + blobs_db.do_atomically(self.convert_to_kv_batch(blob_ops)?)?; + drop(guard_blob); + } + self.hot_db .do_atomically(self.convert_to_kv_batch(batch)?)?; - drop(guard); - drop(guard_blob); Ok(()) } @@ -1232,11 +1250,7 @@ impl, Cold: ItemStore> HotColdDB /// Fetch a blobs sidecar from the store. pub fn get_blobs(&self, block_root: &Hash256) -> Result>, Error> { - let blobs_db = if let Some(ref blobs_db) = self.blobs_db { - blobs_db - } else { - &self.cold_db - }; + let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db); match blobs_db.get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())? 
{ Some(ref blobs_bytes) => { @@ -1751,7 +1765,7 @@ impl, Cold: ItemStore> HotColdDB } } let payloads_pruned = ops.len(); - self.do_atomically(ops)?; + self.do_atomically_and_update_cache(ops, None)?; info!( self.log, "Execution payload pruning complete"; @@ -2050,7 +2064,7 @@ pub fn migrate_database, Cold: ItemStore>( } // Delete the states from the hot database if we got this far. - store.do_atomically(hot_db_ops)?; + store.do_atomically_and_update_cache(hot_db_ops, None)?; debug!( store.log, From 22915c2d7e61f48ed3b479072d08856e1d26a559 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Tue, 17 Jan 2023 20:37:30 +0100 Subject: [PATCH 14/29] fixup! Store blobs in correct db for atomic ops --- beacon_node/store/src/hot_cold_store.rs | 41 +++++++++++++++---------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 0aace19d5..f47dc57ee 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -890,30 +890,39 @@ impl, Cold: ItemStore> HotColdDB } } - if let Some(blob_ops) = blobs_batch { - let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db); - let mut guard_blob = self.blob_cache.lock(); + let guard_blob = match blobs_batch { + Some(blob_ops) => { + let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db); + // Update the blob cache whilst holding a lock, while holding a lock on the block + // cache, to ensure they and their databases all update atomically. 
+ let mut guard_blob = self.blob_cache.lock(); - for op in &blob_ops { - match op { - StoreOp::PutBlobs(block_root, blobs) => { - guard_blob.put(*block_root, (**blobs).clone()); + for op in &blob_ops { + match op { + StoreOp::PutBlobs(block_root, blobs) => { + guard_blob.put(*block_root, (**blobs).clone()); + } + + StoreOp::DeleteBlobs(block_root) => { + guard_blob.pop(block_root); + } + + _ => (), } - - StoreOp::DeleteBlobs(block_root) => { - guard_blob.pop(block_root); - } - - _ => (), } + blobs_db.do_atomically(self.convert_to_kv_batch(blob_ops)?)?; + Some(guard_blob) } - blobs_db.do_atomically(self.convert_to_kv_batch(blob_ops)?)?; - drop(guard_blob); - } + None => None, + }; self.hot_db .do_atomically(self.convert_to_kv_batch(batch)?)?; + drop(guard); + if let Some(guard_blob) = guard_blob { + drop(guard_blob); + } Ok(()) } From 7f91dd803c7067b0fe2c455cfad34890985b2d2a Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Wed, 18 Jan 2023 09:04:15 +0100 Subject: [PATCH 15/29] Help user choose blobs db --- beacon_node/client/src/config.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/beacon_node/client/src/config.rs b/beacon_node/client/src/config.rs index 4ea59a9da..10eeb3a48 100644 --- a/beacon_node/client/src/config.rs +++ b/beacon_node/client/src/config.rs @@ -50,6 +50,11 @@ pub struct Config { /// Path where the freezer database will be located. pub freezer_db_path: Option, /// Path where the blobs database will be located if blobs should be in a separate database. + /// + /// The capacity this location should hold varies with the data availability boundary. It + /// should be able to store < 69 GB when [MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS](types::consts::eip4844::MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS) is 4096 + /// epochs of 32 slots (up to 131072 bytes data per blob and up to 4 blobs per block, 88 bytes + /// of [BlobsSidecar](types::BlobsSidecar) metadata per block). 
pub blobs_db_path: Option, pub log_file: PathBuf, /// If true, the node will use co-ordinated junk for eth1 values. From f8c3e7fc91be2d42dd24e48e4edc2021b7efc1a4 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Wed, 18 Jan 2023 09:04:44 +0100 Subject: [PATCH 16/29] Lint fix --- database_manager/src/lib.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs index 837ad0aef..d0cb27931 100644 --- a/database_manager/src/lib.rs +++ b/database_manager/src/lib.rs @@ -164,7 +164,7 @@ pub fn display_db_version( HotColdDB::, LevelDB>::open( &hot_path, &cold_path, - &blobs_path, + blobs_path, |_, from, _| { version = from; Ok(()) @@ -221,7 +221,7 @@ pub fn inspect_db( let db = HotColdDB::, LevelDB>::open( &hot_path, &cold_path, - &blobs_path, + blobs_path, |_, _, _| Ok(()), client_config.store, spec, @@ -279,7 +279,7 @@ pub fn migrate_db( let db = HotColdDB::, LevelDB>::open( &hot_path, &cold_path, - &blobs_path, + blobs_path, |_, db_initial_version, _| { from = db_initial_version; Ok(()) @@ -319,7 +319,7 @@ pub fn prune_payloads( let db = HotColdDB::, LevelDB>::open( &hot_path, &cold_path, - &blobs_path, + blobs_path, |_, _, _| Ok(()), client_config.store, spec.clone(), From f971f3a3a28fc1c7775837355efd358ec2944bdb Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Wed, 1 Feb 2023 17:51:01 +0100 Subject: [PATCH 17/29] Fix rebase conflicts --- beacon_node/beacon_chain/src/beacon_chain.rs | 22 ++--- .../beacon_chain/src/block_verification.rs | 2 +- beacon_node/store/src/garbage_collection.rs | 2 +- beacon_node/store/src/hot_cold_store.rs | 83 ++++++++++--------- beacon_node/store/src/lib.rs | 1 + database_manager/src/lib.rs | 7 +- 6 files changed, 60 insertions(+), 57 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 0023386a2..c1b4dc6b5 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ 
b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3021,7 +3021,6 @@ impl BeaconChain { ops.push(StoreOp::PutBlock(block_root, signed_block.clone())); ops.push(StoreOp::PutState(block.state_root(), &state)); -<<<<<<< HEAD // Only consider blobs if the eip4844 fork is enabled. if let Some(data_availability_boundary) = self.data_availability_boundary() { let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch()); @@ -3032,7 +3031,7 @@ impl BeaconChain { // margin, or younger (of higher epoch number). if block_epoch >= import_boundary { if let Some(blobs) = blobs { - if blobs.blobs.len() > 0 { + if !blobs.blobs.is_empty() { //FIXME(sean) using this for debugging for now info!( self.log, "Writing blobs to store"; @@ -3041,16 +3040,8 @@ impl BeaconChain { ops.push(StoreOp::PutBlobs(block_root, blobs)); } } -======= - if let Some(blobs) = blobs { - if blobs.blobs.len() > 0 { - //FIXME(sean) using this for debugging for now - info!(self.log, "Writing blobs to store"; "block_root" => ?block_root); - self.store.put_blobs(&block_root, (&*blobs).clone())?; ->>>>>>> 43dc3a9a4 (Fix rebase conflicts) } } - let txn_lock = self.store.hot_db.begin_rw_transaction(); if let Err(e) = self.store.do_atomically(ops) { @@ -3089,6 +3080,17 @@ impl BeaconChain { return Err(e.into()); } + + if let Some(blobs) = blobs? { + if blobs.blobs.len() > 0 { + //FIXME(sean) using this for debugging for now + info!(self.log, "Writing blobs to store"; "block_root" => ?block_root); + // WARNING! Deadlocks if the alternative to a separate blobs db is + // changed from the cold db to the hot db. + self.store.put_blobs(&block_root, (&*blobs).clone())?; + } + }; + drop(txn_lock); // The fork choice write-lock is dropped *after* the on-disk database has been updated. 
diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 156c12863..70eec4ecc 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -1382,7 +1382,7 @@ impl ExecutionPendingBlock { }; chain .store - .do_atomically_and_update_cache(state_batch, None)?; + .do_atomically_with_block_and_blobs_cache(state_batch)?; drop(txn_lock); confirmed_state_roots.push(state_root); diff --git a/beacon_node/store/src/garbage_collection.rs b/beacon_node/store/src/garbage_collection.rs index 2affaad63..c70ef8986 100644 --- a/beacon_node/store/src/garbage_collection.rs +++ b/beacon_node/store/src/garbage_collection.rs @@ -31,7 +31,7 @@ where "Garbage collecting {} temporary states", delete_ops.len() / 2 ); - self.do_atomically_and_update_cache(delete_ops, None)?; + self.do_atomically_with_block_and_blobs_cache(delete_ops)?; } Ok(()) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index f47dc57ee..eaef8550e 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -544,7 +544,8 @@ impl, Cold: ItemStore> HotColdDB } pub fn put_blobs(&self, block_root: &Hash256, blobs: BlobsSidecar) -> Result<(), Error> { - self.hot_db.put_bytes( + let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db); + blobs_db.put_bytes( DBColumn::BeaconBlob.into(), block_root.as_bytes(), &blobs.as_ssz_bytes(), @@ -849,19 +850,38 @@ impl, Cold: ItemStore> HotColdDB Ok(key_value_batch) } - pub fn do_atomically_and_update_cache( + pub fn do_atomically_with_block_and_blobs_cache( &self, batch: Vec>, - blobs_batch: Option>>, ) -> Result<(), Error> { - // Update the block cache whilst holding a lock, to ensure that the cache updates atomically - // with the database. 
- let mut guard = self.block_cache.lock(); + let mut hot_db_cache_ops = Vec::new(); - for op in &batch { + let (blobs_ops, hot_db_ops) = batch.into_iter().partition(|store_op| match store_op { + StoreOp::PutBlobs(_, _) | StoreOp::DeleteBlobs(_) => true, + StoreOp::PutBlock(_, _) | StoreOp::DeleteBlock(_) => { + hot_db_cache_ops.push(store_op.clone()); + false + } + _ => false, + }); + + // Update database whilst holding a lock on cache, to ensure that the cache updates + // atomically with the database. + let mut guard = self.block_cache.lock(); + let mut guard_blob = self.blob_cache.lock(); + + self.hot_db + .do_atomically(self.convert_to_kv_batch(hot_db_ops)?)?; + + let blob_cache_ops = blobs_ops.clone(); + let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db); + // todo(emhane): do we want to restore the hot db writes if this fails? + blobs_db.do_atomically(self.convert_to_kv_batch(blobs_ops)?)?; + + for op in hot_db_cache_ops { match op { StoreOp::PutBlock(block_root, block) => { - guard.put(*block_root, (**block).clone()); + guard.put(block_root, (*block).clone()); } StoreOp::PutBlobs(_, _) => (), @@ -875,7 +895,7 @@ impl, Cold: ItemStore> HotColdDB StoreOp::DeleteStateTemporaryFlag(_) => (), StoreOp::DeleteBlock(block_root) => { - guard.pop(block_root); + guard.pop(&block_root); } StoreOp::DeleteBlobs(_) => (), @@ -890,39 +910,22 @@ impl, Cold: ItemStore> HotColdDB } } - let guard_blob = match blobs_batch { - Some(blob_ops) => { - let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db); - // Update the blob cache whilst holding a lock, while holding a lock on the block - // cache, to ensure they and their databases all update atomically. 
- let mut guard_blob = self.blob_cache.lock(); - - for op in &blob_ops { - match op { - StoreOp::PutBlobs(block_root, blobs) => { - guard_blob.put(*block_root, (**blobs).clone()); - } - - StoreOp::DeleteBlobs(block_root) => { - guard_blob.pop(block_root); - } - - _ => (), - } + for op in blob_cache_ops { + match op { + StoreOp::PutBlobs(block_root, blobs) => { + guard_blob.put(block_root, (*blobs).clone()); } - blobs_db.do_atomically(self.convert_to_kv_batch(blob_ops)?)?; - Some(guard_blob) - } - None => None, - }; - self.hot_db - .do_atomically(self.convert_to_kv_batch(batch)?)?; + StoreOp::DeleteBlobs(block_root) => { + guard_blob.pop(&block_root); + } + + _ => (), + } + } drop(guard); - if let Some(guard_blob) = guard_blob { - drop(guard_blob); - } + drop(guard_blob); Ok(()) } @@ -1774,7 +1777,7 @@ impl, Cold: ItemStore> HotColdDB } } let payloads_pruned = ops.len(); - self.do_atomically_and_update_cache(ops, None)?; + self.do_atomically_with_block_and_blobs_cache(ops)?; info!( self.log, "Execution payload pruning complete"; @@ -2073,7 +2076,7 @@ pub fn migrate_database, Cold: ItemStore>( } // Delete the states from the hot database if we got this far. - store.do_atomically_and_update_cache(hot_db_ops, None)?; + store.do_atomically_with_block_and_blobs_cache(hot_db_ops)?; debug!( store.log, diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 1d7e92b80..9305b3da0 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -154,6 +154,7 @@ pub trait ItemStore: KeyValueStore + Sync + Send + Sized + 'stati /// Reified key-value storage operation. Helps in modifying the storage atomically. 
/// See also https://github.com/sigp/lighthouse/issues/692 +#[derive(Clone)] pub enum StoreOp<'a, E: EthSpec> { PutBlock(Hash256, Arc>), PutState(Hash256, &'a BeaconState), diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs index d0cb27931..d9b480836 100644 --- a/database_manager/src/lib.rs +++ b/database_manager/src/lib.rs @@ -95,7 +95,6 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .takes_value(true), ) .arg( -<<<<<<< HEAD Arg::with_name("blob-prune-margin-epochs") .long("blob-prune-margin-epochs") .help( @@ -104,12 +103,10 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { ) .takes_value(true) .default_value("0"), - Arg::with_name("blobs-freezer-dir") - .long("blobs-freezer-dir") -======= + ) + .arg( Arg::with_name("blobs-dir") .long("blobs-dir") ->>>>>>> 43dc3a9a4 (Fix rebase conflicts) .value_name("DIR") .help("Data directory for the blobs database.") .takes_value(true), From d8e501d3abd45532ca8bfd1429ae4c4843bb6027 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Wed, 18 Jan 2023 14:33:45 +0100 Subject: [PATCH 18/29] Add todos --- beacon_node/store/src/hot_cold_store.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index eaef8550e..6226f6f03 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -59,7 +59,7 @@ pub struct HotColdDB, Cold: ItemStore> { pub(crate) config: StoreConfig, /// Cold database containing compact historical data. pub cold_db: Cold, - /// Database containing blobs. + /// Database containing blobs. If None, store falls back to use `cold_db`. pub blobs_db: Option, /// Hot database containing duplicated but quick-to-access recent data. 
/// @@ -541,6 +541,7 @@ impl, Cold: ItemStore> HotColdDB .key_delete(DBColumn::BeaconBlock.into(), block_root.as_bytes())?; self.hot_db .key_delete(DBColumn::ExecPayload.into(), block_root.as_bytes()) + // todo(emhane): do we want to delete the corresponding blobs here too? } pub fn put_blobs(&self, block_root: &Hash256, blobs: BlobsSidecar) -> Result<(), Error> { From 00ce8d9572d585d6687f7ff0d41b93e3d6f49be5 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Tue, 31 Jan 2023 21:24:55 +0100 Subject: [PATCH 19/29] Throw error when params don't match with previous run --- beacon_node/store/src/hot_cold_store.rs | 49 +++++++++++++------------ 1 file changed, 26 insertions(+), 23 deletions(-) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 6226f6f03..554ba6261 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -95,13 +95,13 @@ pub enum HotColdDBError { MissingRestorePoint(Hash256), MissingColdStateSummary(Hash256), MissingHotStateSummary(Hash256), - MissingColdBlobs(Hash256), MissingEpochBoundaryState(Hash256), MissingSplitState(Hash256, Slot), MissingExecutionPayload(Hash256), MissingFullBlockExecutionPayloadPruned(Hash256, Slot), MissingAnchorInfo, MissingPathToBlobsDatabase, + BlobsPreviouslyInDefaultStore, HotStateSummaryError(BeaconStateError), RestorePointDecodeError(ssz::DecodeError), BlockReplayBeaconError(BeaconStateError), @@ -220,30 +220,33 @@ impl HotColdDB, LevelDB> { ); } - let blob_info_on_disk = db.load_blob_info()?; - - if let Some(ref blob_info) = blob_info_on_disk { - let prev_blobs_db = blob_info.blobs_db; - if prev_blobs_db { - blobs_db_path - .as_ref() - .ok_or(HotColdDBError::MissingPathToBlobsDatabase)?; + let blob_info = db.load_blob_info()?; + let (open_blobs_db, path) = match (&blob_info, blobs_db_path) { + (Some(blob_info), Some(path)) => { + if blob_info.blobs_db { + (true, path) + } else { + return 
Err(HotColdDBError::BlobsPreviouslyInDefaultStore.into()); + } } - } + (None, Some(path)) => (true, path), + (Some(_), None) => return Err(HotColdDBError::MissingPathToBlobsDatabase.into()), + (None, None) => (false, cold_path.to_path_buf()), + }; - if let Some(path) = blobs_db_path { - if db.spec.eip4844_fork_epoch.is_some() { - db.blobs_db = Some(LevelDB::open(path.as_path())?); - db.compare_and_set_blob_info_with_write( - blob_info_on_disk, - Some(BlobInfo { blobs_db: true }), - )?; - info!( - db.log, - "Blobs DB initialized"; - ); - } - } + let new_blob_info = if open_blobs_db { + db.blobs_db = Some(LevelDB::open(path.as_path())?); + Some(BlobInfo { blobs_db: true }) + } else { + Some(BlobInfo { blobs_db: false }) + }; + + db.compare_and_set_blob_info_with_write(blob_info, new_blob_info)?; + info!( + db.log, + "Blobs DB initialized"; + "path" => ?path + ); // Ensure that the schema version of the on-disk database matches the software. // If the version is mismatched, an automatic migration will be attempted. From 04fafebfa6343ef491bd7507c26846b64ca5201e Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Tue, 31 Jan 2023 21:48:57 +0100 Subject: [PATCH 20/29] fixup! 
Throw error when params don't match with previous run --- beacon_node/store/src/hot_cold_store.rs | 37 +++++++++++++------------ 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 554ba6261..194ac2f01 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -221,32 +221,33 @@ impl HotColdDB, LevelDB> { } let blob_info = db.load_blob_info()?; - let (open_blobs_db, path) = match (&blob_info, blobs_db_path) { - (Some(blob_info), Some(path)) => { + let open_blobs_db = match (&blob_info, &blobs_db_path) { + (Some(blob_info), Some(_)) => { if blob_info.blobs_db { - (true, path) + true } else { return Err(HotColdDBError::BlobsPreviouslyInDefaultStore.into()); } } - (None, Some(path)) => (true, path), + (None, Some(_)) => true, (Some(_), None) => return Err(HotColdDBError::MissingPathToBlobsDatabase.into()), - (None, None) => (false, cold_path.to_path_buf()), + (None, None) => false, }; - let new_blob_info = if open_blobs_db { - db.blobs_db = Some(LevelDB::open(path.as_path())?); - Some(BlobInfo { blobs_db: true }) - } else { - Some(BlobInfo { blobs_db: false }) - }; - - db.compare_and_set_blob_info_with_write(blob_info, new_blob_info)?; - info!( - db.log, - "Blobs DB initialized"; - "path" => ?path - ); + if let Some(path) = blobs_db_path { + let new_blob_info = if open_blobs_db { + db.blobs_db = Some(LevelDB::open(path.as_path())?); + Some(BlobInfo { blobs_db: true }) + } else { + Some(BlobInfo { blobs_db: false }) + }; + db.compare_and_set_blob_info_with_write(blob_info, new_blob_info)?; + info!( + db.log, + "Blobs DB initialized"; + "path" => ?path + ); + } // Ensure that the schema version of the on-disk database matches the software. // If the version is mismatched, an automatic migration will be attempted. 
From ba882958eda8b95b865b2748015772546aa91da9 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Wed, 1 Feb 2023 14:05:18 +0100 Subject: [PATCH 21/29] Delete blobs along with block --- beacon_node/store/src/hot_cold_store.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 194ac2f01..d1e33d51d 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -544,8 +544,9 @@ impl, Cold: ItemStore> HotColdDB self.hot_db .key_delete(DBColumn::BeaconBlock.into(), block_root.as_bytes())?; self.hot_db - .key_delete(DBColumn::ExecPayload.into(), block_root.as_bytes()) - // todo(emhane): do we want to delete the corresponding blobs here too? + .key_delete(DBColumn::ExecPayload.into(), block_root.as_bytes())?; + let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db); + blobs_db.key_delete(DBColumn::BeaconBlob.into(), block_root.as_bytes()) } pub fn put_blobs(&self, block_root: &Hash256, blobs: BlobsSidecar) -> Result<(), Error> { From 89cccfc397b1e03810ef261f36fc47b0f3f28e56 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Mon, 6 Feb 2023 10:24:21 +0100 Subject: [PATCH 22/29] Fix rebase conflicts --- beacon_node/beacon_chain/src/beacon_chain.rs | 13 +------------ .../beacon_chain/src/validator_pubkey_cache.rs | 2 +- beacon_node/store/src/lib.rs | 1 + 3 files changed, 3 insertions(+), 13 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index c1b4dc6b5..980a31390 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3044,7 +3044,7 @@ impl BeaconChain { } let txn_lock = self.store.hot_db.begin_rw_transaction(); - if let Err(e) = self.store.do_atomically(ops) { + if let Err(e) = self.store.do_atomically_with_block_and_blobs_cache(ops) { error!( self.log, "Database write failed!"; @@ -3080,17 +3080,6 @@ 
impl BeaconChain { return Err(e.into()); } - - if let Some(blobs) = blobs? { - if blobs.blobs.len() > 0 { - //FIXME(sean) using this for debugging for now - info!(self.log, "Writing blobs to store"; "block_root" => ?block_root); - // WARNING! Deadlocks if the alternative to a separate blobs db is - // changed from the cold db to the hot db. - self.store.put_blobs(&block_root, (&*blobs).clone())?; - } - }; - drop(txn_lock); // The fork choice write-lock is dropped *after* the on-disk database has been updated. diff --git a/beacon_node/beacon_chain/src/validator_pubkey_cache.rs b/beacon_node/beacon_chain/src/validator_pubkey_cache.rs index 79910df29..85fd106ee 100644 --- a/beacon_node/beacon_chain/src/validator_pubkey_cache.rs +++ b/beacon_node/beacon_chain/src/validator_pubkey_cache.rs @@ -38,7 +38,7 @@ impl ValidatorPubkeyCache { }; let store_ops = cache.import_new_pubkeys(state)?; - store.do_atomically(store_ops)?; + store.do_atomically_with_block_and_blobs_cache(store_ops)?; Ok(cache) } diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 9305b3da0..3056c2929 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -101,6 +101,7 @@ pub fn get_key_for_col(column: &str, key: &[u8]) -> Vec { } #[must_use] +#[derive(Clone)] pub enum KeyValueStoreOp { PutKeyValue(Vec, Vec), DeleteKey(Vec), From 72cd68c0a4fa219b49cc5388c9b4da646690ae74 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Wed, 1 Feb 2023 17:38:48 +0100 Subject: [PATCH 23/29] Complete making blocks and blobs db atomic --- beacon_node/store/src/hot_cold_store.rs | 65 +++++++++++++++++++------ 1 file changed, 51 insertions(+), 14 deletions(-) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index d1e33d51d..562a6bc51 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -860,30 +860,67 @@ impl, Cold: ItemStore> HotColdDB &self, batch: Vec>, ) -> Result<(), Error> { - 
let mut hot_db_cache_ops = Vec::new(); - - let (blobs_ops, hot_db_ops) = batch.into_iter().partition(|store_op| match store_op { - StoreOp::PutBlobs(_, _) | StoreOp::DeleteBlobs(_) => true, - StoreOp::PutBlock(_, _) | StoreOp::DeleteBlock(_) => { - hot_db_cache_ops.push(store_op.clone()); - false - } - _ => false, - }); + let mut blobs_to_delete = Vec::new(); + let (blobs_ops, hot_db_ops): (Vec>, Vec>) = + batch.into_iter().partition(|store_op| match store_op { + StoreOp::PutBlobs(_, _) => true, + StoreOp::DeleteBlobs(block_root) => { + match self.get_blobs(block_root) { + Ok(Some(blobs_sidecar)) => { + blobs_to_delete.push(blobs_sidecar); + } + Err(e) => { + error!( + self.log, "Error getting blobs"; + "block_root" => %block_root, + "error" => ?e + ); + } + _ => (), + } + true + } + StoreOp::PutBlock(_, _) | StoreOp::DeleteBlock(_) => false, + _ => false, + }); // Update database whilst holding a lock on cache, to ensure that the cache updates // atomically with the database. let mut guard = self.block_cache.lock(); let mut guard_blob = self.blob_cache.lock(); - self.hot_db - .do_atomically(self.convert_to_kv_batch(hot_db_ops)?)?; - let blob_cache_ops = blobs_ops.clone(); let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db); - // todo(emhane): do we want to restore the hot db writes if this fails? + // Try to execute blobs store ops. blobs_db.do_atomically(self.convert_to_kv_batch(blobs_ops)?)?; + let hot_db_cache_ops = hot_db_ops.clone(); + // Try to execute hot db store ops. 
+ let tx_res = match self.convert_to_kv_batch(hot_db_ops) { + Ok(kv_store_ops) => self.hot_db.do_atomically(kv_store_ops), + Err(e) => Err(e), + }; + // Rollback on failure + if let Err(e) = tx_res { + let mut rollback_blob_ops: Vec> = Vec::with_capacity(blob_cache_ops.len()); + for blob_op in blob_cache_ops { + match blob_op { + StoreOp::PutBlobs(block_root, _) => { + rollback_blob_ops.push(StoreOp::DeleteBlobs(block_root)); + } + StoreOp::DeleteBlobs(_) => { + if let Some(blobs) = blobs_to_delete.pop() { + rollback_blob_ops + .push(StoreOp::PutBlobs(blobs.beacon_block_root, Arc::new(blobs))); + } + } + _ => (), + } + } + blobs_db.do_atomically(self.convert_to_kv_batch(rollback_blob_ops)?)?; + return Err(e); + } + for op in hot_db_cache_ops { match op { StoreOp::PutBlock(block_root, block) => { From ca934b7cb5256c1dff6aabd3f6e055e9fb9cc954 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Wed, 1 Feb 2023 17:55:24 +0100 Subject: [PATCH 24/29] Fix rebase conflicts --- beacon_node/http_api/src/block_id.rs | 5 ++++- beacon_node/store/src/hot_cold_store.rs | 10 +++++++--- beacon_node/store/src/metadata.rs | 2 ++ 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/beacon_node/http_api/src/block_id.rs b/beacon_node/http_api/src/block_id.rs index 9e152dc61..e8d463bbe 100644 --- a/beacon_node/http_api/src/block_id.rs +++ b/beacon_node/http_api/src/block_id.rs @@ -218,7 +218,10 @@ impl BlockId { chain: &BeaconChain, ) -> Result>, warp::Rejection> { let root = self.root(chain)?.0; - match chain.get_blobs(&root) { + let Some(data_availability_boundary) = chain.data_availability_boundary() else { + return Err(warp_utils::reject::custom_not_found("Eip4844 fork disabled".into())); + }; + match chain.get_blobs(&root, data_availability_boundary) { Ok(Some(blob)) => Ok(Arc::new(blob)), Ok(None) => Err(warp_utils::reject::custom_not_found(format!( "Blob with block root {} is not in the store", diff --git a/beacon_node/store/src/hot_cold_store.rs 
b/beacon_node/store/src/hot_cold_store.rs index 562a6bc51..201a78afd 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -237,11 +237,15 @@ impl HotColdDB, LevelDB> { if let Some(path) = blobs_db_path { let new_blob_info = if open_blobs_db { db.blobs_db = Some(LevelDB::open(path.as_path())?); - Some(BlobInfo { blobs_db: true }) + let mut new_blob_info = blob_info.clone().unwrap_or_default(); + new_blob_info.blobs_db = true; + new_blob_info } else { - Some(BlobInfo { blobs_db: false }) + let mut new_blob_info = blob_info.clone().unwrap_or_default(); + new_blob_info.blobs_db = false; + new_blob_info }; - db.compare_and_set_blob_info_with_write(blob_info, new_blob_info)?; + db.compare_and_set_blob_info_with_write(blob_info, Some(new_blob_info))?; info!( db.log, "Blobs DB initialized"; diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs index 92117254f..6c3761eb8 100644 --- a/beacon_node/store/src/metadata.rs +++ b/beacon_node/store/src/metadata.rs @@ -126,6 +126,8 @@ pub struct BlobInfo { pub oldest_blob_slot: Option, /// A separate blobs database is in use. pub blobs_db: bool, + /// The slot after which blobs are available (>=). + pub oldest_blob_slot: Option, } impl StoreItem for BlobInfo { From 38fe2dce3fb7196180dc6eb79d6c7df127ef1042 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Mon, 6 Feb 2023 10:18:06 +0100 Subject: [PATCH 25/29] fixup! 
Complete making blocks and blobs db atomic --- beacon_node/store/src/hot_cold_store.rs | 26 +++++++++++-------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 201a78afd..17c5a81b8 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -123,6 +123,7 @@ pub enum HotColdDBError { request_slot: Option, state_root: Hash256, }, + Rollback, } impl HotColdDB, MemoryStore> { @@ -906,22 +907,17 @@ impl, Cold: ItemStore> HotColdDB }; // Rollback on failure if let Err(e) = tx_res { - let mut rollback_blob_ops: Vec> = Vec::with_capacity(blob_cache_ops.len()); - for blob_op in blob_cache_ops { - match blob_op { - StoreOp::PutBlobs(block_root, _) => { - rollback_blob_ops.push(StoreOp::DeleteBlobs(block_root)); - } - StoreOp::DeleteBlobs(_) => { - if let Some(blobs) = blobs_to_delete.pop() { - rollback_blob_ops - .push(StoreOp::PutBlobs(blobs.beacon_block_root, Arc::new(blobs))); - } - } - _ => (), - } + for op in blob_cache_ops.iter_mut() { + let reverse_op = match op { + StoreOp::PutBlobs(block_root, _) => StoreOp::DeleteBlobs(block_root), + StoreOp::DeleteBlobs(_) => match blobs_to_delete.pop() { + Some(blobs) => StoreOp::PutBlobs(blobs.beacon_block_root, Arc::new(blobs)), + None => return Err(HotColdDBError::Rollback.into()), + }, + _ => return Err(HotColdDBError::Rollback.into()), + }; } - blobs_db.do_atomically(self.convert_to_kv_batch(rollback_blob_ops)?)?; + blobs_db.do_atomically(self.convert_to_kv_batch(blob_cache_ops)?)?; return Err(e); } From 290e1d2ff71af5ec6a76950805a2fd83a11200c4 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Mon, 6 Feb 2023 10:28:32 +0100 Subject: [PATCH 26/29] fixup! 
Complete making blocks and blobs db atomic --- beacon_node/store/src/hot_cold_store.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 17c5a81b8..f8d143dcc 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -907,15 +907,17 @@ impl, Cold: ItemStore> HotColdDB }; // Rollback on failure if let Err(e) = tx_res { + let mut blob_cache_ops = blob_cache_ops; for op in blob_cache_ops.iter_mut() { let reverse_op = match op { - StoreOp::PutBlobs(block_root, _) => StoreOp::DeleteBlobs(block_root), + StoreOp::PutBlobs(block_root, _) => StoreOp::DeleteBlobs(*block_root), StoreOp::DeleteBlobs(_) => match blobs_to_delete.pop() { Some(blobs) => StoreOp::PutBlobs(blobs.beacon_block_root, Arc::new(blobs)), None => return Err(HotColdDBError::Rollback.into()), }, _ => return Err(HotColdDBError::Rollback.into()), }; + *op = reverse_op; } blobs_db.do_atomically(self.convert_to_kv_batch(blob_cache_ops)?)?; return Err(e); From 1300fb7ffac32fa43b0b4fee34caee47d16ba102 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Thu, 9 Feb 2023 08:20:08 +0100 Subject: [PATCH 27/29] Fix conflicts from rebasing eip4844 --- beacon_node/store/src/hot_cold_store.rs | 3 ++- beacon_node/store/src/metadata.rs | 2 -- database_manager/src/lib.rs | 2 ++ 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index f8d143dcc..0693646fc 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -1977,11 +1977,12 @@ impl, Cold: ItemStore> HotColdDB blob_info, BlobInfo { oldest_blob_slot: Some(end_slot + 1), + blobs_db: blob_info.blobs_db, }, )?; ops.push(StoreOp::KeyValueOp(update_blob_info)); - self.do_atomically(ops)?; + self.do_atomically_with_block_and_blobs_cache(ops)?; info!( self.log, "Blobs sidecar pruning complete"; diff 
--git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs index 6c3761eb8..92117254f 100644 --- a/beacon_node/store/src/metadata.rs +++ b/beacon_node/store/src/metadata.rs @@ -126,8 +126,6 @@ pub struct BlobInfo { pub oldest_blob_slot: Option, /// A separate blobs database is in use. pub blobs_db: bool, - /// The slot after which blobs are available (>=). - pub oldest_blob_slot: Option, } impl StoreItem for BlobInfo { diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs index d9b480836..7d5753434 100644 --- a/database_manager/src/lib.rs +++ b/database_manager/src/lib.rs @@ -337,10 +337,12 @@ pub fn prune_blobs( let spec = &runtime_context.eth2_config.spec; let hot_path = client_config.get_db_path(); let cold_path = client_config.get_freezer_db_path(); + let blobs_path = client_config.get_blobs_db_path(); let db = HotColdDB::, LevelDB>::open( &hot_path, &cold_path, + blobs_path, |_, _, _| Ok(()), client_config.store, spec.clone(), From 12720f9ac5bac98e4f4d12f0f02454f08aff5a1e Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Thu, 9 Feb 2023 10:37:53 +0100 Subject: [PATCH 28/29] fixup! Help user choose blobs db --- beacon_node/store/src/hot_cold_store.rs | 79 ++++++++++++++----------- 1 file changed, 45 insertions(+), 34 deletions(-) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 0693646fc..3128f6596 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -221,38 +221,52 @@ impl HotColdDB, LevelDB> { ); } + // Open separate blobs directory if configured and same configuration was used on previous + // run. 
let blob_info = db.load_blob_info()?; - let open_blobs_db = match (&blob_info, &blobs_db_path) { - (Some(blob_info), Some(_)) => { - if blob_info.blobs_db { - true - } else { - return Err(HotColdDBError::BlobsPreviouslyInDefaultStore.into()); + let new_blob_info = { + match (&blob_info, &blobs_db_path) { + (Some(blob_info), Some(_)) => { + if !blob_info.blobs_db { + return Err(HotColdDBError::BlobsPreviouslyInDefaultStore.into()); + } + BlobInfo { + oldest_blob_slot: blob_info.oldest_blob_slot, + blobs_db: true, + } } + (Some(blob_info), None) => { + if blob_info.blobs_db { + return Err(HotColdDBError::MissingPathToBlobsDatabase.into()); + } + BlobInfo { + oldest_blob_slot: blob_info.oldest_blob_slot, + blobs_db: false, + } + } + (None, Some(_)) => BlobInfo { + oldest_blob_slot: None, + blobs_db: true, + }, // first time starting up node + (None, None) => BlobInfo { + oldest_blob_slot: None, + blobs_db: false, + }, // first time starting up node } - (None, Some(_)) => true, - (Some(_), None) => return Err(HotColdDBError::MissingPathToBlobsDatabase.into()), - (None, None) => false, }; - - if let Some(path) = blobs_db_path { - let new_blob_info = if open_blobs_db { + if new_blob_info.blobs_db { + if let Some(path) = &blobs_db_path { db.blobs_db = Some(LevelDB::open(path.as_path())?); - let mut new_blob_info = blob_info.clone().unwrap_or_default(); - new_blob_info.blobs_db = true; - new_blob_info - } else { - let mut new_blob_info = blob_info.clone().unwrap_or_default(); - new_blob_info.blobs_db = false; - new_blob_info - }; - db.compare_and_set_blob_info_with_write(blob_info, Some(new_blob_info))?; - info!( - db.log, - "Blobs DB initialized"; - "path" => ?path - ); + } } + let blob_info = blob_info.unwrap_or(db.get_blob_info()); + db.compare_and_set_blob_info_with_write(blob_info, new_blob_info)?; + info!( + db.log, + "Blobs DB initialized"; + "use separate blobs db" => db.get_blob_info().blobs_db, + "path" => ?blobs_db_path + ); // Ensure that the schema version of 
the on-disk database matches the software. // If the version is mismatched, an automatic migration will be attempted. @@ -1972,14 +1986,11 @@ impl, Cold: ItemStore> HotColdDB } } let blobs_sidecars_pruned = ops.len(); - - let update_blob_info = self.compare_and_set_blob_info( - blob_info, - BlobInfo { - oldest_blob_slot: Some(end_slot + 1), - blobs_db: blob_info.blobs_db, - }, - )?; + let new_blob_info = BlobInfo { + oldest_blob_slot: Some(end_slot + 1), + blobs_db: blob_info.blobs_db, + }; + let update_blob_info = self.compare_and_set_blob_info(blob_info, new_blob_info)?; ops.push(StoreOp::KeyValueOp(update_blob_info)); self.do_atomically_with_block_and_blobs_cache(ops)?; From a68e3eac2c15189574742eddeb11d4c7ea2a92a2 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Fri, 10 Feb 2023 08:25:42 -0500 Subject: [PATCH 29/29] pr feedback --- beacon_node/execution_layer/src/lib.rs | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index d60082b91..bbf45b183 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -14,6 +14,7 @@ pub use engine_api::{http, http::deposit_methods, http::HttpJsonRpc}; use engines::{Engine, EngineError}; pub use engines::{EngineState, ForkchoiceState}; use eth2::types::{builder_bid::SignedBuilderBid, ForkVersionedResponse}; +use ethers_core::abi::ethereum_types::FromStrRadixErr; use ethers_core::types::transaction::eip2930::AccessListItem; use ethers_core::types::{Transaction as EthersTransaction, U64}; use fork_choice::ForkchoiceUpdateParameters; @@ -2042,9 +2043,9 @@ pub enum BlobTxConversionError { /// There was an error converting the transaction from JSON. SerdeJson(serde_json::Error), /// There was an error converting the transaction from hex. - FromHexError(String), - /// The `max_fee_per_data_gas` field did not contains 32 bytes. 
- InvalidDataGasBytesLen, + FromHex(String), + /// There was an error converting the transaction from hex. + FromStrRadix(FromStrRadixErr), /// A `versioned_hash` did not contain 32 bytes. InvalidVersionedHashBytesLen, } @@ -2150,19 +2151,15 @@ fn ethers_tx_to_bytes( // `ethers-rs` does not yet support SSZ and therefore the blobs transaction type. // maxFeePerDataGas - let data_gas_bytes = eth2_serde_utils::hex::decode( + let max_fee_per_data_gas = Uint256::from_str_radix( other .get("maxFeePerDataGas") .ok_or(BlobTxConversionError::MaxFeePerDataGasMissing)? .as_str() .ok_or(BlobTxConversionError::MaxFeePerDataGasMissing)?, + 16, ) - .map_err(BlobTxConversionError::FromHexError)?; - let max_fee_per_data_gas = if data_gas_bytes.len() != Uint256::ssz_fixed_len() { - Err(BlobTxConversionError::InvalidDataGasBytesLen) - } else { - Ok(Uint256::from_big_endian(&data_gas_bytes)) - }?; + .map_err(BlobTxConversionError::FromStrRadix)?; // blobVersionedHashes let blob_versioned_hashes = other @@ -2177,7 +2174,7 @@ fn ethers_tx_to_bytes( .as_str() .ok_or(BlobTxConversionError::BlobVersionedHashesMissing)?, ) - .map_err(BlobTxConversionError::FromHexError)?; + .map_err(BlobTxConversionError::FromHex)?; if hash_bytes.len() != Hash256::ssz_fixed_len() { Err(BlobTxConversionError::InvalidVersionedHashBytesLen) } else {