diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 7418a66c1..3b3814f6c 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3628,7 +3628,7 @@ impl BeaconChain { .ok_or(BlockProductionError::MissingSyncAggregate)?, execution_payload: execution_payload .ok_or(BlockProductionError::MissingExecutionPayload)?, - blob_kzg_commitments: blob_kzg_commitments.into(), + blob_kzg_commitments: todo!(), }, }), }; diff --git a/beacon_node/lighthouse_network/src/types/pubsub.rs b/beacon_node/lighthouse_network/src/types/pubsub.rs index 3519dafcf..db9016b8b 100644 --- a/beacon_node/lighthouse_network/src/types/pubsub.rs +++ b/beacon_node/lighthouse_network/src/types/pubsub.rs @@ -14,6 +14,7 @@ use types::{ SignedBeaconBlockMerge, SignedBeaconBlockEip4844, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncCommitteeMessage, SyncSubnetId, }; +use types::blobs_sidecar::BlobsSidecar; use types::signed_blobs_sidecar::SignedBlobsSidecar; #[derive(Debug, Clone, PartialEq)] @@ -184,6 +185,11 @@ impl PubsubMessage { }; Ok(PubsubMessage::BeaconBlock(Arc::new(beacon_block))) } + GossipKind::BlobsSidecar => { + let blobs_sidecar = SignedBlobsSidecar::from_ssz_bytes(data) + .map_err(|e| format!("{:?}", e))?; + Ok(PubsubMessage::BlobsSidecars(Arc::new(blobs_sidecar))) + } GossipKind::VoluntaryExit => { let voluntary_exit = SignedVoluntaryExit::from_ssz_bytes(data) .map_err(|e| format!("{:?}", e))?; @@ -228,6 +234,7 @@ impl PubsubMessage { // messages for us. match &self { PubsubMessage::BeaconBlock(data) => data.as_ssz_bytes(), + PubsubMessage::BlobsSidecars(data) => data.as_ssz_bytes(), PubsubMessage::AggregateAndProofAttestation(data) => data.as_ssz_bytes(), PubsubMessage::VoluntaryExit(data) => data.as_ssz_bytes(), PubsubMessage::ProposerSlashing(data) => data.as_ssz_bytes(), @@ -248,6 +255,12 @@ impl std::fmt::Display for PubsubMessage { block.slot(), block.message().proposer_index() ), + PubsubMessage::BlobsSidecars(blobs) => write!( + f, + "Blobs Sidecar: slot: {}, blobs: {}", + blobs.message.beacon_block_slot, + blobs.message.blobs.len(), + ), PubsubMessage::AggregateAndProofAttestation(att) => write!( f, "Aggregate and Proof: slot: {}, index: {}, aggregator_index: {}", diff --git a/beacon_node/lighthouse_network/src/types/topics.rs b/beacon_node/lighthouse_network/src/types/topics.rs index 901a193e3..3e7726480 100644 --- a/beacon_node/lighthouse_network/src/types/topics.rs +++ b/beacon_node/lighthouse_network/src/types/topics.rs @@ -182,6 +182,7 @@ impl From for String { let kind = match topic.kind { GossipKind::BeaconBlock => BEACON_BLOCK_TOPIC.into(), + GossipKind::BlobsSidecar => BLOBS_SIDECAR_TOPIC.into(), GossipKind::BeaconAggregateAndProof => BEACON_AGGREGATE_AND_PROOF_TOPIC.into(), GossipKind::VoluntaryExit => VOLUNTARY_EXIT_TOPIC.into(), GossipKind::ProposerSlashing => PROPOSER_SLASHING_TOPIC.into(), @@ -210,6 +211,7 @@ impl std::fmt::Display for GossipTopic { let kind = match self.kind { GossipKind::BeaconBlock => BEACON_BLOCK_TOPIC.into(), + GossipKind::BlobsSidecar => BLOBS_SIDECAR_TOPIC.into(), GossipKind::BeaconAggregateAndProof => BEACON_AGGREGATE_AND_PROOF_TOPIC.into(), GossipKind::VoluntaryExit => VOLUNTARY_EXIT_TOPIC.into(), GossipKind::ProposerSlashing => PROPOSER_SLASHING_TOPIC.into(), diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index e9a115904..2a811b805 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -65,6 +65,7 @@ use types::{ SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncCommitteeMessage, SyncSubnetId, }; +use types::signed_blobs_sidecar::SignedBlobsSidecar; use work_reprocessing_queue::{ spawn_reprocess_scheduler, QueuedAggregate, QueuedRpcBlock, QueuedUnaggregate, ReadyWork, }; @@ -184,6 +185,7 @@ pub const GOSSIP_ATTESTATION_BATCH: &str = "gossip_attestation_batch"; pub const GOSSIP_AGGREGATE: &str = "gossip_aggregate"; pub const GOSSIP_AGGREGATE_BATCH: &str = "gossip_aggregate_batch"; pub const GOSSIP_BLOCK: &str = "gossip_block"; +pub const GOSSIP_BLOBS_SIDECAR: &str = "gossip_blobs_sidecar"; pub const DELAYED_IMPORT_BLOCK: &str = "delayed_import_block"; pub const GOSSIP_VOLUNTARY_EXIT: &str = "gossip_voluntary_exit"; pub const GOSSIP_PROPOSER_SLASHING: &str = "gossip_proposer_slashing"; @@ -400,6 +402,26 @@ impl WorkEvent { } } + /// Create a new `Work` event for some blobs sidecar. + pub fn gossip_blobs_sidecar( + message_id: MessageId, + peer_id: PeerId, + peer_client: Client, + blobs: Arc>, + seen_timestamp: Duration, + ) -> Self { + Self { + drop_during_sync: false, + work: Work::GossipBlobsSidecar { + message_id, + peer_id, + peer_client, + blobs, + seen_timestamp, + }, + } + } + /// Create a new `Work` event for some sync committee signature. pub fn gossip_sync_signature( message_id: MessageId, @@ -671,6 +693,13 @@ pub enum Work { block: Arc>, seen_timestamp: Duration, }, + GossipBlobsSidecar { + message_id: MessageId, + peer_id: PeerId, + peer_client: Client, + blobs: Arc>, + seen_timestamp: Duration, + }, DelayedImportBlock { peer_id: PeerId, block: Box>, @@ -739,6 +768,7 @@ impl Work { Work::GossipAggregate { .. } => GOSSIP_AGGREGATE, Work::GossipAggregateBatch { .. } => GOSSIP_AGGREGATE_BATCH, Work::GossipBlock { .. } => GOSSIP_BLOCK, + Work::GossipBlobsSidecar { .. } => GOSSIP_BLOBS_SIDECAR, Work::DelayedImportBlock { .. } => DELAYED_IMPORT_BLOCK, Work::GossipVoluntaryExit { .. } => GOSSIP_VOLUNTARY_EXIT, Work::GossipProposerSlashing { .. } => GOSSIP_PROPOSER_SLASHING, @@ -888,6 +918,7 @@ impl BeaconProcessor { let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN); let mut backfill_chain_segment = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN); let mut gossip_block_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN); + let mut gossip_blobs_sidecar_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN); let mut delayed_block_queue = FifoQueue::new(MAX_DELAYED_BLOCK_QUEUE_LEN); let mut status_queue = FifoQueue::new(MAX_STATUS_QUEUE_LEN); @@ -1199,6 +1230,9 @@ impl BeaconProcessor { Work::GossipBlock { .. } => { gossip_block_queue.push(work, work_id, &self.log) } + Work::GossipBlobsSidecar { .. } => { + gossip_blobs_sidecar_queue.push(work, work_id, &self.log) + } Work::DelayedImportBlock { .. } => { delayed_block_queue.push(work, work_id, &self.log) } @@ -1451,6 +1485,28 @@ impl BeaconProcessor { ) .await }), + /* + * Verification for blobs sidecars received on gossip. + */ + Work::GossipBlobsSidecar { + message_id, + peer_id, + peer_client, + blobs, + seen_timestamp, + } => task_spawner.spawn_async(async move { + worker + .process_gossip_blobs_sidecar( + message_id, + peer_id, + peer_client, + blobs, + work_reprocessing_tx, + duplicate_cache, + seen_timestamp, + ) + .await + }), /* * Import for blocks that we received earlier than their intended slot. */ diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 93ed1b463..0e1ab697e 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -22,6 +22,7 @@ use types::{ SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, Slot, SubnetId, SyncCommitteeMessage, SyncSubnetId, }; +use types::signed_blobs_sidecar::SignedBlobsSidecar; use super::{ super::work_reprocessing_queue::{ @@ -987,6 +988,19 @@ impl Worker { }; } + pub async fn process_gossip_blobs_sidecar( + self, + message_id: MessageId, + peer_id: PeerId, + peer_client: Client, + blobs: Arc>, + reprocess_tx: mpsc::Sender>, + duplicate_cache: DuplicateCache, + seen_duration: Duration, + ) { + + } + pub fn process_gossip_voluntary_exit( self, message_id: MessageId, diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index 03b877506..0c9b41779 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -229,6 +229,14 @@ impl Router { block, ); } + PubsubMessage::BlobsSidecars(blobs) => { + self.processor.on_blobs_gossip( + id, + peer_id, + self.network_globals.client(&peer_id), + blobs, + ); + } PubsubMessage::VoluntaryExit(exit) => { debug!(self.log, "Received a voluntary exit"; "peer_id" => %peer_id); self.processor.on_voluntary_exit_gossip(id, peer_id, exit); diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index ce11cbdce..c716707f3 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -20,6 +20,7 @@ use types::{ Attestation, AttesterSlashing, EthSpec, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncSubnetId, }; +use types::signed_blobs_sidecar::SignedBlobsSidecar; /// Processes validated messages from the network. It relays necessary data to the syncing thread /// and processes blocks from the pubsub network. @@ -255,6 +256,22 @@ impl Processor { )) } + pub fn on_blobs_gossip( + &mut self, + message_id: MessageId, + peer_id: PeerId, + peer_client: Client, + blobs: Arc>, + ) { + self.send_beacon_processor_work(BeaconWorkEvent::gossip_blobs_sidecar( + message_id, + peer_id, + peer_client, + blobs, + timestamp_now(), + )) + } + pub fn on_unaggregated_attestation_gossip( &mut self, message_id: MessageId, diff --git a/consensus/types/src/blobs_sidecar.rs b/consensus/types/src/blobs_sidecar.rs index 4d39c1af8..445c731cf 100644 --- a/consensus/types/src/blobs_sidecar.rs +++ b/consensus/types/src/blobs_sidecar.rs @@ -9,8 +9,8 @@ use derivative::Derivative; #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Encode, Decode, TreeHash, Derivative)] pub struct BlobsSidecar { - beacon_block_root: Hash256, - beacon_block_slot: Slot, - blobs: VariableList, T::MaxBlobsPerBlock>, - kzg_aggregate_proof: KzgProof, + pub beacon_block_root: Hash256, + pub beacon_block_slot: Slot, + pub blobs: VariableList, T::MaxBlobsPerBlock>, + pub kzg_aggregate_proof: KzgProof, } \ No newline at end of file