diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 19159d6fc..ff2992e0b 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -958,7 +958,9 @@ impl BeaconChain { block_root: &Hash256, ) -> Result>, Error> { // If there is no data availability boundary, the Eip4844 fork is disabled. - if self.finalized_data_availability_boundary().is_some() { + if let Some(finalized_data_availability_boundary) = + self.finalized_data_availability_boundary() + { // Only use the attester cache if we can find both the block and blob if let (Some(block), Some(blobs)) = ( self.early_attester_cache.get_block(*block_root), @@ -970,7 +972,9 @@ impl BeaconChain { })) // Attempt to get the block and blobs from the database } else if let Some(block) = self.get_block(block_root).await?.map(Arc::new) { - let blobs = self.get_blobs(block_root)?.map(Arc::new); + let blobs = self + .get_blobs(block_root, finalized_data_availability_boundary)? + .map(Arc::new); Ok(blobs.map(|blobs| SignedBeaconBlockAndBlobsSidecar { beacon_block: block, blobs_sidecar: blobs, @@ -1066,27 +1070,32 @@ impl BeaconChain { pub fn get_blobs( &self, block_root: &Hash256, + data_availability_boundary: Epoch, ) -> Result>, Error> { match self.store.get_blobs(block_root)? { Some(blobs) => Ok(Some(blobs)), None => { - if let Ok(Some(block)) = self.get_blinded_block(block_root) { - let expected_kzg_commitments = block.message().body().blob_kzg_commitments()?; - - if !expected_kzg_commitments.is_empty() { - Err(Error::DBInconsistent(format!( - "Expected kzg commitments but no blobs stored for block root {}", - block_root - ))) - } else { - Ok(Some(BlobsSidecar::empty_from_parts( - *block_root, - block.slot(), - ))) - } - } else { - Ok(None) - } + // Check for the corresponding block to understand whether we *should* have blobs. + self.get_blinded_block(block_root)? + .map(|block| { + // If there are no KZG commitments in the block, we know the sidecar should + // be empty. + let expected_kzg_commitments = + match block.message().body().blob_kzg_commitments() { + Ok(kzg_commitments) => kzg_commitments, + Err(_) => return Err(Error::NoKzgCommitmentsFieldOnBlock), + }; + if expected_kzg_commitments.is_empty() { + Ok(BlobsSidecar::empty_from_parts(*block_root, block.slot())) + } else if data_availability_boundary <= block.epoch() { + // We should have blobs for all blocks younger than the boundary. + Err(Error::BlobsUnavailable) + } else { + // We shouldn't have blobs for blocks older than the boundary. + Err(Error::BlobsOlderThanDataAvailabilityBoundary(block.epoch())) + } + }) + .transpose() } } } @@ -3033,10 +3042,9 @@ impl BeaconChain { } } } - let txn_lock = self.store.hot_db.begin_rw_transaction(); - if let Err(e) = self.store.do_atomically(ops) { + if let Err(e) = self.store.do_atomically_with_block_and_blobs_cache(ops) { error!( self.log, "Database write failed!"; @@ -4641,7 +4649,7 @@ impl BeaconChain { debug!( self.log, "Produced block on state"; - "block_size" => %block_size, + "block_size" => block_size, ); metrics::observe(&metrics::BLOCK_SIZE, block_size as f64); diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 8ae216d49..cd178317f 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -1380,7 +1380,9 @@ impl ExecutionPendingBlock { StoreOp::PutStateTemporaryFlag(state_root), ] }; - chain.store.do_atomically(state_batch)?; + chain + .store + .do_atomically_with_block_and_blobs_cache(state_batch)?; drop(txn_lock); confirmed_state_roots.push(state_root); diff --git a/beacon_node/beacon_chain/src/validator_pubkey_cache.rs b/beacon_node/beacon_chain/src/validator_pubkey_cache.rs index ccbfd027b..c36b9059b 100644 --- a/beacon_node/beacon_chain/src/validator_pubkey_cache.rs +++ b/beacon_node/beacon_chain/src/validator_pubkey_cache.rs @@ -38,7 +38,7 @@ impl ValidatorPubkeyCache { }; let store_ops = cache.import_new_pubkeys(state)?; - store.do_atomically(store_ops)?; + store.do_atomically_with_block_and_blobs_cache(store_ops)?; Ok(cache) } @@ -299,7 +299,7 @@ mod test { let ops = cache .import_new_pubkeys(&state) .expect("should import pubkeys"); - store.do_atomically(ops).unwrap(); + store.do_atomically_with_block_and_blobs_cache(ops).unwrap(); check_cache_get(&cache, &keypairs[..]); drop(cache); diff --git a/beacon_node/beacon_chain/tests/op_verification.rs b/beacon_node/beacon_chain/tests/op_verification.rs index a7e5b90f3..ef62e292b 100644 --- a/beacon_node/beacon_chain/tests/op_verification.rs +++ b/beacon_node/beacon_chain/tests/op_verification.rs @@ -32,8 +32,16 @@ fn get_store(db_path: &TempDir) -> Arc { let cold_path = db_path.path().join("cold_db"); let config = StoreConfig::default(); let log = NullLoggerBuilder.build().expect("logger should build"); - HotColdDB::open(&hot_path, &cold_path, |_, _, _| Ok(()), config, spec, log) - .expect("disk store should initialize") + HotColdDB::open( + &hot_path, + &cold_path, + None, + |_, _, _| Ok(()), + config, + spec, + log, + ) + .expect("disk store should initialize") } fn get_harness(store: Arc, validator_count: usize) -> TestHarness { diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 6345c9bf9..5b55c99d9 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -60,8 +60,16 @@ fn get_store_with_spec( let config = StoreConfig::default(); let log = test_logger(); - HotColdDB::open(&hot_path, &cold_path, |_, _, _| Ok(()), config, spec, log) - .expect("disk store should initialize") + HotColdDB::open( + &hot_path, + &cold_path, + None, + |_, _, _| Ok(()), + config, + spec, + log, + ) + .expect("disk store should initialize") } fn get_harness( diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 23c4977b7..e80b6fd18 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -68,6 +68,7 @@ pub struct ClientBuilder { gossipsub_registry: Option, db_path: Option, freezer_db_path: Option, + blobs_db_path: Option, http_api_config: http_api::Config, http_metrics_config: http_metrics::Config, slasher: Option>>, @@ -100,6 +101,7 @@ where gossipsub_registry: None, db_path: None, freezer_db_path: None, + blobs_db_path: None, http_api_config: <_>::default(), http_metrics_config: <_>::default(), slasher: None, @@ -892,6 +894,7 @@ where mut self, hot_path: &Path, cold_path: &Path, + blobs_path: Option, config: StoreConfig, log: Logger, ) -> Result { @@ -907,6 +910,7 @@ where self.db_path = Some(hot_path.into()); self.freezer_db_path = Some(cold_path.into()); + self.blobs_db_path = blobs_path.clone(); let inner_spec = spec.clone(); let deposit_contract_deploy_block = context @@ -929,6 +933,7 @@ where let store = HotColdDB::open( hot_path, cold_path, + blobs_path, schema_upgrade, config, spec, diff --git a/beacon_node/client/src/config.rs b/beacon_node/client/src/config.rs index 6c3a98a46..10eeb3a48 100644 --- a/beacon_node/client/src/config.rs +++ b/beacon_node/client/src/config.rs @@ -49,6 +49,13 @@ pub struct Config { pub db_name: String, /// Path where the freezer database will be located. pub freezer_db_path: Option, + /// Path where the blobs database will be located if blobs should be in a separate database. + /// + /// The capacity this location should hold varies with the data availability boundary. It + /// should be able to store < 69 GB when [MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS](types::consts::eip4844::MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS) is 4096 + /// epochs of 32 slots (up to 131072 bytes data per blob and up to 4 blobs per block, 88 bytes + /// of [BlobsSidecar](types::BlobsSidecar) metadata per block). + pub blobs_db_path: Option, pub log_file: PathBuf, /// If true, the node will use co-ordinated junk for eth1 values. /// @@ -89,6 +96,7 @@ impl Default for Config { data_dir: PathBuf::from(DEFAULT_ROOT_DIR), db_name: "chain_db".to_string(), freezer_db_path: None, + blobs_db_path: None, log_file: PathBuf::from(""), genesis: <_>::default(), store: <_>::default(), @@ -149,11 +157,27 @@ impl Config { .unwrap_or_else(|| self.default_freezer_db_path()) } + /// Returns the path to which the client may initialize the on-disk blobs database. + /// + /// Will attempt to use the user-supplied path from e.g. the CLI, or will default + /// to None. + pub fn get_blobs_db_path(&self) -> Option { + self.blobs_db_path.clone() + } + /// Get the freezer DB path, creating it if necessary. pub fn create_freezer_db_path(&self) -> Result { ensure_dir_exists(self.get_freezer_db_path()) } + /// Get the blobs DB path, creating it if necessary. + pub fn create_blobs_db_path(&self) -> Result, String> { + match self.get_blobs_db_path() { + Some(blobs_db_path) => Ok(Some(ensure_dir_exists(blobs_db_path)?)), + None => Ok(None), + } + } + /// Returns the "modern" path to the data_dir. /// /// See `Self::get_data_dir` documentation for more info. diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index 480453552..2018059cb 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -14,7 +14,9 @@ pub use engine_api::{http, http::deposit_methods, http::HttpJsonRpc}; use engines::{Engine, EngineError}; pub use engines::{EngineState, ForkchoiceState}; use eth2::types::{builder_bid::SignedBuilderBid, ForkVersionedResponse}; -use ethers_core::types::Transaction as EthersTransaction; +use ethers_core::abi::ethereum_types::FromStrRadixErr; +use ethers_core::types::transaction::eip2930::AccessListItem; +use ethers_core::types::{Transaction as EthersTransaction, U64}; use fork_choice::ForkchoiceUpdateParameters; use lru::LruCache; use payload_status::process_payload_status; @@ -39,12 +41,12 @@ use tokio::{ }; use tokio_stream::wrappers::WatchStream; use types::consts::eip4844::BLOB_TX_TYPE; -use types::transaction::{AccessTuple, BlobTransaction}; +use types::transaction::{AccessTuple, BlobTransaction, EcdsaSignature, SignedBlobTransaction}; use types::{ blobs_sidecar::{Blobs, KzgCommitments}, ExecutionPayload, ExecutionPayloadCapella, ExecutionPayloadEip4844, ExecutionPayloadMerge, }; -use types::{AbstractExecPayload, BeaconStateError, ExecPayload}; +use types::{AbstractExecPayload, BeaconStateError, ExecPayload, VersionedHash}; use types::{ BlindedPayload, BlockType, ChainSpec, Epoch, ExecutionBlockHash, ForkName, ProposerPreparationData, PublicKeyBytes, Signature, SignedBeaconBlock, Slot, Transaction, @@ -137,16 +139,6 @@ pub enum BlockProposalContents> { }, } -pub struct BlockProposalBlobsContents { - pub kzg_commitments: KzgCommitments, - pub blobs: Blobs, -} - -pub struct BlockProposalContentsDeconstructed> { - pub payload: Payload, - pub blobs_content: Option>, -} - impl> BlockProposalContents { pub fn deconstruct(self) -> (Payload, Option>, Option>) { match self { @@ -1692,13 +1684,15 @@ impl ExecutionLayer { return Ok(None); }; - let transactions = VariableList::from( - block - .transactions() - .iter() - .map(ethers_tx_to_bytes::) - .collect::, BlobTxConversionError>>()?, - ); + let convert_transactions = |transactions: Vec| { + VariableList::new( + transactions + .into_iter() + .map(ethers_tx_to_bytes::) + .collect::, BlobTxConversionError>>()?, + ) + .map_err(BlobTxConversionError::SszError) + }; let payload = match block { ExecutionBlockWithTransactions::Merge(merge_block) => { @@ -1716,7 +1710,7 @@ impl ExecutionLayer { extra_data: merge_block.extra_data, base_fee_per_gas: merge_block.base_fee_per_gas, block_hash: merge_block.block_hash, - transactions, + transactions: convert_transactions(merge_block.transactions)?, }) } ExecutionBlockWithTransactions::Capella(capella_block) => { @@ -1742,7 +1736,7 @@ impl ExecutionLayer { extra_data: capella_block.extra_data, base_fee_per_gas: capella_block.base_fee_per_gas, block_hash: capella_block.block_hash, - transactions, + transactions: convert_transactions(capella_block.transactions)?, withdrawals, }) } @@ -1770,7 +1764,7 @@ impl ExecutionLayer { base_fee_per_gas: eip4844_block.base_fee_per_gas, excess_data_gas: eip4844_block.excess_data_gas, block_hash: eip4844_block.block_hash, - transactions, + transactions: convert_transactions(eip4844_block.transactions)?, withdrawals, }) } @@ -2035,10 +2029,18 @@ pub enum BlobTxConversionError { MaxFeePerDataGasMissing, /// Missing the `blob_versioned_hashes` field. BlobVersionedHashesMissing, + /// `y_parity` field was greater than one. + InvalidYParity, /// There was an error converting the transaction to SSZ. SszError(ssz_types::Error), /// There was an error converting the transaction from JSON. SerdeJson(serde_json::Error), + /// There was an error converting the transaction from hex. + FromHex(String), + /// There was an error converting the transaction from hex. + FromStrRadix(FromStrRadixErr), + /// A `versioned_hash` did not contain 32 bytes. + InvalidVersionedHashBytesLen, } impl From for BlobTxConversionError { @@ -2057,67 +2059,123 @@ impl From for BlobTxConversionError { /// on transaction type. That means RLP encoding if this is a transaction other than a /// `BLOB_TX_TYPE` transaction in which case, SSZ encoding will be used. fn ethers_tx_to_bytes( - transaction: &EthersTransaction, + transaction: EthersTransaction, ) -> Result, BlobTxConversionError> { let tx_type = transaction .transaction_type .ok_or(BlobTxConversionError::NoTransactionType)? .as_u64(); + let tx = if BLOB_TX_TYPE as u64 == tx_type { - let chain_id = transaction - .chain_id - .ok_or(BlobTxConversionError::NoChainId)?; - let nonce = if transaction.nonce > Uint256::from(u64::MAX) { + let EthersTransaction { + hash: _, + nonce, + block_hash: _, + block_number: _, + transaction_index: _, + from: _, + to, + value, + gas_price: _, + gas, + input, + v, + r, + s, + transaction_type: _, + access_list, + max_priority_fee_per_gas, + max_fee_per_gas, + chain_id, + other, + } = transaction; + + // ******************** BlobTransaction fields ******************** + + // chainId + let chain_id = chain_id.ok_or(BlobTxConversionError::NoChainId)?; + + // nonce + let nonce = if nonce > Uint256::from(u64::MAX) { return Err(BlobTxConversionError::NonceTooLarge); } else { - transaction.nonce.as_u64() + nonce.as_u64() }; - let max_priority_fee_per_gas = transaction - .max_priority_fee_per_gas - .ok_or(BlobTxConversionError::MaxPriorityFeePerGasMissing)?; - let max_fee_per_gas = transaction - .max_fee_per_gas - .ok_or(BlobTxConversionError::MaxFeePerGasMissing)?; - let gas = if transaction.gas > Uint256::from(u64::MAX) { + + // maxPriorityFeePerGas + let max_priority_fee_per_gas = + max_priority_fee_per_gas.ok_or(BlobTxConversionError::MaxPriorityFeePerGasMissing)?; + + // maxFeePerGas + let max_fee_per_gas = max_fee_per_gas.ok_or(BlobTxConversionError::MaxFeePerGasMissing)?; + + // gas + let gas = if gas > Uint256::from(u64::MAX) { return Err(BlobTxConversionError::GasTooHigh); } else { - transaction.gas.as_u64() + gas.as_u64() }; - let to = transaction.to; - let value = transaction.value; - let data = VariableList::new(transaction.input.to_vec())?; + + // data (a.k.a input) + let data = VariableList::new(input.to_vec())?; + + // accessList let access_list = VariableList::new( - transaction - .access_list - .as_ref() + access_list .ok_or(BlobTxConversionError::AccessListMissing)? .0 - .iter() + .into_iter() .map(|access_tuple| { + let AccessListItem { + address, + storage_keys, + } = access_tuple; Ok(AccessTuple { - address: access_tuple.address, - storage_keys: VariableList::new(access_tuple.storage_keys.clone())?, + address, + storage_keys: VariableList::new(storage_keys)?, }) }) .collect::, BlobTxConversionError>>()?, )?; - let max_fee_per_data_gas = transaction - .other - .get("maxFeePerDataGas") - .ok_or(BlobTxConversionError::MaxFeePerDataGasMissing)? - .as_str() - .ok_or(BlobTxConversionError::MaxFeePerDataGasMissing)? - .parse() - .map_err(|_| BlobTxConversionError::MaxFeePerDataGasMissing)?; - let blob_versioned_hashes = serde_json::from_str( - transaction - .other - .get("blobVersionedHashes") - .ok_or(BlobTxConversionError::BlobVersionedHashesMissing)? + + // ******************** BlobTransaction `other` fields ******************** + // + // Here we use the `other` field in the `ethers-rs` `Transaction` type because + // `ethers-rs` does not yet support SSZ and therefore the blobs transaction type. + + // maxFeePerDataGas + let max_fee_per_data_gas = Uint256::from_str_radix( + other + .get("maxFeePerDataGas") + .ok_or(BlobTxConversionError::MaxFeePerDataGasMissing)? .as_str() - .ok_or(BlobTxConversionError::BlobVersionedHashesMissing)?, - )?; - BlobTransaction { + .ok_or(BlobTxConversionError::MaxFeePerDataGasMissing)?, + 16, + ) + .map_err(BlobTxConversionError::FromStrRadix)?; + + // blobVersionedHashes + let blob_versioned_hashes = other + .get("blobVersionedHashes") + .ok_or(BlobTxConversionError::BlobVersionedHashesMissing)? + .as_array() + .ok_or(BlobTxConversionError::BlobVersionedHashesMissing)? + .iter() + .map(|versioned_hash| { + let hash_bytes = eth2_serde_utils::hex::decode( + versioned_hash + .as_str() + .ok_or(BlobTxConversionError::BlobVersionedHashesMissing)?, + ) + .map_err(BlobTxConversionError::FromHex)?; + if hash_bytes.len() != Hash256::ssz_fixed_len() { + Err(BlobTxConversionError::InvalidVersionedHashBytesLen) + } else { + Ok(Hash256::from_slice(&hash_bytes)) + } + }) + .collect::, BlobTxConversionError>>()?; + let message = BlobTransaction { chain_id, nonce, max_priority_fee_per_gas, @@ -2128,9 +2186,24 @@ fn ethers_tx_to_bytes( data, access_list, max_fee_per_data_gas, - blob_versioned_hashes, - } - .as_ssz_bytes() + blob_versioned_hashes: VariableList::new(blob_versioned_hashes)?, + }; + + // ******************** EcdsaSignature fields ******************** + + let y_parity = if v == U64::zero() { + false + } else if v == U64::one() { + true + } else { + return Err(BlobTxConversionError::InvalidYParity); + }; + let signature = EcdsaSignature { y_parity, r, s }; + + // The `BLOB_TX_TYPE` should prepend the SSZ encoded `SignedBlobTransaction`. + let mut signed_tx = SignedBlobTransaction { message, signature }.as_ssz_bytes(); + signed_tx.insert(0, BLOB_TX_TYPE); + signed_tx } else { transaction.rlp().to_vec() }; diff --git a/beacon_node/http_api/src/block_id.rs b/beacon_node/http_api/src/block_id.rs index 45c7bed1f..b484f4079 100644 --- a/beacon_node/http_api/src/block_id.rs +++ b/beacon_node/http_api/src/block_id.rs @@ -218,13 +218,16 @@ impl BlockId { chain: &BeaconChain, ) -> Result>, warp::Rejection> { let root = self.root(chain)?.0; - match chain.store.get_blobs(&root) { + let Some(data_availability_boundary) = chain.data_availability_boundary() else { + return Err(warp_utils::reject::custom_not_found("Eip4844 fork disabled".into())); + }; + match chain.get_blobs(&root, data_availability_boundary) { Ok(Some(blob)) => Ok(Arc::new(blob)), Ok(None) => Err(warp_utils::reject::custom_not_found(format!( "Blob with block root {} is not in the store", root ))), - Err(e) => Err(warp_utils::reject::beacon_chain_error(e.into())), + Err(e) => Err(warp_utils::reject::beacon_chain_error(e)), } } } diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index c00e8914d..ea48251d6 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -408,15 +408,12 @@ impl ProtocolId { /// Returns `true` if the given `ProtocolId` should expect `context_bytes` in the /// beginning of the stream, else returns `false`. pub fn has_context_bytes(&self) -> bool { - match self.version { - Version::V2 => matches!( - self.message_name, - Protocol::BlocksByRange | Protocol::BlocksByRoot - ), - Version::V1 => matches!( - self.message_name, - Protocol::BlobsByRange | Protocol::BlobsByRoot - ), + match self.message_name { + Protocol::BlocksByRange | Protocol::BlocksByRoot => { + !matches!(self.version, Version::V1) + } + Protocol::BlobsByRange | Protocol::BlobsByRoot | Protocol::LightClientBootstrap => true, + Protocol::Goodbye | Protocol::Ping | Protocol::Status | Protocol::MetaData => false, } } } diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 38f558a4e..01b7cb43b 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -799,7 +799,7 @@ impl Worker { let mut send_response = true; for root in block_roots { - match self.chain.get_blobs(&root) { + match self.chain.get_blobs(&root, data_availability_boundary) { Ok(Some(blobs)) => { blobs_sent += 1; self.send_network_message(NetworkMessage::SendResponse { diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 72c2db921..ef8f872cb 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -560,14 +560,16 @@ impl SyncNetworkContext { /// Check whether a batch for this epoch (and only this epoch) should request just blocks or /// blocks and blobs. - pub fn batch_type(&self, _epoch: types::Epoch) -> ByRangeRequestType { - if super::backfill_sync::BACKFILL_EPOCHS_PER_BATCH * super::range_sync::EPOCHS_PER_BATCH - != 1 - { - panic!( - "To deal with alignment with 4844 boundaries, batches need to be of just one epoch" - ); - } + #[allow(unused)] + pub fn batch_type(&self, epoch: types::Epoch) -> ByRangeRequestType { + // Induces a compile time panic if this doesn't hold true. + #[allow(clippy::assertions_on_constants)] + const _: () = assert!( + super::backfill_sync::BACKFILL_EPOCHS_PER_BATCH == 1 + && super::range_sync::EPOCHS_PER_BATCH == 1, + "To deal with alignment with 4844 boundaries, batches need to be of just one epoch" + ); + #[cfg(test)] { // Keep tests only for blocks. @@ -576,7 +578,7 @@ impl SyncNetworkContext { #[cfg(not(test))] { if let Some(data_availability_boundary) = self.chain.data_availability_boundary() { - if _epoch >= data_availability_boundary { + if epoch >= data_availability_boundary { ByRangeRequestType::BlocksAndBlobs } else { ByRangeRequestType::Blocks diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index e711dfca9..eb6754aa9 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -28,6 +28,13 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .help("Data directory for the freezer database.") .takes_value(true) ) + .arg( + Arg::with_name("blobs-dir") + .long("blobs-dir") + .value_name("DIR") + .help("Data directory for the blobs database.") + .takes_value(true) + ) /* * Network parameters. */ diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 7ced91274..e8128cb79 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -390,6 +390,10 @@ pub fn get_config( client_config.freezer_db_path = Some(PathBuf::from(freezer_dir)); } + if let Some(blobs_db_dir) = cli_args.value_of("blobs-dir") { + client_config.blobs_db_path = Some(PathBuf::from(blobs_db_dir)); + } + let (sprp, sprp_explicit) = get_slots_per_restore_point::(cli_args)?; client_config.store.slots_per_restore_point = sprp; client_config.store.slots_per_restore_point_set_explicitly = sprp_explicit; diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs index 650763dca..b098f57c7 100644 --- a/beacon_node/src/lib.rs +++ b/beacon_node/src/lib.rs @@ -64,6 +64,7 @@ impl ProductionBeaconNode { let _datadir = client_config.create_data_dir()?; let db_path = client_config.create_db_path()?; let freezer_db_path = client_config.create_freezer_db_path()?; + let blobs_db_path = client_config.create_blobs_db_path()?; let executor = context.executor.clone(); if let Some(legacy_dir) = client_config.get_existing_legacy_data_dir() { @@ -84,7 +85,13 @@ impl ProductionBeaconNode { .runtime_context(context) .chain_spec(spec) .http_api_config(client_config.http_api.clone()) - .disk_store(&db_path, &freezer_db_path, store_config, log.clone())?; + .disk_store( + &db_path, + &freezer_db_path, + blobs_db_path, + store_config, + log.clone(), + )?; let builder = if let Some(slasher_config) = client_config.slasher.clone() { let slasher = Arc::new( diff --git a/beacon_node/store/src/garbage_collection.rs b/beacon_node/store/src/garbage_collection.rs index 329133632..c70ef8986 100644 --- a/beacon_node/store/src/garbage_collection.rs +++ b/beacon_node/store/src/garbage_collection.rs @@ -31,7 +31,7 @@ where "Garbage collecting {} temporary states", delete_ops.len() / 2 ); - self.do_atomically(delete_ops)?; + self.do_atomically_with_block_and_blobs_cache(delete_ops)?; } Ok(()) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 29025b818..fba0acad0 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -35,7 +35,7 @@ use state_processing::{ use std::cmp::min; use std::convert::TryInto; use std::marker::PhantomData; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; use types::consts::eip4844::MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS; @@ -59,6 +59,8 @@ pub struct HotColdDB, Cold: ItemStore> { pub(crate) config: StoreConfig, /// Cold database containing compact historical data. pub cold_db: Cold, + /// Database containing blobs. If None, store falls back to use `cold_db`. + pub blobs_db: Option, /// Hot database containing duplicated but quick-to-access recent data. /// /// The hot database also contains all blocks. @@ -98,6 +100,8 @@ pub enum HotColdDBError { MissingExecutionPayload(Hash256), MissingFullBlockExecutionPayloadPruned(Hash256, Slot), MissingAnchorInfo, + MissingPathToBlobsDatabase, + BlobsPreviouslyInDefaultStore, HotStateSummaryError(BeaconStateError), RestorePointDecodeError(ssz::DecodeError), BlockReplayBeaconError(BeaconStateError), @@ -119,6 +123,7 @@ pub enum HotColdDBError { request_slot: Option, state_root: Hash256, }, + Rollback, } impl HotColdDB, MemoryStore> { @@ -134,6 +139,7 @@ impl HotColdDB, MemoryStore> { anchor_info: RwLock::new(None), blob_info: RwLock::new(BlobInfo::default()), cold_db: MemoryStore::open(), + blobs_db: Some(MemoryStore::open()), hot_db: MemoryStore::open(), block_cache: Mutex::new(LruCache::new(config.block_cache_size)), blob_cache: Mutex::new(LruCache::new(config.blob_cache_size)), @@ -157,6 +163,7 @@ impl HotColdDB, LevelDB> { pub fn open( hot_path: &Path, cold_path: &Path, + blobs_db_path: Option, migrate_schema: impl FnOnce(Arc, SchemaVersion, SchemaVersion) -> Result<(), Error>, config: StoreConfig, spec: ChainSpec, @@ -169,6 +176,7 @@ impl HotColdDB, LevelDB> { anchor_info: RwLock::new(None), blob_info: RwLock::new(BlobInfo::default()), cold_db: LevelDB::open(cold_path)?, + blobs_db: None, hot_db: LevelDB::open(hot_path)?, block_cache: Mutex::new(LruCache::new(config.block_cache_size)), blob_cache: Mutex::new(LruCache::new(config.blob_cache_size)), @@ -213,6 +221,53 @@ impl HotColdDB, LevelDB> { ); } + // Open separate blobs directory if configured and same configuration was used on previous + // run. + let blob_info = db.load_blob_info()?; + let new_blob_info = { + match (&blob_info, &blobs_db_path) { + (Some(blob_info), Some(_)) => { + if !blob_info.blobs_db { + return Err(HotColdDBError::BlobsPreviouslyInDefaultStore.into()); + } + BlobInfo { + oldest_blob_slot: blob_info.oldest_blob_slot, + blobs_db: true, + } + } + (Some(blob_info), None) => { + if blob_info.blobs_db { + return Err(HotColdDBError::MissingPathToBlobsDatabase.into()); + } + BlobInfo { + oldest_blob_slot: blob_info.oldest_blob_slot, + blobs_db: false, + } + } + (None, Some(_)) => BlobInfo { + oldest_blob_slot: None, + blobs_db: true, + }, // first time starting up node + (None, None) => BlobInfo { + oldest_blob_slot: None, + blobs_db: false, + }, // first time starting up node + } + }; + if new_blob_info.blobs_db { + if let Some(path) = &blobs_db_path { + db.blobs_db = Some(LevelDB::open(path.as_path())?); + } + } + let blob_info = blob_info.unwrap_or(db.get_blob_info()); + db.compare_and_set_blob_info_with_write(blob_info, new_blob_info)?; + info!( + db.log, + "Blobs DB initialized"; + "use separate blobs db" => db.get_blob_info().blobs_db, + "path" => ?blobs_db_path + ); + // Ensure that the schema version of the on-disk database matches the software. // If the version is mismatched, an automatic migration will be attempted. let db = Arc::new(db); @@ -508,11 +563,14 @@ impl, Cold: ItemStore> HotColdDB self.hot_db .key_delete(DBColumn::BeaconBlock.into(), block_root.as_bytes())?; self.hot_db - .key_delete(DBColumn::ExecPayload.into(), block_root.as_bytes()) + .key_delete(DBColumn::ExecPayload.into(), block_root.as_bytes())?; + let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db); + blobs_db.key_delete(DBColumn::BeaconBlob.into(), block_root.as_bytes()) } pub fn put_blobs(&self, block_root: &Hash256, blobs: BlobsSidecar) -> Result<(), Error> { - self.hot_db.put_bytes( + let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db); + blobs_db.put_bytes( DBColumn::BeaconBlob.into(), block_root.as_bytes(), &blobs.as_ssz_bytes(), @@ -521,21 +579,6 @@ impl, Cold: ItemStore> HotColdDB Ok(()) } - pub fn get_blobs(&self, block_root: &Hash256) -> Result>, Error> { - // FIXME(sean) I was attempting to use a blob cache here but was getting deadlocks, - // may want to attempt to use one again - if let Some(bytes) = self - .hot_db - .get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())? - { - let ret = BlobsSidecar::from_ssz_bytes(&bytes)?; - self.blob_cache.lock().put(*block_root, ret.clone()); - Ok(Some(ret)) - } else { - Ok(None) - } - } - pub fn blobs_as_kv_store_ops( &self, key: &Hash256, @@ -832,21 +875,75 @@ impl, Cold: ItemStore> HotColdDB Ok(key_value_batch) } - pub fn do_atomically(&self, batch: Vec>) -> Result<(), Error> { - // Update the block cache whilst holding a lock, to ensure that the cache updates atomically - // with the database. + pub fn do_atomically_with_block_and_blobs_cache( + &self, + batch: Vec>, + ) -> Result<(), Error> { + let mut blobs_to_delete = Vec::new(); + let (blobs_ops, hot_db_ops): (Vec>, Vec>) = + batch.into_iter().partition(|store_op| match store_op { + StoreOp::PutBlobs(_, _) => true, + StoreOp::DeleteBlobs(block_root) => { + match self.get_blobs(block_root) { + Ok(Some(blobs_sidecar)) => { + blobs_to_delete.push(blobs_sidecar); + } + Err(e) => { + error!( + self.log, "Error getting blobs"; + "block_root" => %block_root, + "error" => ?e + ); + } + _ => (), + } + true + } + StoreOp::PutBlock(_, _) | StoreOp::DeleteBlock(_) => false, + _ => false, + }); + + // Update database whilst holding a lock on cache, to ensure that the cache updates + // atomically with the database. let mut guard = self.block_cache.lock(); let mut guard_blob = self.blob_cache.lock(); - for op in &batch { + let blob_cache_ops = blobs_ops.clone(); + let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db); + // Try to execute blobs store ops. + blobs_db.do_atomically(self.convert_to_kv_batch(blobs_ops)?)?; + + let hot_db_cache_ops = hot_db_ops.clone(); + // Try to execute hot db store ops. + let tx_res = match self.convert_to_kv_batch(hot_db_ops) { + Ok(kv_store_ops) => self.hot_db.do_atomically(kv_store_ops), + Err(e) => Err(e), + }; + // Rollback on failure + if let Err(e) = tx_res { + let mut blob_cache_ops = blob_cache_ops; + for op in blob_cache_ops.iter_mut() { + let reverse_op = match op { + StoreOp::PutBlobs(block_root, _) => StoreOp::DeleteBlobs(*block_root), + StoreOp::DeleteBlobs(_) => match blobs_to_delete.pop() { + Some(blobs) => StoreOp::PutBlobs(blobs.beacon_block_root, Arc::new(blobs)), + None => return Err(HotColdDBError::Rollback.into()), + }, + _ => return Err(HotColdDBError::Rollback.into()), + }; + *op = reverse_op; + } + blobs_db.do_atomically(self.convert_to_kv_batch(blob_cache_ops)?)?; + return Err(e); + } + + for op in hot_db_cache_ops { match op { StoreOp::PutBlock(block_root, block) => { - guard.put(*block_root, (**block).clone()); + guard.put(block_root, (*block).clone()); } - StoreOp::PutBlobs(block_root, blobs) => { - guard_blob.put(*block_root, (**blobs).clone()); - } + StoreOp::PutBlobs(_, _) => (), StoreOp::PutState(_, _) => (), @@ -857,12 +954,10 @@ impl, Cold: ItemStore> HotColdDB StoreOp::DeleteStateTemporaryFlag(_) => (), StoreOp::DeleteBlock(block_root) => { - guard.pop(block_root); + guard.pop(&block_root); } - StoreOp::DeleteBlobs(block_root) => { - guard_blob.pop(block_root); - } + StoreOp::DeleteBlobs(_) => (), StoreOp::DeleteState(_, _) => (), @@ -874,8 +969,20 @@ impl, Cold: ItemStore> HotColdDB } } - self.hot_db - .do_atomically(self.convert_to_kv_batch(batch)?)?; + for op in blob_cache_ops { + match op { + StoreOp::PutBlobs(block_root, blobs) => { + guard_blob.put(block_root, (*blobs).clone()); + } + + StoreOp::DeleteBlobs(block_root) => { + guard_blob.pop(&block_root); + } + + _ => (), + } + } + drop(guard); drop(guard_blob); @@ -1212,6 +1319,22 @@ impl, Cold: ItemStore> HotColdDB }) } + /// Fetch a blobs sidecar from the store. + pub fn get_blobs(&self, block_root: &Hash256) -> Result>, Error> { + let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db); + + match blobs_db.get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())? { + Some(ref blobs_bytes) => { + let blobs = BlobsSidecar::from_ssz_bytes(blobs_bytes)?; + // FIXME(sean) I was attempting to use a blob cache here but was getting deadlocks, + // may want to attempt to use one again + self.blob_cache.lock().put(*block_root, blobs.clone()); + Ok(Some(blobs)) + } + None => Ok(None), + } + } + /// Get a reference to the `ChainSpec` used by the database. pub fn get_chain_spec(&self) -> &ChainSpec { &self.spec @@ -1713,7 +1836,7 @@ impl, Cold: ItemStore> HotColdDB } } let payloads_pruned = ops.len(); - self.do_atomically(ops)?; + self.do_atomically_with_block_and_blobs_cache(ops)?; info!( self.log, "Execution payload pruning complete"; @@ -1862,16 +1985,14 @@ impl, Cold: ItemStore> HotColdDB } } let blobs_sidecars_pruned = ops.len(); - - let update_blob_info = self.compare_and_set_blob_info( - blob_info, - BlobInfo { - oldest_blob_slot: Some(end_slot + 1), - }, - )?; + let new_blob_info = BlobInfo { + oldest_blob_slot: Some(end_slot + 1), + blobs_db: blob_info.blobs_db, + }; + let update_blob_info = self.compare_and_set_blob_info(blob_info, new_blob_info)?; ops.push(StoreOp::KeyValueOp(update_blob_info)); - self.do_atomically(ops)?; + self.do_atomically_with_block_and_blobs_cache(ops)?; info!( self.log, "Blobs sidecar pruning complete"; @@ -2011,7 +2132,7 @@ pub fn migrate_database, Cold: ItemStore>( } // Delete the states from the hot database if we got this far. - store.do_atomically(hot_db_ops)?; + store.do_atomically_with_block_and_blobs_cache(hot_db_ops)?; debug!( store.log, diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 1d7e92b80..3056c2929 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -101,6 +101,7 @@ pub fn get_key_for_col(column: &str, key: &[u8]) -> Vec { } #[must_use] +#[derive(Clone)] pub enum KeyValueStoreOp { PutKeyValue(Vec, Vec), DeleteKey(Vec), @@ -154,6 +155,7 @@ pub trait ItemStore: KeyValueStore + Sync + Send + Sized + 'stati /// Reified key-value storage operation. Helps in modifying the storage atomically. /// See also https://github.com/sigp/lighthouse/issues/692 +#[derive(Clone)] pub enum StoreOp<'a, E: EthSpec> { PutBlock(Hash256, Arc>), PutState(Hash256, &'a BeaconState), diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs index b5de0048f..92117254f 100644 --- a/beacon_node/store/src/metadata.rs +++ b/beacon_node/store/src/metadata.rs @@ -124,6 +124,8 @@ impl StoreItem for AnchorInfo { pub struct BlobInfo { /// The slot after which blobs are available (>=). pub oldest_blob_slot: Option, + /// A separate blobs database is in use. + pub blobs_db: bool, } impl StoreItem for BlobInfo { diff --git a/consensus/types/src/transaction.rs b/consensus/types/src/transaction.rs index 797ee1dae..ee0af981b 100644 --- a/consensus/types/src/transaction.rs +++ b/consensus/types/src/transaction.rs @@ -9,6 +9,12 @@ pub type MaxAccessListSize = U16777216; pub type MaxVersionedHashesListSize = U16777216; pub type MaxAccessListStorageKeys = U16777216; +#[derive(Debug, Clone, PartialEq, Encode, Decode)] +pub struct SignedBlobTransaction { + pub message: BlobTransaction, + pub signature: EcdsaSignature, +} + #[derive(Debug, Clone, PartialEq, Encode, Decode)] pub struct BlobTransaction { pub chain_id: Uint256, @@ -29,3 +35,10 @@ pub struct AccessTuple { pub address: Address, pub storage_keys: VariableList, } + +#[derive(Debug, Clone, PartialEq, Encode, Decode)] +pub struct EcdsaSignature { + pub y_parity: bool, + pub r: Uint256, + pub s: Uint256, +} diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs index a33e6c149..7d5753434 100644 --- a/database_manager/src/lib.rs +++ b/database_manager/src/lib.rs @@ -104,6 +104,13 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .takes_value(true) .default_value("0"), ) + .arg( + Arg::with_name("blobs-dir") + .long("blobs-dir") + .value_name("DIR") + .help("Data directory for the blobs database.") + .takes_value(true), + ) .subcommand(migrate_cli_app()) .subcommand(version_cli_app()) .subcommand(inspect_cli_app()) @@ -123,6 +130,10 @@ fn parse_client_config( client_config.freezer_db_path = Some(freezer_dir); } + if let Some(blobs_db_dir) = clap_utils::parse_optional(cli_args, "blobs-dir")? { + client_config.blobs_db_path = Some(blobs_db_dir); + } + let (sprp, sprp_explicit) = get_slots_per_restore_point::(cli_args)?; client_config.store.slots_per_restore_point = sprp; client_config.store.slots_per_restore_point_set_explicitly = sprp_explicit; @@ -144,11 +155,13 @@ pub fn display_db_version( let spec = runtime_context.eth2_config.spec.clone(); let hot_path = client_config.get_db_path(); let cold_path = client_config.get_freezer_db_path(); + let blobs_path = client_config.get_blobs_db_path(); let mut version = CURRENT_SCHEMA_VERSION; HotColdDB::, LevelDB>::open( &hot_path, &cold_path, + blobs_path, |_, from, _| { version = from; Ok(()) @@ -200,10 +213,12 @@ pub fn inspect_db( let spec = runtime_context.eth2_config.spec.clone(); let hot_path = client_config.get_db_path(); let cold_path = client_config.get_freezer_db_path(); + let blobs_path = client_config.get_blobs_db_path(); let db = HotColdDB::, LevelDB>::open( &hot_path, &cold_path, + blobs_path, |_, _, _| Ok(()), client_config.store, spec, @@ -254,12 +269,14 @@ pub fn migrate_db( let spec = &runtime_context.eth2_config.spec; let hot_path = client_config.get_db_path(); let cold_path = client_config.get_freezer_db_path(); + let blobs_path = client_config.get_blobs_db_path(); let mut from = CURRENT_SCHEMA_VERSION; let to = migrate_config.to; let db = HotColdDB::, LevelDB>::open( &hot_path, &cold_path, + blobs_path, |_, db_initial_version, _| { from = db_initial_version; Ok(()) @@ -294,10 +311,12 @@ pub fn prune_payloads( let spec = &runtime_context.eth2_config.spec; let hot_path = client_config.get_db_path(); let cold_path = client_config.get_freezer_db_path(); + let blobs_path = client_config.get_blobs_db_path(); let db = HotColdDB::, LevelDB>::open( &hot_path, &cold_path, + blobs_path, |_, _, _| Ok(()), client_config.store, spec.clone(), @@ -318,10 +337,12 @@ pub fn prune_blobs( let spec = &runtime_context.eth2_config.spec; let hot_path = client_config.get_db_path(); let cold_path = client_config.get_freezer_db_path(); + let blobs_path = client_config.get_blobs_db_path(); let db = HotColdDB::, LevelDB>::open( &hot_path, &cold_path, + blobs_path, |_, _, _| Ok(()), client_config.store, spec.clone(),