From b5b4ce950981a08543a2e4750a4310fdf4085728 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Wed, 5 Oct 2022 17:14:45 -0400 Subject: [PATCH] blob production --- beacon_node/beacon_chain/src/beacon_chain.rs | 33 +- beacon_node/beacon_chain/src/errors.rs | 5 + .../beacon_chain/src/execution_payload.rs | 43 +- .../src/engine_api/json_structures.rs | 5 +- beacon_node/execution_layer/src/lib.rs | 4 +- beacon_node/http_api/src/lib.rs | 24 +- beacon_node/http_api/src/metrics.rs | 12 + beacon_node/http_api/src/publish_blobs.rs | 129 ++++++ common/eth2/src/lib.rs | 47 +++ common/eth2/src/types.rs | 8 + consensus/types/src/blobs_sidecar.rs | 13 +- consensus/types/src/signed_blobs_sidecar.rs | 14 +- validator_client/src/block_service.rs | 394 ++++++++++++------ validator_client/src/http_metrics/metrics.rs | 6 + validator_client/src/signing_method.rs | 3 + .../src/signing_method/web3signer.rs | 4 + validator_client/src/validator_store.rs | 47 ++- 17 files changed, 623 insertions(+), 168 deletions(-) create mode 100644 beacon_node/http_api/src/publish_blobs.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index f8b9dde98..943e3a20f 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -255,7 +255,7 @@ struct PartialBeaconBlock { deposits: Vec, voluntary_exits: Vec, sync_aggregate: Option>, - prepare_payload_handle: Option>, + prepare_payload_handle: Option>, } pub type BeaconForkChoice = ForkChoice< @@ -3291,15 +3291,16 @@ impl BeaconChain { // // Wait for the execution layer to return an execution payload (if one is required). let prepare_payload_handle = partial_beacon_block.prepare_payload_handle.take(); - let execution_payload = if let Some(prepare_payload_handle) = prepare_payload_handle { - let execution_payload = prepare_payload_handle - .await - .map_err(BlockProductionError::TokioJoin)? - .ok_or(BlockProductionError::ShuttingDown)??; - Some(execution_payload) - } else { - None - }; + let (execution_payload, kzg_commitments, blobs) = + if let Some(prepare_payload_handle) = prepare_payload_handle { + let (execution_payload, commitments, blobs) = prepare_payload_handle + .await + .map_err(BlockProductionError::TokioJoin)? + .ok_or(BlockProductionError::ShuttingDown)??; + (execution_payload, commitments, blobs) + } else { + return Err(BlockProductionError::MissingExecutionPayload); + }; // Part 3/3 (blocking) // @@ -3311,6 +3312,7 @@ impl BeaconChain { chain.complete_partial_beacon_block( partial_beacon_block, execution_payload, + kzg_commitments, verification, ) }, @@ -3557,7 +3559,8 @@ impl BeaconChain { fn complete_partial_beacon_block>( &self, partial_beacon_block: PartialBeaconBlock, - execution_payload: Option, + execution_payload: Payload, + kzg_commitments: Vec, verification: ProduceBlockVerification, ) -> Result, BlockProductionError> { let PartialBeaconBlock { @@ -3633,8 +3636,7 @@ impl BeaconChain { voluntary_exits: voluntary_exits.into(), sync_aggregate: sync_aggregate .ok_or(BlockProductionError::MissingSyncAggregate)?, - execution_payload: execution_payload - .ok_or(BlockProductionError::MissingExecutionPayload)?, + execution_payload, }, }), BeaconState::Eip4844(_) => BeaconBlock::Eip4844(BeaconBlockEip4844 { @@ -3653,10 +3655,9 @@ impl BeaconChain { voluntary_exits: voluntary_exits.into(), sync_aggregate: sync_aggregate .ok_or(BlockProductionError::MissingSyncAggregate)?, - execution_payload: execution_payload - .ok_or(BlockProductionError::MissingExecutionPayload)?, + execution_payload, //FIXME(sean) get blobs - blob_kzg_commitments: VariableList::empty(), + blob_kzg_commitments: VariableList::from(kzg_commitments), }, }), }; diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 704cba489..db521d4a3 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -249,6 +249,11 @@ pub enum BlockProductionError { BlockingFailed(execution_layer::Error), TerminalPoWBlockLookupFailed(execution_layer::Error), GetPayloadFailed(execution_layer::Error), + GetBlobsFailed(execution_layer::Error), + BlobPayloadMismatch { + blob_block_hash: ExecutionBlockHash, + payload_block_hash: ExecutionBlockHash, + }, FailedToReadFinalizedBlock(store::Error), MissingFinalizedBlock(Hash256), BlockTooLarge(usize), diff --git a/beacon_node/beacon_chain/src/execution_payload.rs b/beacon_node/beacon_chain/src/execution_payload.rs index f056aeb99..642fae528 100644 --- a/beacon_node/beacon_chain/src/execution_payload.rs +++ b/beacon_node/beacon_chain/src/execution_payload.rs @@ -12,6 +12,7 @@ use crate::{ BeaconChain, BeaconChainError, BeaconChainTypes, BlockError, BlockProductionError, ExecutionPayloadError, }; +use execution_layer::json_structures::JsonBlobBundlesV1; use execution_layer::{BuilderParams, PayloadStatus}; use fork_choice::{InvalidationOperation, PayloadVerificationStatus}; use proto_array::{Block as ProtoBlock, ExecutionStatus}; @@ -25,12 +26,13 @@ use std::sync::Arc; use tokio::task::JoinHandle; use tree_hash::TreeHash; use types::{ - BeaconBlockRef, BeaconState, BeaconStateError, EthSpec, ExecPayload, ExecutionBlockHash, - Hash256, SignedBeaconBlock, Slot, + BeaconBlockRef, BeaconState, BeaconStateError, Blob, BlobsSidecar, EthSpec, ExecPayload, + ExecutionBlockHash, Hash256, KzgCommitment, SignedBeaconBlock, Slot, }; -pub type PreparePayloadResult = Result; -pub type PreparePayloadHandle = JoinHandle>>; +pub type PreparePayloadResult = + Result<(Payload, Vec, Vec>), BlockProductionError>; +pub type PreparePayloadHandle = JoinHandle>>; #[derive(PartialEq)] pub enum AllowOptimisticImport { @@ -354,7 +356,7 @@ pub fn get_execution_payload< state: &BeaconState, proposer_index: u64, builder_params: BuilderParams, -) -> Result, BlockProductionError> { +) -> Result, BlockProductionError> { // Compute all required values from the `state` now to avoid needing to pass it into a spawned // task. let spec = &chain.spec; @@ -413,7 +415,7 @@ pub async fn prepare_execution_payload( proposer_index: u64, latest_execution_payload_header_block_hash: ExecutionBlockHash, builder_params: BuilderParams, -) -> Result +) -> PreparePayloadResult where T: BeaconChainTypes, Payload: ExecPayload + Default, @@ -473,8 +475,8 @@ where // Note: the suggested_fee_recipient is stored in the `execution_layer`, it will add this parameter. // // This future is not executed here, it's up to the caller to await it. - let execution_payload = execution_layer - .get_payload::( + let (execution_payload_result, blobs_result) = tokio::join!( + execution_layer.get_payload::( parent_hash, timestamp, random, @@ -482,17 +484,20 @@ where forkchoice_update_params, builder_params, &chain.spec, - ) - .await - .map_err(BlockProductionError::GetPayloadFailed)?; + ), + execution_layer.get_blob_bundles(parent_hash, timestamp, random, proposer_index) + ); - /* - TODO: fetch blob bundles from el engine for block building - let suggested_fee_recipient = execution_layer.get_suggested_fee_recipient(proposer_index).await; - let blobs = execution_layer.get_blob_bundles(parent_hash, timestamp, random, suggested_fee_recipient) - .await - .map_err(BlockProductionError::GetPayloadFailed)?; - */ + let execution_payload = + execution_payload_result.map_err(BlockProductionError::GetPayloadFailed)?; + let blobs = blobs_result.map_err(BlockProductionError::GetPayloadFailed)?; - Ok(execution_payload) + if execution_payload.block_hash() != blobs.block_hash { + return Err(BlockProductionError::BlobPayloadMismatch { + blob_block_hash: blobs.block_hash, + payload_block_hash: execution_payload.block_hash(), + }); + } + + Ok((execution_payload, blobs.kzgs, blobs.blobs)) } diff --git a/beacon_node/execution_layer/src/engine_api/json_structures.rs b/beacon_node/execution_layer/src/engine_api/json_structures.rs index eeea53724..fde4f706a 100644 --- a/beacon_node/execution_layer/src/engine_api/json_structures.rs +++ b/beacon_node/execution_layer/src/engine_api/json_structures.rs @@ -1,6 +1,6 @@ use super::*; use serde::{Deserialize, Serialize}; -use types::{EthSpec, ExecutionBlockHash, FixedVector, Transaction, Unsigned, VariableList}; +use types::{Blob, EthSpec, ExecutionBlockHash, FixedVector, Transaction, Unsigned, VariableList}; #[derive(Debug, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] @@ -272,10 +272,9 @@ impl From for PayloadAttributes { #[derive(Debug, PartialEq, Serialize, Deserialize)] #[serde(bound = "T: EthSpec", rename_all = "camelCase")] pub struct JsonBlobBundlesV1 { - pub block_hash: Hash256, + pub block_hash: ExecutionBlockHash, pub kzgs: Vec, pub blobs: Vec>, - pub aggregated_proof: KzgProof, } #[derive(Debug, PartialEq, Serialize, Deserialize)] diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index 99f86b86e..1078876ef 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -787,8 +787,10 @@ impl ExecutionLayer { parent_hash: ExecutionBlockHash, timestamp: u64, prev_randao: Hash256, - suggested_fee_recipient: Address, + proposer_index: u64, ) -> Result, Error> { + let suggested_fee_recipient = self.get_suggested_fee_recipient(proposer_index).await; + debug!( self.log(), "Issuing engine_getBlobsBundle"; diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 5b4fa5816..203b462b1 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -13,6 +13,7 @@ mod block_rewards; mod database; mod metrics; mod proposer_duties; +mod publish_blobs; mod publish_blocks; mod state_id; mod sync_committees; @@ -48,7 +49,7 @@ use types::{ Attestation, AttestationData, AttesterSlashing, BeaconStateError, BlindedPayload, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, FullPayload, ProposerPreparationData, ProposerSlashing, RelativeEpoch, SignedAggregateAndProof, - SignedBeaconBlock, SignedBlindedBeaconBlock, SignedContributionAndProof, + SignedBeaconBlock, SignedBlindedBeaconBlock, SignedBlobsSidecar, SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, SyncCommitteeMessage, SyncContributionData, }; @@ -1052,6 +1053,26 @@ pub fn serve( }, ); + // POST beacon/blobs + let post_beacon_blobs = eth_v1 + .and(warp::path("beacon")) + .and(warp::path("blobs")) + .and(warp::path::end()) + .and(warp::body::json()) + .and(chain_filter.clone()) + .and(network_tx_filter.clone()) + .and(log_filter.clone()) + .and_then( + |blobs: Arc>, + chain: Arc>, + network_tx: UnboundedSender>, + log: Logger| async move { + publish_blobs::publish_blobs(blobs, chain, &network_tx, log) + .await + .map(|()| warp::reply()) + }, + ); + /* * beacon/blocks */ @@ -3162,6 +3183,7 @@ pub fn serve( post_beacon_blocks .boxed() .or(post_beacon_blinded_blocks.boxed()) + .or(post_beacon_blobs.boxed()) .or(post_beacon_pool_attestations.boxed()) .or(post_beacon_pool_attester_slashings.boxed()) .or(post_beacon_pool_proposer_slashings.boxed()) diff --git a/beacon_node/http_api/src/metrics.rs b/beacon_node/http_api/src/metrics.rs index 1c3ab1f68..685191373 100644 --- a/beacon_node/http_api/src/metrics.rs +++ b/beacon_node/http_api/src/metrics.rs @@ -41,4 +41,16 @@ lazy_static::lazy_static! { "http_api_block_published_very_late_total", "The count of times a block was published beyond the attestation deadline" ); + pub static ref HTTP_API_BLOB_BROADCAST_DELAY_TIMES: Result = try_create_histogram( + "http_api_blob_broadcast_delay_times", + "Time between start of the slot and when the blob was broadcast" + ); + pub static ref HTTP_API_BLOB_PUBLISHED_LATE_TOTAL: Result = try_create_int_counter( + "http_api_blob_published_late_total", + "The count of times a blob was published beyond more than half way to the attestation deadline" + ); + pub static ref HTTP_API_BLOB_PUBLISHED_VERY_LATE_TOTAL: Result = try_create_int_counter( + "http_api_blob_published_very_late_total", + "The count of times a blob was published beyond the attestation deadline" + ); } diff --git a/beacon_node/http_api/src/publish_blobs.rs b/beacon_node/http_api/src/publish_blobs.rs new file mode 100644 index 000000000..41d76c61c --- /dev/null +++ b/beacon_node/http_api/src/publish_blobs.rs @@ -0,0 +1,129 @@ +use crate::metrics; +use beacon_chain::validator_monitor::{get_block_delay_ms, get_slot_delay_ms, timestamp_now}; +use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, CountUnrealized}; +use lighthouse_network::PubsubMessage; +use network::NetworkMessage; +use slog::{crit, error, info, warn, Logger}; +use slot_clock::SlotClock; +use std::sync::Arc; +use tokio::sync::mpsc::UnboundedSender; +use tree_hash::TreeHash; +use types::{ + BlindedPayload, ExecPayload, ExecutionBlockHash, ExecutionPayload, FullPayload, Hash256, + SignedBeaconBlock, SignedBlobsSidecar, +}; +use warp::Rejection; + +/// Handles a request from the HTTP API for full blocks. +pub async fn publish_blobs( + blobs_sidecar: Arc>, + chain: Arc>, + network_tx: &UnboundedSender>, + log: Logger, +) -> Result<(), Rejection> { + let seen_timestamp = timestamp_now(); + + // Send the blob, regardless of whether or not it is valid. The API + // specification is very clear that this is the desired behaviour. + crate::publish_pubsub_message( + network_tx, + PubsubMessage::BlobsSidecars(blobs_sidecar.clone()), + )?; + + // Determine the delay after the start of the slot, register it with metrics. + let delay = get_slot_delay_ms( + seen_timestamp, + blobs_sidecar.message.beacon_block_slot, + &chain.slot_clock, + ); + metrics::observe_duration(&metrics::HTTP_API_BLOB_BROADCAST_DELAY_TIMES, delay); + + //FIXME(sean) process blobs + // match chain + // .process_block(blobs_sidecar.clone(), CountUnrealized::True) + // .await + // { + // Ok(root) => { + // info!( + // log, + // "Valid block from HTTP API"; + // "block_delay" => ?delay, + // "root" => format!("{}", root), + // "proposer_index" => block.message().proposer_index(), + // "slot" => block.slot(), + // ); + // + // // Notify the validator monitor. + // chain.validator_monitor.read().register_api_block( + // seen_timestamp, + // blobs_sidecar.message(), + // root, + // &chain.slot_clock, + // ); + // + // // Update the head since it's likely this block will become the new + // // head. + // chain.recompute_head_at_current_slot().await; + // + // // Perform some logging to inform users if their blocks are being produced + // // late. + // // + // // Check to see the thresholds are non-zero to avoid logging errors with small + // // slot times (e.g., during testing) + // let crit_threshold = chain.slot_clock.unagg_attestation_production_delay(); + // let error_threshold = crit_threshold / 2; + // if delay >= crit_threshold { + // crit!( + // log, + // "Block was broadcast too late"; + // "msg" => "system may be overloaded, block likely to be orphaned", + // "delay_ms" => delay.as_millis(), + // "slot" => block.slot(), + // "root" => ?root, + // ) + // } else if delay >= error_threshold { + // error!( + // log, + // "Block broadcast was delayed"; + // "msg" => "system may be overloaded, block may be orphaned", + // "delay_ms" => delay.as_millis(), + // "slot" => block.slot(), + // "root" => ?root, + // ) + // } + // + // Ok(()) + // } + // Err(BlockError::BlockIsAlreadyKnown) => { + // info!( + // log, + // "Block from HTTP API already known"; + // "block" => ?block.canonical_root(), + // "slot" => block.slot(), + // ); + // Ok(()) + // } + // Err(BlockError::RepeatProposal { proposer, slot }) => { + // warn!( + // log, + // "Block ignored due to repeat proposal"; + // "msg" => "this can happen when a VC uses fallback BNs. \ + // whilst this is not necessarily an error, it can indicate issues with a BN \ + // or between the VC and BN.", + // "slot" => slot, + // "proposer" => proposer, + // ); + // Ok(()) + // } + // Err(e) => { + // let msg = format!("{:?}", e); + // error!( + // log, + // "Invalid block provided to HTTP API"; + // "reason" => &msg + // ); + // Err(warp_utils::reject::broadcast_without_import(msg)) + // } + // } + Ok(()) +} diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 104ca9ccd..52e8922cc 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -603,6 +603,27 @@ impl BeaconNodeHttpClient { Ok(()) } + /// `POST beacon/blobs` + /// + /// Returns `Ok(None)` on a 404 error. + pub async fn post_beacon_blobs( + &self, + block: &SignedBlobsSidecar, + ) -> Result<(), Error> { + let mut path = self.eth_path(V1)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("blobs"); + + //FIXME(sean) should we re-use the proposal timeout? seems reasonable to.. + self.post_with_timeout(path, block, self.timeouts.proposal) + .await?; + + Ok(()) + } + /// `POST beacon/blinded_blocks` /// /// Returns `Ok(None)` on a 404 error. @@ -1269,6 +1290,32 @@ impl BeaconNodeHttpClient { self.get(path).await } + /// `GET v1/validator/blocks_and_blobs/{slot}` + pub async fn get_validator_blocks_and_blobs>( + &self, + slot: Slot, + randao_reveal: &SignatureBytes, + graffiti: Option<&Graffiti>, + ) -> Result>, Error> { + let mut path = self.eth_path(V1)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("validator") + .push("blocks_and_blobs") + .push(&slot.to_string()); + + path.query_pairs_mut() + .append_pair("randao_reveal", &randao_reveal.to_string()); + + if let Some(graffiti) = graffiti { + path.query_pairs_mut() + .append_pair("graffiti", &graffiti.to_string()); + } + + self.get(path).await + } + /// `GET v2/validator/blinded_blocks/{slot}` pub async fn get_validator_blinded_blocks>( &self, diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index e65735800..2ac4fcf49 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -1110,6 +1110,14 @@ pub struct LivenessResponseData { pub is_live: bool, } +#[derive(PartialEq, Debug, Serialize, Deserialize)] +#[serde(bound = "T: EthSpec, Payload: ExecPayload")] +pub struct BlocksAndBlobs> { + pub block: BeaconBlock, + pub blobs: Vec>, + pub kzg_aggregate_proof: KzgProof, +} + #[cfg(test)] mod tests { use super::*; diff --git a/consensus/types/src/blobs_sidecar.rs b/consensus/types/src/blobs_sidecar.rs index 75100a041..5003a97a6 100644 --- a/consensus/types/src/blobs_sidecar.rs +++ b/consensus/types/src/blobs_sidecar.rs @@ -1,5 +1,5 @@ use crate::kzg_proof::KzgProof; -use crate::{Blob, EthSpec, Hash256, Slot}; +use crate::{BeaconBlock, Blob, EthSpec, Hash256, SignedRoot, Slot}; use serde_derive::{Deserialize, Serialize}; use ssz::Encode; use ssz_derive::{Decode, Encode}; @@ -9,14 +9,17 @@ use tree_hash_derive::TreeHash; #[cfg_attr(feature = "arbitrary-fuzz", derive(arbitrary::Arbitrary))] #[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode, TreeHash, PartialEq, Default)] -pub struct BlobsSidecar { +#[serde(bound = "T: EthSpec")] +pub struct BlobsSidecar { pub beacon_block_root: Hash256, pub beacon_block_slot: Slot, - pub blobs: VariableList, E::MaxBlobsPerBlock>, + pub blobs: VariableList, T::MaxBlobsPerBlock>, pub kzg_aggregate_proof: KzgProof, } -impl BlobsSidecar { +impl SignedRoot for BlobsSidecar {} + +impl BlobsSidecar { pub fn empty() -> Self { Self::default() } @@ -24,6 +27,6 @@ impl BlobsSidecar { // Fixed part Self::empty().as_ssz_bytes().len() // Max size of variable length `blobs` field - + (E::max_blobs_per_block() * as Encode>::ssz_fixed_len()) + + (T::max_blobs_per_block() * as Encode>::ssz_fixed_len()) } } diff --git a/consensus/types/src/signed_blobs_sidecar.rs b/consensus/types/src/signed_blobs_sidecar.rs index 3e1ee6df8..74a779219 100644 --- a/consensus/types/src/signed_blobs_sidecar.rs +++ b/consensus/types/src/signed_blobs_sidecar.rs @@ -8,7 +8,17 @@ use tree_hash_derive::TreeHash; #[cfg_attr(feature = "arbitrary-fuzz", derive(arbitrary::Arbitrary))] #[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode, TreeHash, PartialEq)] -pub struct SignedBlobsSidecar { - pub message: BlobsSidecar, +#[serde(bound = "T: EthSpec")] +pub struct SignedBlobsSidecar { + pub message: BlobsSidecar, pub signature: Signature, } + +impl SignedBlobsSidecar { + pub fn from_blob(blob: BlobsSidecar, signature: Signature) -> Self { + Self { + message: blob, + signature, + } + } +} diff --git a/validator_client/src/block_service.rs b/validator_client/src/block_service.rs index b0b69a4f5..3ef0c0e25 100644 --- a/validator_client/src/block_service.rs +++ b/validator_client/src/block_service.rs @@ -6,13 +6,16 @@ use crate::{ }; use crate::{http_metrics::metrics, validator_store::ValidatorStore}; use environment::RuntimeContext; -use eth2::types::Graffiti; +use eth2::types::{Graffiti, VariableList}; use slog::{crit, debug, error, info, trace, warn}; use slot_clock::SlotClock; use std::ops::Deref; use std::sync::Arc; use tokio::sync::mpsc; -use types::{BlindedPayload, BlockType, EthSpec, ExecPayload, FullPayload, PublicKeyBytes, Slot}; +use types::{ + BlindedPayload, BlobsSidecar, BlockType, EthSpec, ExecPayload, ForkName, FullPayload, + PublicKeyBytes, Slot, +}; #[derive(Debug)] pub enum BlockError { @@ -316,126 +319,285 @@ impl BlockService { let proposer_index = self.validator_store.validator_index(&validator_pubkey); let validator_pubkey_ref = &validator_pubkey; - // Request block from first responsive beacon node. - let block = self - .beacon_nodes - .first_success( - RequireSynced::No, - OfflineOnFailure::Yes, - |beacon_node| async move { - let block = match Payload::block_type() { - BlockType::Full => { - let _get_timer = metrics::start_timer_vec( - &metrics::BLOCK_SERVICE_TIMES, - &[metrics::BEACON_BLOCK_HTTP_GET], - ); - beacon_node - .get_validator_blocks::( - slot, - randao_reveal_ref, - graffiti.as_ref(), - ) - .await - .map_err(|e| { - BlockError::Recoverable(format!( - "Error from beacon node when producing block: {:?}", - e - )) - })? - .data - } - BlockType::Blinded => { - let _get_timer = metrics::start_timer_vec( - &metrics::BLOCK_SERVICE_TIMES, - &[metrics::BLINDED_BEACON_BLOCK_HTTP_GET], - ); - beacon_node - .get_validator_blinded_blocks::( - slot, - randao_reveal_ref, - graffiti.as_ref(), - ) - .await - .map_err(|e| { - BlockError::Recoverable(format!( - "Error from beacon node when producing block: {:?}", - e - )) - })? - .data - } - }; + match self.context.eth2_config.spec.fork_name_at_slot::(slot) { + ForkName::Base | ForkName::Altair | ForkName::Merge => { + // Request block from first responsive beacon node. + let block = self + .beacon_nodes + .first_success( + RequireSynced::No, + OfflineOnFailure::Yes, + |beacon_node| async move { + let block = match Payload::block_type() { + BlockType::Full => { + let _get_timer = metrics::start_timer_vec( + &metrics::BLOCK_SERVICE_TIMES, + &[metrics::BEACON_BLOCK_HTTP_GET], + ); + beacon_node + .get_validator_blocks::( + slot, + randao_reveal_ref, + graffiti.as_ref(), + ) + .await + .map_err(|e| { + BlockError::Recoverable(format!( + "Error from beacon node when producing block: {:?}", + e + )) + })? + .data + } + BlockType::Blinded => { + let _get_timer = metrics::start_timer_vec( + &metrics::BLOCK_SERVICE_TIMES, + &[metrics::BLINDED_BEACON_BLOCK_HTTP_GET], + ); + beacon_node + .get_validator_blinded_blocks::( + slot, + randao_reveal_ref, + graffiti.as_ref(), + ) + .await + .map_err(|e| { + BlockError::Recoverable(format!( + "Error from beacon node when producing block: {:?}", + e + )) + })? + .data + } + }; - if proposer_index != Some(block.proposer_index()) { - return Err(BlockError::Recoverable( - "Proposer index does not match block proposer. Beacon chain re-orged" - .to_string(), - )); - } + if proposer_index != Some(block.proposer_index()) { + return Err(BlockError::Recoverable( + "Proposer index does not match block proposer. Beacon chain re-orged" + .to_string(), + )); + } - Ok::<_, BlockError>(block) - }, - ) - .await?; + Ok::<_, BlockError>(block) + }, + ) + .await?; - let signed_block = self_ref - .validator_store - .sign_block::(*validator_pubkey_ref, block, current_slot) - .await - .map_err(|e| BlockError::Recoverable(format!("Unable to sign block: {:?}", e)))?; + let signed_block = self_ref + .validator_store + .sign_block::(*validator_pubkey_ref, block, current_slot) + .await + .map_err(|e| { + BlockError::Recoverable(format!("Unable to sign block: {:?}", e)) + })?; + + // Publish block with first available beacon node. + self.beacon_nodes + .first_success( + RequireSynced::No, + OfflineOnFailure::Yes, + |beacon_node| async { + match Payload::block_type() { + BlockType::Full => { + let _post_timer = metrics::start_timer_vec( + &metrics::BLOCK_SERVICE_TIMES, + &[metrics::BEACON_BLOCK_HTTP_POST], + ); + beacon_node + .post_beacon_blocks(&signed_block) + .await + .map_err(|e| { + BlockError::Irrecoverable(format!( + "Error from beacon node when publishing block: {:?}", + e + )) + })? + } + BlockType::Blinded => { + let _post_timer = metrics::start_timer_vec( + &metrics::BLOCK_SERVICE_TIMES, + &[metrics::BLINDED_BEACON_BLOCK_HTTP_POST], + ); + beacon_node + .post_beacon_blinded_blocks(&signed_block) + .await + .map_err(|e| { + BlockError::Irrecoverable(format!( + "Error from beacon node when publishing block: {:?}", + e + )) + })? + } + } + Ok::<_, BlockError>(()) + }, + ) + .await?; + + info!( + log, + "Successfully published block"; + "block_type" => ?Payload::block_type(), + "deposits" => signed_block.message().body().deposits().len(), + "attestations" => signed_block.message().body().attestations().len(), + "graffiti" => ?graffiti.map(|g| g.as_utf8_lossy()), + "slot" => signed_block.slot().as_u64(), + ); + } + ForkName::Eip4844 => { + if matches!(Payload::block_type(), BlockType::Blinded) { + //FIXME(sean) + crit!( + log, + "`--builder-payloads` not yet supported for EIP-4844 fork" + ); + return Ok(()); + } + + // Request block from first responsive beacon node. + let block_and_blobs = self + .beacon_nodes + .first_success( + RequireSynced::No, + OfflineOnFailure::Yes, + |beacon_node| async move { + + let _get_timer = metrics::start_timer_vec( + &metrics::BLOCK_SERVICE_TIMES, + &[metrics::BEACON_BLOCK_HTTP_GET], + ); + let block_and_blobs = beacon_node + .get_validator_blocks_and_blobs::( + slot, + randao_reveal_ref, + graffiti.as_ref(), + ) + .await + .map_err(|e| { + BlockError::Recoverable(format!( + "Error from beacon node when producing block: {:?}", + e + )) + })? + .data; + + if proposer_index != Some(block_and_blobs.block.proposer_index()) { + return Err(BlockError::Recoverable( + "Proposer index does not match block proposer. Beacon chain re-orged" + .to_string(), + )); + } + + Ok::<_, BlockError>(block_and_blobs) + }, + ) + .await?; + + let blobs_sidecar = BlobsSidecar { + beacon_block_root: block_and_blobs.block.canonical_root(), + beacon_block_slot: block_and_blobs.block.slot(), + blobs: VariableList::from(block_and_blobs.blobs), + kzg_aggregate_proof: block_and_blobs.kzg_aggregate_proof, + }; + + let block = block_and_blobs.block; + let block_publish_future = async { + let signed_block = self_ref + .validator_store + .sign_block::(*validator_pubkey_ref, block, current_slot) + .await + .map_err(|e| { + BlockError::Recoverable(format!("Unable to sign block: {:?}", e)) + })?; + + // Publish block with first available beacon node. + self.beacon_nodes + .first_success( + RequireSynced::No, + OfflineOnFailure::Yes, + |beacon_node| async { + let _post_timer = metrics::start_timer_vec( + &metrics::BLOCK_SERVICE_TIMES, + &[metrics::BEACON_BLOCK_HTTP_POST], + ); + beacon_node + .post_beacon_blocks(&signed_block) + .await + .map_err(|e| { + BlockError::Irrecoverable(format!( + "Error from beacon node when publishing block: {:?}", + e + )) + })?; + Ok::<_, BlockError>(()) + }, + ) + .await?; + + info!( + log, + "Successfully published block"; + "block_type" => ?Payload::block_type(), + "deposits" => signed_block.message().body().deposits().len(), + "attestations" => signed_block.message().body().attestations().len(), + "graffiti" => ?graffiti.map(|g| g.as_utf8_lossy()), + "slot" => signed_block.slot().as_u64(), + ); - // Publish block with first available beacon node. - self.beacon_nodes - .first_success( - RequireSynced::No, - OfflineOnFailure::Yes, - |beacon_node| async { - match Payload::block_type() { - BlockType::Full => { - let _post_timer = metrics::start_timer_vec( - &metrics::BLOCK_SERVICE_TIMES, - &[metrics::BEACON_BLOCK_HTTP_POST], - ); - beacon_node - .post_beacon_blocks(&signed_block) - .await - .map_err(|e| { - BlockError::Irrecoverable(format!( - "Error from beacon node when publishing block: {:?}", - e - )) - })? - } - BlockType::Blinded => { - let _post_timer = metrics::start_timer_vec( - &metrics::BLOCK_SERVICE_TIMES, - &[metrics::BLINDED_BEACON_BLOCK_HTTP_POST], - ); - beacon_node - .post_beacon_blinded_blocks(&signed_block) - .await - .map_err(|e| { - BlockError::Irrecoverable(format!( - "Error from beacon node when publishing block: {:?}", - e - )) - })? - } - } Ok::<_, BlockError>(()) - }, - ) - .await?; + }; + + let blob_publish_future = async { + let signed_blobs = self_ref + .validator_store + .sign_blobs(*validator_pubkey_ref, blobs_sidecar, current_slot) + .await + .map_err(|e| { + BlockError::Recoverable(format!("Unable to sign blob: {:?}", e)) + })?; + + // Publish block with first available beacon node. + self.beacon_nodes + .first_success( + RequireSynced::No, + OfflineOnFailure::Yes, + |beacon_node| async { + let _post_timer = metrics::start_timer_vec( + &metrics::BLOCK_SERVICE_TIMES, + &[metrics::BEACON_BLOB_HTTP_POST], + ); + beacon_node.post_beacon_blobs(&signed_blobs).await.map_err( + |e| { + BlockError::Irrecoverable(format!( + "Error from beacon node when publishing blob: {:?}", + e + )) + }, + )?; + Ok::<_, BlockError>(()) + }, + ) + .await?; + + info!( + log, + "Successfully published blobs"; + "block_type" => ?Payload::block_type(), + "slot" => signed_blobs.message.beacon_block_slot.as_u64(), + "block_root" => ?signed_blobs.message.beacon_block_root, + "blobs_len" => signed_blobs.message.blobs.len(), + ); + + Ok::<_, BlockError>(()) + }; + + let (res_block, res_blob) = tokio::join!(block_publish_future, blob_publish_future); + + res_block?; + res_blob?; + } + } - info!( - log, - "Successfully published block"; - "block_type" => ?Payload::block_type(), - "deposits" => signed_block.message().body().deposits().len(), - "attestations" => signed_block.message().body().attestations().len(), - "graffiti" => ?graffiti.map(|g| g.as_utf8_lossy()), - "slot" => signed_block.slot().as_u64(), - ); Ok(()) } } diff --git a/validator_client/src/http_metrics/metrics.rs b/validator_client/src/http_metrics/metrics.rs index 146d008a5..cc71196f4 100644 --- a/validator_client/src/http_metrics/metrics.rs +++ b/validator_client/src/http_metrics/metrics.rs @@ -13,6 +13,7 @@ pub const BEACON_BLOCK: &str = "beacon_block"; pub const BEACON_BLOCK_HTTP_GET: &str = "beacon_block_http_get"; pub const BLINDED_BEACON_BLOCK_HTTP_GET: &str = "blinded_beacon_block_http_get"; pub const BEACON_BLOCK_HTTP_POST: &str = "beacon_block_http_post"; +pub const BEACON_BLOB_HTTP_POST: &str = "beacon_blob_http_post"; pub const BLINDED_BEACON_BLOCK_HTTP_POST: &str = "blinded_beacon_block_http_post"; pub const ATTESTATIONS: &str = "attestations"; pub const ATTESTATIONS_HTTP_GET: &str = "attestations_http_get"; @@ -57,6 +58,11 @@ lazy_static::lazy_static! { "Total count of attempted block signings", &["status"] ); + pub static ref SIGNED_BLOBS_TOTAL: Result = try_create_int_counter_vec( + "vc_signed_beacon_blobs_total", + "Total count of attempted blob signings", + &["status"] + ); pub static ref SIGNED_ATTESTATIONS_TOTAL: Result = try_create_int_counter_vec( "vc_signed_attestations_total", "Total count of attempted Attestation signings", diff --git a/validator_client/src/signing_method.rs b/validator_client/src/signing_method.rs index de69d9900..36467bd17 100644 --- a/validator_client/src/signing_method.rs +++ b/validator_client/src/signing_method.rs @@ -37,6 +37,7 @@ pub enum Error { pub enum SignableMessage<'a, T: EthSpec, Payload: ExecPayload = FullPayload> { RandaoReveal(Epoch), BeaconBlock(&'a BeaconBlock), + BlobsSidecar(&'a BlobsSidecar), AttestationData(&'a AttestationData), SignedAggregateAndProof(&'a AggregateAndProof), SelectionProof(Slot), @@ -58,6 +59,7 @@ impl<'a, T: EthSpec, Payload: ExecPayload> SignableMessage<'a, T, Payload> { match self { SignableMessage::RandaoReveal(epoch) => epoch.signing_root(domain), SignableMessage::BeaconBlock(b) => b.signing_root(domain), + SignableMessage::BlobsSidecar(b) => b.signing_root(domain), SignableMessage::AttestationData(a) => a.signing_root(domain), SignableMessage::SignedAggregateAndProof(a) => a.signing_root(domain), SignableMessage::SelectionProof(slot) => slot.signing_root(domain), @@ -180,6 +182,7 @@ impl SigningMethod { Web3SignerObject::RandaoReveal { epoch } } SignableMessage::BeaconBlock(block) => Web3SignerObject::beacon_block(block)?, + SignableMessage::BlobsSidecar(blob) => Web3SignerObject::BlobsSidecar(blob), SignableMessage::AttestationData(a) => Web3SignerObject::Attestation(a), SignableMessage::SignedAggregateAndProof(a) => { Web3SignerObject::AggregateAndProof(a) diff --git a/validator_client/src/signing_method/web3signer.rs b/validator_client/src/signing_method/web3signer.rs index 0de260ecf..6668badb9 100644 --- a/validator_client/src/signing_method/web3signer.rs +++ b/validator_client/src/signing_method/web3signer.rs @@ -11,6 +11,7 @@ pub enum MessageType { AggregateAndProof, Attestation, BlockV2, + BlobsSidecar, Deposit, RandaoReveal, VoluntaryExit, @@ -50,6 +51,8 @@ pub enum Web3SignerObject<'a, T: EthSpec, Payload: ExecPayload> { #[serde(skip_serializing_if = "Option::is_none")] block_header: Option, }, + //FIXME(sean) just guessing here + BlobsSidecar(&'a BlobsSidecar), #[allow(dead_code)] Deposit { pubkey: PublicKeyBytes, @@ -105,6 +108,7 @@ impl<'a, T: EthSpec, Payload: ExecPayload> Web3SignerObject<'a, T, Payload> { Web3SignerObject::AggregateAndProof(_) => MessageType::AggregateAndProof, Web3SignerObject::Attestation(_) => MessageType::Attestation, Web3SignerObject::BeaconBlock { .. } => MessageType::BlockV2, + Web3SignerObject::BlobsSidecar(_) => MessageType::BlobsSidecar, Web3SignerObject::Deposit { .. } => MessageType::Deposit, Web3SignerObject::RandaoReveal { .. } => MessageType::RandaoReveal, Web3SignerObject::VoluntaryExit(_) => MessageType::VoluntaryExit, diff --git a/validator_client/src/validator_store.rs b/validator_client/src/validator_store.rs index 292b49ac3..389bbb800 100644 --- a/validator_client/src/validator_store.rs +++ b/validator_client/src/validator_store.rs @@ -19,11 +19,12 @@ use std::sync::Arc; use task_executor::TaskExecutor; use types::{ attestation::Error as AttestationError, graffiti::GraffitiString, Address, AggregateAndProof, - Attestation, BeaconBlock, BlindedPayload, ChainSpec, ContributionAndProof, Domain, Epoch, - EthSpec, ExecPayload, Fork, Graffiti, Hash256, Keypair, PublicKeyBytes, SelectionProof, - Signature, SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, SignedRoot, - SignedValidatorRegistrationData, Slot, SyncAggregatorSelectionData, SyncCommitteeContribution, - SyncCommitteeMessage, SyncSelectionProof, SyncSubnetId, ValidatorRegistrationData, + Attestation, BeaconBlock, BlindedPayload, BlobsSidecar, ChainSpec, ContributionAndProof, + Domain, Epoch, EthSpec, ExecPayload, Fork, FullPayload, Graffiti, Hash256, Keypair, + PublicKeyBytes, SelectionProof, Signature, SignedAggregateAndProof, SignedBeaconBlock, + SignedBlobsSidecar, SignedContributionAndProof, SignedRoot, SignedValidatorRegistrationData, + Slot, SyncAggregatorSelectionData, SyncCommitteeContribution, SyncCommitteeMessage, + SyncSelectionProof, SyncSubnetId, ValidatorRegistrationData, }; use validator_dir::ValidatorDir; @@ -531,6 +532,42 @@ impl ValidatorStore { } } + pub async fn sign_blobs( + &self, + validator_pubkey: PublicKeyBytes, + blobs_sidecar: BlobsSidecar, + current_slot: Slot, + ) -> Result, Error> { + let slot = blobs_sidecar.beacon_block_slot; + + // Make sure the blob slot is not higher than the current slot to avoid potential attacks. + if slot > current_slot { + warn!( + self.log, + "Not signing blob with slot greater than current slot"; + "blob_slot" => slot.as_u64(), + "current_slot" => current_slot.as_u64() + ); + return Err(Error::GreaterThanCurrentSlot { slot, current_slot }); + } + + let signing_epoch = slot.epoch(E::slots_per_epoch()); + let signing_context = self.signing_context(Domain::BlobsSideCar, signing_epoch); + + metrics::inc_counter_vec(&metrics::SIGNED_BLOBS_TOTAL, &[metrics::SUCCESS]); + + let signing_method = self.doppelganger_checked_signing_method(validator_pubkey)?; + let signature = signing_method + .get_signature::>( + SignableMessage::BlobsSidecar(&blobs_sidecar), + signing_context, + &self.spec, + &self.task_executor, + ) + .await?; + Ok(SignedBlobsSidecar::from_blob(blobs_sidecar, signature)) + } + pub async fn sign_attestation( &self, validator_pubkey: PublicKeyBytes,