Add more gossip verification functions for blobs

This commit is contained in:
Pawan Dhananjay 2022-10-04 19:17:51 -05:00
parent 9d99c784ea
commit 12fe514550
No known key found for this signature in database
GPG Key ID: 647E56278D7E9B4C
6 changed files with 73 additions and 16 deletions

View File

@ -6,6 +6,7 @@ use crate::attestation_verification::{
use crate::attester_cache::{AttesterCache, AttesterCacheKey}; use crate::attester_cache::{AttesterCache, AttesterCacheKey};
use crate::beacon_proposer_cache::compute_proposer_duties_from_head; use crate::beacon_proposer_cache::compute_proposer_duties_from_head;
use crate::beacon_proposer_cache::BeaconProposerCache; use crate::beacon_proposer_cache::BeaconProposerCache;
use crate::blob_verification::{BlobError, VerifiedBlobsSidecar};
use crate::block_times_cache::BlockTimesCache; use crate::block_times_cache::BlockTimesCache;
use crate::block_verification::{ use crate::block_verification::{
check_block_is_finalized_descendant, check_block_relevancy, get_block_root, check_block_is_finalized_descendant, check_block_relevancy, get_block_root,
@ -1773,6 +1774,23 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}) })
} }
/// Accepts some `SignedBlobsSidecar` from the network and attempts to verify it,
/// returning `Ok(_)` if it is valid to be (re)broadcast on the gossip network.
pub fn verify_blobs_sidecar_for_gossip<'a>(
&self,
blobs_sidecar: &'a SignedBlobsSidecar<T::EthSpec>,
) -> Result<VerifiedBlobsSidecar<'a, T>, BlobError> {
metrics::inc_counter(&metrics::BLOBS_SIDECAR_PROCESSING_REQUESTS);
let _timer = metrics::start_timer(&metrics::BLOBS_SIDECAR_GOSSIP_VERIFICATION_TIMES);
VerifiedBlobsSidecar::verify(blobs_sidecar, self).map(|v| {
if let Some(_event_handler) = self.event_handler.as_ref() {
// TODO: Handle sse events
}
metrics::inc_counter(&metrics::BLOBS_SIDECAR_PROCESSING_SUCCESSES);
v
})
}
/// Accepts some attestation-type object and attempts to verify it in the context of fork /// Accepts some attestation-type object and attempts to verify it in the context of fork
/// choice. If it is valid it is applied to `self.fork_choice`. /// choice. If it is valid it is applied to `self.fork_choice`.
/// ///

View File

