diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index f43f0403c..f8b9dde98 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -6,6 +6,7 @@ use crate::attestation_verification::{ use crate::attester_cache::{AttesterCache, AttesterCacheKey}; use crate::beacon_proposer_cache::compute_proposer_duties_from_head; use crate::beacon_proposer_cache::BeaconProposerCache; +use crate::blob_verification::{BlobError, VerifiedBlobsSidecar}; use crate::block_times_cache::BlockTimesCache; use crate::block_verification::{ check_block_is_finalized_descendant, check_block_relevancy, get_block_root, @@ -1773,6 +1774,23 @@ impl BeaconChain { }) } + /// Accepts some `SignedBlobsSidecar` from the network and attempts to verify it, + /// returning `Ok(_)` if it is valid to be (re)broadcast on the gossip network. + pub fn verify_blobs_sidecar_for_gossip<'a>( + &self, + blobs_sidecar: &'a SignedBlobsSidecar, + ) -> Result, BlobError> { + metrics::inc_counter(&metrics::BLOBS_SIDECAR_PROCESSING_REQUESTS); + let _timer = metrics::start_timer(&metrics::BLOBS_SIDECAR_GOSSIP_VERIFICATION_TIMES); + VerifiedBlobsSidecar::verify(blobs_sidecar, self).map(|v| { + if let Some(_event_handler) = self.event_handler.as_ref() { + // TODO: Handle sse events + } + metrics::inc_counter(&metrics::BLOBS_SIDECAR_PROCESSING_SUCCESSES); + v + }) + } + /// Accepts some attestation-type object and attempts to verify it in the context of fork /// choice. If it is valid it is applied to `self.fork_choice`. /// diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index 6d06cbcda..4d1627567 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -87,13 +87,13 @@ impl From for BlobError { /// the p2p network. #[derive(Derivative)] #[derivative(Debug(bound = "T: BeaconChainTypes"))] -pub struct GossipVerifiedBlobsSidecar { - pub blob_sidecar: Arc>, +pub struct VerifiedBlobsSidecar<'a, T: BeaconChainTypes> { + pub blob_sidecar: &'a SignedBlobsSidecar, } -impl GossipVerifiedBlobsSidecar { - pub fn new( - blob_sidecar: Arc>, +impl<'a, T: BeaconChainTypes> VerifiedBlobsSidecar<'a, T> { + pub fn verify( + blob_sidecar: &'a SignedBlobsSidecar, chain: &BeaconChain, ) -> Result { let blob_slot = blob_sidecar.message.beacon_block_slot; @@ -121,7 +121,7 @@ impl GossipVerifiedBlobsSidecar { } // Verify that blobs are properly formatted - //TODO: add the check while constructing a Blob type from bytes + //TODO: add the check while constructing a Blob type from bytes instead of after for (i, blob) in blob_sidecar.message.blobs.iter().enumerate() { if blob.iter().any(|b| *b >= *BLS_MODULUS) { return Err(BlobError::BlobOutOfRange { blob_index: i }); diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 84e15ead7..ba83047f5 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -5,7 +5,7 @@ mod beacon_chain; mod beacon_fork_choice_store; pub mod beacon_proposer_cache; mod beacon_snapshot; -mod blob_verification; +pub mod blob_verification; pub mod block_reward; mod block_times_cache; mod block_verification; diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index ead4a5402..f8accec14 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -941,6 +941,22 @@ lazy_static! { "beacon_pre_finalization_block_lookup_count", "Number of block roots subject to single block lookups" ); + + /* + * Blob sidecar Verification + */ + pub static ref BLOBS_SIDECAR_PROCESSING_REQUESTS: Result = try_create_int_counter( + "beacon_blobs_sidecar_processing_requests_total", + "Count of all blob sidecars submitted for processing" + ); + pub static ref BLOBS_SIDECAR_PROCESSING_SUCCESSES: Result = try_create_int_counter( + "beacon_blobs_sidecar_processing_successes_total", + "Number of blob sidecars verified for gossip" + ); + pub static ref BLOBS_SIDECAR_GOSSIP_VERIFICATION_TIMES: Result = try_create_histogram( + "beacon_blobs_sidecar_gossip_verification_seconds", + "Full runtime of blob sidecars gossip verification" + ); } /// Scrape the `beacon_chain` for metrics that are not constantly updated (e.g., the present slot, diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index 7d7f6602e..947c215b3 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -1542,7 +1542,6 @@ impl BeaconProcessor { peer_client, blobs, work_reprocessing_tx, - duplicate_cache, seen_timestamp, ) .await diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index e5b0d76da..37c5f8c77 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -3,6 +3,7 @@ use crate::{metrics, service::NetworkMessage, sync::SyncMessage}; use beacon_chain::store::Error; use beacon_chain::{ attestation_verification::{self, Error as AttnError, VerifiedAttestation}, + blob_verification::BlobError, observed_operations::ObservationOutcome, sync_committee_verification::{self, Error as SyncCommitteeError}, validator_monitor::get_block_delay_ms, @@ -697,15 +698,27 @@ impl Worker { #[allow(clippy::too_many_arguments)] pub async fn process_gossip_blob( self, - _message_id: MessageId, - _peer_id: PeerId, - _peer_client: Client, - _blob: Arc>, - _reprocess_tx: mpsc::Sender>, - _duplicate_cache: DuplicateCache, - _seen_duration: Duration, + message_id: MessageId, + peer_id: PeerId, + peer_client: Client, + blob: Arc>, + reprocess_tx: mpsc::Sender>, + seen_timestamp: Duration, ) { - //FIXME(sean) + match self.chain.verify_blobs_sidecar_for_gossip(&blob) { + Ok(verified_sidecar) => { + // Register with validator monitor + // Propagate + // Apply to fork choice + } + Err(error) => self.handle_blobs_verification_failure( + peer_id, + message_id, + Some(reprocess_tx), + error, + seen_timestamp, + ), + }; } /// Process the beacon block received from the gossip network and @@ -2212,4 +2225,15 @@ impl Worker { self.propagate_if_timely(is_timely, message_id, peer_id) } + + /// Handle an error whilst verifying a `SignedBlobsSidecar` from the network. + fn handle_blobs_verification_failure( + &self, + peer_id: PeerId, + message_id: MessageId, + reprocess_tx: Option>>, + error: BlobError, + seen_timestamp: Duration, + ) { + } }