diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 05c678b25..a80765518 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -4705,6 +4705,14 @@ fn publish_pubsub_message( ) } +/// Publish a message to the libp2p pubsub network. +fn publish_pubsub_messages( + network_tx: &UnboundedSender>, + messages: Vec>, +) -> Result<(), warp::Rejection> { + publish_network_message(network_tx, NetworkMessage::Publish { messages }) +} + /// Publish a message to the libp2p network. fn publish_network_message( network_tx: &UnboundedSender>, diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index e68691ce8..e41cf51ec 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -85,19 +85,17 @@ pub async fn publish_block { - crate::publish_pubsub_message(&sender, PubsubMessage::BeaconBlock(block.clone())) - .map_err(|_| BlockError::BeaconChainError(BeaconChainError::UnableToPublish))?; + let mut pubsub_messages = vec![PubsubMessage::BeaconBlock(block.clone())]; if let Some(signed_blobs) = blobs_opt { for (blob_index, blob) in signed_blobs.into_iter().enumerate() { - crate::publish_pubsub_message( - &sender, - PubsubMessage::BlobSidecar(Box::new((blob_index as u64, blob))), - ) - .map_err(|_| { - BlockError::BeaconChainError(BeaconChainError::UnableToPublish) - })?; + pubsub_messages.push(PubsubMessage::BlobSidecar(Box::new(( + blob_index as u64, + blob, + )))); } } + crate::publish_pubsub_messages(&sender, pubsub_messages) + .map_err(|_| BlockError::BeaconChainError(BeaconChainError::UnableToPublish))?; } }; Ok(()) diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index e57fd24f2..799953538 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -285,13 +285,22 @@ lazy_static! { */ pub static ref BEACON_BLOB_GOSSIP_PROPAGATION_VERIFICATION_DELAY_TIME: Result = try_create_histogram_with_buckets( "beacon_blob_gossip_propagation_verification_delay_time", - "Duration between when the blob is received and when it is verified for propagation.", + "Duration between when the blob is received over gossip and when it is verified for propagation.", // [0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5] decimal_buckets(-3,-1) ); pub static ref BEACON_BLOB_GOSSIP_SLOT_START_DELAY_TIME: Result = try_create_histogram_with_buckets( "beacon_blob_gossip_slot_start_delay_time", - "Duration between when the blob is received and the start of the slot it belongs to.", + "Duration between when the blob is received over gossip and the start of the slot it belongs to.", + // Create a custom bucket list for greater granularity in block delay + Ok(vec![0.1, 0.2, 0.3,0.4,0.5,0.75,1.0,1.25,1.5,1.75,2.0,2.5,3.0,3.5,4.0,5.0,6.0,7.0,8.0,9.0,10.0,15.0,20.0]) + // NOTE: Previous values, which we may want to switch back to. + // [0.1, 0.2, 0.5, 1, 2, 5, 10, 20, 50] + //decimal_buckets(-1,2) + ); + pub static ref BEACON_BLOB_RPC_SLOT_START_DELAY_TIME: Result = try_create_histogram_with_buckets( + "beacon_blob_rpc_slot_start_delay_time", + "Duration between when a blob is received over rpc and the start of the slot it belongs to.", // Create a custom bucket list for greater granularity in block delay Ok(vec![0.1, 0.2, 0.3,0.4,0.5,0.75,1.0,1.25,1.5,1.75,2.0,2.5,3.0,3.5,4.0,5.0,6.0,7.0,8.0,9.0,10.0,15.0,20.0]) // NOTE: Previous values, which we may want to switch back to. diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index d6bb7421e..9ecc21c9e 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -9,7 +9,8 @@ use beacon_chain::block_verification_types::{AsBlock, RpcBlock}; use beacon_chain::data_availability_checker::AvailabilityCheckError; use beacon_chain::data_availability_checker::MaybeAvailableBlock; use beacon_chain::{ - observed_block_producers::Error as ObserveError, validator_monitor::get_block_delay_ms, + observed_block_producers::Error as ObserveError, + validator_monitor::{get_block_delay_ms, get_slot_delay_ms}, AvailabilityProcessingStatus, BeaconChainError, BeaconChainTypes, BlockError, ChainSegmentResult, HistoricalBlockError, NotifyExecutionLayer, }; @@ -277,7 +278,7 @@ impl NetworkBeaconProcessor { self: Arc>, block_root: Hash256, blobs: FixedBlobSidecarList, - _seen_timestamp: Duration, + seen_timestamp: Duration, process_type: BlockProcessType, ) { let Some(slot) = blobs @@ -287,8 +288,61 @@ impl NetworkBeaconProcessor { return; }; + let indices: Vec<_> = blobs + .iter() + .filter_map(|blob_opt| blob_opt.as_ref().map(|blob| blob.index)) + .collect(); + + debug!( + self.log, + "RPC blobs received"; + "indices" => ?indices, + "block_root" => %block_root, + "slot" => %slot, + ); + + if let Ok(current_slot) = self.chain.slot() { + if current_slot == slot { + // Note: this metric is useful to gauge how long it takes to receive blobs requested + // over rpc. Since we always send the request for block components at `slot_clock.single_lookup_delay()` + // we can use that as a baseline to measure against. + let delay = get_slot_delay_ms(seen_timestamp, slot, &self.chain.slot_clock); + + metrics::observe_duration(&metrics::BEACON_BLOB_RPC_SLOT_START_DELAY_TIME, delay); + } + } + let result = self.chain.process_rpc_blobs(slot, block_root, blobs).await; + match &result { + Ok(AvailabilityProcessingStatus::Imported(hash)) => { + debug!( + self.log, + "Block components retrieved"; + "result" => "imported block and blobs", + "slot" => %slot, + "block_hash" => %hash, + ); + } + Ok(AvailabilityProcessingStatus::MissingComponents(_, _)) => { + warn!( + self.log, + "Missing components over rpc"; + "block_hash" => %block_root, + "slot" => %slot, + ); + } + Err(e) => { + warn!( + self.log, + "Error when importing rpc blobs"; + "error" => ?e, + "block_hash" => %block_root, + "slot" => %slot, + ); + } + } + // Sync handles these results self.send_sync_message(SyncMessage::BlockComponentProcessed { process_type,