@ -87,13 +87,13 @@ impl From<BeaconStateError> for BlobError {
/// the p2p network. /// the p2p network.
#[derive(Derivative)] #[derive(Derivative)]
#[derivative(Debug(bound = "T: BeaconChainTypes"))] #[derivative(Debug(bound = "T: BeaconChainTypes"))]
pub struct GossipVerifiedBlobsSidecar<T: BeaconChainTypes> { pub struct VerifiedBlobsSidecar<'a, T: BeaconChainTypes> {
pub blob_sidecar: Arc<SignedBlobsSidecar<T::EthSpec>>, pub blob_sidecar: &'a SignedBlobsSidecar<T::EthSpec>,
} }
impl<T: BeaconChainTypes> GossipVerifiedBlobsSidecar<T> { impl<'a, T: BeaconChainTypes> VerifiedBlobsSidecar<'a, T> {
pub fn new( pub fn verify(
blob_sidecar: Arc<SignedBlobsSidecar<T::EthSpec>>, blob_sidecar: &'a SignedBlobsSidecar<T::EthSpec>,
chain: &BeaconChain<T>, chain: &BeaconChain<T>,
) -> Result<Self, BlobError> { ) -> Result<Self, BlobError> {
let blob_slot = blob_sidecar.message.beacon_block_slot; let blob_slot = blob_sidecar.message.beacon_block_slot;
@ -121,7 +121,7 @@ impl<T: BeaconChainTypes> GossipVerifiedBlobsSidecar<T> {
} }
// Verify that blobs are properly formatted // Verify that blobs are properly formatted
//TODO: add the check while constructing a Blob type from bytes //TODO: add the check while constructing a Blob type from bytes instead of after
for (i, blob) in blob_sidecar.message.blobs.iter().enumerate() { for (i, blob) in blob_sidecar.message.blobs.iter().enumerate() {
if blob.iter().any(|b| *b >= *BLS_MODULUS) { if blob.iter().any(|b| *b >= *BLS_MODULUS) {
return Err(BlobError::BlobOutOfRange { blob_index: i }); return Err(BlobError::BlobOutOfRange { blob_index: i });

View File

@ -5,7 +5,7 @@ mod beacon_chain;
mod beacon_fork_choice_store; mod beacon_fork_choice_store;
pub mod beacon_proposer_cache; pub mod beacon_proposer_cache;
mod beacon_snapshot; mod beacon_snapshot;
mod blob_verification; pub mod blob_verification;
pub mod block_reward; pub mod block_reward;
mod block_times_cache; mod block_times_cache;
mod block_verification; mod block_verification;

View File

@ -941,6 +941,22 @@ lazy_static! {
"beacon_pre_finalization_block_lookup_count", "beacon_pre_finalization_block_lookup_count",
"Number of block roots subject to single block lookups" "Number of block roots subject to single block lookups"
); );
/*
* Blob sidecar Verification
*/
pub static ref BLOBS_SIDECAR_PROCESSING_REQUESTS: Result<IntCounter> = try_create_int_counter(
"beacon_blobs_sidecar_processing_requests_total",
"Count of all blob sidecars submitted for processing"
);
pub static ref BLOBS_SIDECAR_PROCESSING_SUCCESSES: Result<IntCounter> = try_create_int_counter(
"beacon_blobs_sidecar_processing_successes_total",
"Number of blob sidecars verified for gossip"
);
pub static ref BLOBS_SIDECAR_GOSSIP_VERIFICATION_TIMES: Result<Histogram> = try_create_histogram(
"beacon_blobs_sidecar_gossip_verification_seconds",
"Full runtime of blob sidecars gossip verification"
);
} }
/// Scrape the `beacon_chain` for metrics that are not constantly updated (e.g., the present slot, /// Scrape the `beacon_chain` for metrics that are not constantly updated (e.g., the present slot,

View File

@ -1542,7 +1542,6 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
peer_client, peer_client,
blobs, blobs,
work_reprocessing_tx, work_reprocessing_tx,
duplicate_cache,
seen_timestamp, seen_timestamp,
) )
.await .await

View File

@ -3,6 +3,7 @@ use crate::{metrics, service::NetworkMessage, sync::SyncMessage};
use beacon_chain::store::Error; use beacon_chain::store::Error;
use beacon_chain::{ use beacon_chain::{
attestation_verification::{self, Error as AttnError, VerifiedAttestation}, attestation_verification::{self, Error as AttnError, VerifiedAttestation},
blob_verification::BlobError,
observed_operations::ObservationOutcome, observed_operations::ObservationOutcome,
sync_committee_verification::{self, Error as SyncCommitteeError}, sync_committee_verification::{self, Error as SyncCommitteeError},
validator_monitor::get_block_delay_ms, validator_monitor::get_block_delay_ms,
@ -697,15 +698,27 @@ impl<T: BeaconChainTypes> Worker<T> {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub async fn process_gossip_blob( pub async fn process_gossip_blob(
self, self,
_message_id: MessageId, message_id: MessageId,
_peer_id: PeerId, peer_id: PeerId,
_peer_client: Client, peer_client: Client,
_blob: Arc<SignedBlobsSidecar<T::EthSpec>>, blob: Arc<SignedBlobsSidecar<T::EthSpec>>,
_reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>, reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>,
_duplicate_cache: DuplicateCache, seen_timestamp: Duration,
_seen_duration: Duration,
) { ) {
//FIXME(sean) match self.chain.verify_blobs_sidecar_for_gossip(&blob) {
Ok(verified_sidecar) => {
// Register with validator monitor
// Propagate
// Apply to fork choice
}
Err(error) => self.handle_blobs_verification_failure(
peer_id,
message_id,
Some(reprocess_tx),
error,
seen_timestamp,
),
};
} }
/// Process the beacon block received from the gossip network and /// Process the beacon block received from the gossip network and
@ -2212,4 +2225,15 @@ impl<T: BeaconChainTypes> Worker<T> {
self.propagate_if_timely(is_timely, message_id, peer_id) self.propagate_if_timely(is_timely, message_id, peer_id)
} }
/// Handle an error whilst verifying a `SignedBlobsSidecar` from the network.
fn handle_blobs_verification_failure(
&self,
peer_id: PeerId,
message_id: MessageId,
reprocess_tx: Option<mpsc::Sender<ReprocessQueueMessage<T>>>,
error: BlobError,
seen_timestamp: Duration,
) {
}
} }