Modify lighthouse_network gossip types to free the blobs (#4064)

* Modify blob topics

* add signedblol type

pubsun messages are signed

* improve subnet topic index

* improve display code

* fix parse code

---------

Co-authored-by: Pawan Dhananjay <pawandhananjay@gmail.com>
This commit is contained in:
Divma 2023-03-10 05:53:36 -05:00 committed by GitHub
parent 545532a883
commit 3898cf7be8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 74 additions and 48 deletions

View File

@ -21,7 +21,7 @@ pub struct GossipCache {
/// Timeout for blocks. /// Timeout for blocks.
beacon_block: Option<Duration>, beacon_block: Option<Duration>,
/// Timeout for blobs. /// Timeout for blobs.
beacon_block_and_blobs_sidecar: Option<Duration>, blob_sidecar: Option<Duration>,
/// Timeout for aggregate attestations. /// Timeout for aggregate attestations.
aggregates: Option<Duration>, aggregates: Option<Duration>,
/// Timeout for attestations. /// Timeout for attestations.
@ -50,7 +50,7 @@ pub struct GossipCacheBuilder {
/// Timeout for blocks. /// Timeout for blocks.
beacon_block: Option<Duration>, beacon_block: Option<Duration>,
/// Timeout for blob sidecars. /// Timeout for blob sidecars.
beacon_block_and_blobs_sidecar: Option<Duration>, blob_sidecar: Option<Duration>,
/// Timeout for aggregate attestations. /// Timeout for aggregate attestations.
aggregates: Option<Duration>, aggregates: Option<Duration>,
/// Timeout for attestations. /// Timeout for attestations.
@ -151,7 +151,7 @@ impl GossipCacheBuilder {
let GossipCacheBuilder { let GossipCacheBuilder {
default_timeout, default_timeout,
beacon_block, beacon_block,
beacon_block_and_blobs_sidecar, blob_sidecar,
aggregates, aggregates,
attestation, attestation,
voluntary_exit, voluntary_exit,
@ -167,7 +167,7 @@ impl GossipCacheBuilder {
expirations: DelayQueue::default(), expirations: DelayQueue::default(),
topic_msgs: HashMap::default(), topic_msgs: HashMap::default(),
beacon_block: beacon_block.or(default_timeout), beacon_block: beacon_block.or(default_timeout),
beacon_block_and_blobs_sidecar: beacon_block_and_blobs_sidecar.or(default_timeout), blob_sidecar: blob_sidecar.or(default_timeout),
aggregates: aggregates.or(default_timeout), aggregates: aggregates.or(default_timeout),
attestation: attestation.or(default_timeout), attestation: attestation.or(default_timeout),
voluntary_exit: voluntary_exit.or(default_timeout), voluntary_exit: voluntary_exit.or(default_timeout),
@ -193,7 +193,7 @@ impl GossipCache {
pub fn insert(&mut self, topic: GossipTopic, data: Vec<u8>) { pub fn insert(&mut self, topic: GossipTopic, data: Vec<u8>) {
let expire_timeout = match topic.kind() { let expire_timeout = match topic.kind() {
GossipKind::BeaconBlock => self.beacon_block, GossipKind::BeaconBlock => self.beacon_block,
GossipKind::BeaconBlocksAndBlobsSidecar => self.beacon_block_and_blobs_sidecar, GossipKind::BlobSidecar(_) => self.blob_sidecar,
GossipKind::BeaconAggregateAndProof => self.aggregates, GossipKind::BeaconAggregateAndProof => self.aggregates,
GossipKind::Attestation(_) => self.attestation, GossipKind::Attestation(_) => self.attestation,
GossipKind::VoluntaryExit => self.voluntary_exit, GossipKind::VoluntaryExit => self.voluntary_exit,

View File

@ -235,6 +235,7 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
possible_fork_digests, possible_fork_digests,
ctx.chain_spec.attestation_subnet_count, ctx.chain_spec.attestation_subnet_count,
SYNC_COMMITTEE_SUBNET_COUNT, SYNC_COMMITTEE_SUBNET_COUNT,
4, // TODO(pawan): get this from chainspec
), ),
max_subscribed_topics: 200, max_subscribed_topics: 200,
max_subscriptions_per_request: 150, // 148 in theory = (64 attestation + 4 sync committee + 6 core topics) * 2 max_subscriptions_per_request: 150, // 148 in theory = (64 attestation + 4 sync committee + 6 core topics) * 2

View File

@ -236,6 +236,7 @@ pub(crate) fn create_whitelist_filter(
possible_fork_digests: Vec<[u8; 4]>, possible_fork_digests: Vec<[u8; 4]>,
attestation_subnet_count: u64, attestation_subnet_count: u64,
sync_committee_subnet_count: u64, sync_committee_subnet_count: u64,
blob_sidecar_subnet_count: u64,
) -> WhitelistSubscriptionFilter { ) -> WhitelistSubscriptionFilter {
let mut possible_hashes = HashSet::new(); let mut possible_hashes = HashSet::new();
for fork_digest in possible_fork_digests { for fork_digest in possible_fork_digests {
@ -255,13 +256,15 @@ pub(crate) fn create_whitelist_filter(
add(BlsToExecutionChange); add(BlsToExecutionChange);
add(LightClientFinalityUpdate); add(LightClientFinalityUpdate);
add(LightClientOptimisticUpdate); add(LightClientOptimisticUpdate);
add(BeaconBlocksAndBlobsSidecar);
for id in 0..attestation_subnet_count { for id in 0..attestation_subnet_count {
add(Attestation(SubnetId::new(id))); add(Attestation(SubnetId::new(id)));
} }
for id in 0..sync_committee_subnet_count { for id in 0..sync_committee_subnet_count {
add(SyncCommitteeMessage(SyncSubnetId::new(id))); add(SyncCommitteeMessage(SyncSubnetId::new(id)));
} }
for id in 0..blob_sidecar_subnet_count {
add(BlobSidecar(id));
}
} }
WhitelistSubscriptionFilter(possible_hashes) WhitelistSubscriptionFilter(possible_hashes)
} }

View File

@ -11,8 +11,8 @@ use std::sync::Arc;
use types::{ use types::{
Attestation, AttesterSlashing, EthSpec, ForkContext, ForkName, LightClientFinalityUpdate, Attestation, AttesterSlashing, EthSpec, ForkContext, ForkName, LightClientFinalityUpdate,
LightClientOptimisticUpdate, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, LightClientOptimisticUpdate, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock,
SignedBeaconBlockAltair, SignedBeaconBlockAndBlobsSidecar, SignedBeaconBlockBase, SignedBeaconBlockAltair, SignedBeaconBlockBase, SignedBeaconBlockCapella,
SignedBeaconBlockCapella, SignedBeaconBlockMerge, SignedBlsToExecutionChange, SignedBeaconBlockMerge, SignedBlobSidecar, SignedBlsToExecutionChange,
SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncCommitteeMessage, SyncSubnetId, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncCommitteeMessage, SyncSubnetId,
}; };
@ -20,8 +20,8 @@ use types::{
pub enum PubsubMessage<T: EthSpec> { pub enum PubsubMessage<T: EthSpec> {
/// Gossipsub message providing notification of a new block. /// Gossipsub message providing notification of a new block.
BeaconBlock(Arc<SignedBeaconBlock<T>>), BeaconBlock(Arc<SignedBeaconBlock<T>>),
/// Gossipsub message providing notification of a new SignedBeaconBlock coupled with a blobs sidecar. /// Gossipsub message providing notification of a [`SignedBlobSidecar`] along with the subnet id where it was received.
BeaconBlockAndBlobsSidecars(SignedBeaconBlockAndBlobsSidecar<T>), BlobSidecar(Box<(u64, SignedBlobSidecar<T>)>),
/// Gossipsub message providing notification of a Aggregate attestation and associated proof. /// Gossipsub message providing notification of a Aggregate attestation and associated proof.
AggregateAndProofAttestation(Box<SignedAggregateAndProof<T>>), AggregateAndProofAttestation(Box<SignedAggregateAndProof<T>>),
/// Gossipsub message providing notification of a raw un-aggregated attestation with its shard id. /// Gossipsub message providing notification of a raw un-aggregated attestation with its shard id.
@ -115,8 +115,8 @@ impl<T: EthSpec> PubsubMessage<T> {
pub fn kind(&self) -> GossipKind { pub fn kind(&self) -> GossipKind {
match self { match self {
PubsubMessage::BeaconBlock(_) => GossipKind::BeaconBlock, PubsubMessage::BeaconBlock(_) => GossipKind::BeaconBlock,
PubsubMessage::BeaconBlockAndBlobsSidecars(_) => { PubsubMessage::BlobSidecar(blob_sidecar_data) => {
GossipKind::BeaconBlocksAndBlobsSidecar GossipKind::BlobSidecar(blob_sidecar_data.0)
} }
PubsubMessage::AggregateAndProofAttestation(_) => GossipKind::BeaconAggregateAndProof, PubsubMessage::AggregateAndProofAttestation(_) => GossipKind::BeaconAggregateAndProof,
PubsubMessage::Attestation(attestation_data) => { PubsubMessage::Attestation(attestation_data) => {
@ -203,15 +203,15 @@ impl<T: EthSpec> PubsubMessage<T> {
}; };
Ok(PubsubMessage::BeaconBlock(Arc::new(beacon_block))) Ok(PubsubMessage::BeaconBlock(Arc::new(beacon_block)))
} }
GossipKind::BeaconBlocksAndBlobsSidecar => { GossipKind::BlobSidecar(blob_index) => {
match fork_context.from_context_bytes(gossip_topic.fork_digest) { match fork_context.from_context_bytes(gossip_topic.fork_digest) {
Some(ForkName::Eip4844) => { Some(ForkName::Eip4844) => {
let block_and_blobs_sidecar = let blob_sidecar = SignedBlobSidecar::from_ssz_bytes(data)
SignedBeaconBlockAndBlobsSidecar::from_ssz_bytes(data) .map_err(|e| format!("{:?}", e))?;
.map_err(|e| format!("{:?}", e))?; Ok(PubsubMessage::BlobSidecar(Box::new((
Ok(PubsubMessage::BeaconBlockAndBlobsSidecars( *blob_index,
block_and_blobs_sidecar, blob_sidecar,
)) ))))
} }
Some( Some(
ForkName::Base ForkName::Base
@ -293,7 +293,7 @@ impl<T: EthSpec> PubsubMessage<T> {
// messages for us. // messages for us.
match &self { match &self {
PubsubMessage::BeaconBlock(data) => data.as_ssz_bytes(), PubsubMessage::BeaconBlock(data) => data.as_ssz_bytes(),
PubsubMessage::BeaconBlockAndBlobsSidecars(data) => data.as_ssz_bytes(), PubsubMessage::BlobSidecar(data) => data.1.as_ssz_bytes(),
PubsubMessage::AggregateAndProofAttestation(data) => data.as_ssz_bytes(), PubsubMessage::AggregateAndProofAttestation(data) => data.as_ssz_bytes(),
PubsubMessage::VoluntaryExit(data) => data.as_ssz_bytes(), PubsubMessage::VoluntaryExit(data) => data.as_ssz_bytes(),
PubsubMessage::ProposerSlashing(data) => data.as_ssz_bytes(), PubsubMessage::ProposerSlashing(data) => data.as_ssz_bytes(),
@ -317,11 +317,10 @@ impl<T: EthSpec> std::fmt::Display for PubsubMessage<T> {
block.slot(), block.slot(),
block.message().proposer_index() block.message().proposer_index()
), ),
PubsubMessage::BeaconBlockAndBlobsSidecars(block_and_blob) => write!( PubsubMessage::BlobSidecar(data) => write!(
f, f,
"Beacon block and Blobs Sidecar: slot: {}, blobs: {}", "BlobSidecar: slot: {}, blob index: {}",
block_and_blob.beacon_block.message().slot(), data.1.blob.slot, data.1.blob.index,
block_and_blob.blobs_sidecar.blobs.len(),
), ),
PubsubMessage::AggregateAndProofAttestation(att) => write!( PubsubMessage::AggregateAndProofAttestation(att) => write!(
f, f,

View File

@ -11,9 +11,9 @@ use crate::Subnet;
pub const TOPIC_PREFIX: &str = "eth2"; pub const TOPIC_PREFIX: &str = "eth2";
pub const SSZ_SNAPPY_ENCODING_POSTFIX: &str = "ssz_snappy"; pub const SSZ_SNAPPY_ENCODING_POSTFIX: &str = "ssz_snappy";
pub const BEACON_BLOCK_TOPIC: &str = "beacon_block"; pub const BEACON_BLOCK_TOPIC: &str = "beacon_block";
pub const BEACON_BLOCK_AND_BLOBS_SIDECAR_TOPIC: &str = "beacon_block_and_blobs_sidecar";
pub const BEACON_AGGREGATE_AND_PROOF_TOPIC: &str = "beacon_aggregate_and_proof"; pub const BEACON_AGGREGATE_AND_PROOF_TOPIC: &str = "beacon_aggregate_and_proof";
pub const BEACON_ATTESTATION_PREFIX: &str = "beacon_attestation_"; pub const BEACON_ATTESTATION_PREFIX: &str = "beacon_attestation_";
pub const BLOB_SIDECAR_PREFIX: &str = "blob_sidecar_";
pub const VOLUNTARY_EXIT_TOPIC: &str = "voluntary_exit"; pub const VOLUNTARY_EXIT_TOPIC: &str = "voluntary_exit";
pub const PROPOSER_SLASHING_TOPIC: &str = "proposer_slashing"; pub const PROPOSER_SLASHING_TOPIC: &str = "proposer_slashing";
pub const ATTESTER_SLASHING_TOPIC: &str = "attester_slashing"; pub const ATTESTER_SLASHING_TOPIC: &str = "attester_slashing";
@ -82,10 +82,10 @@ pub struct GossipTopic {
pub enum GossipKind { pub enum GossipKind {
/// Topic for publishing beacon blocks. /// Topic for publishing beacon blocks.
BeaconBlock, BeaconBlock,
/// Topic for publishing beacon block coupled with blob sidecars.
BeaconBlocksAndBlobsSidecar,
/// Topic for publishing aggregate attestations and proofs. /// Topic for publishing aggregate attestations and proofs.
BeaconAggregateAndProof, BeaconAggregateAndProof,
/// Topic for publishing BlobSidecars.
BlobSidecar(u64),
/// Topic for publishing raw attestations on a particular subnet. /// Topic for publishing raw attestations on a particular subnet.
#[strum(serialize = "beacon_attestation")] #[strum(serialize = "beacon_attestation")]
Attestation(SubnetId), Attestation(SubnetId),
@ -115,6 +115,9 @@ impl std::fmt::Display for GossipKind {
GossipKind::SyncCommitteeMessage(subnet_id) => { GossipKind::SyncCommitteeMessage(subnet_id) => {
write!(f, "sync_committee_{}", **subnet_id) write!(f, "sync_committee_{}", **subnet_id)
} }
GossipKind::BlobSidecar(blob_index) => {
write!(f, "{}{}", BLOB_SIDECAR_PREFIX, blob_index)
}
x => f.write_str(x.as_ref()), x => f.write_str(x.as_ref()),
} }
} }
@ -175,7 +178,6 @@ impl GossipTopic {
let kind = match topic_parts[3] { let kind = match topic_parts[3] {
BEACON_BLOCK_TOPIC => GossipKind::BeaconBlock, BEACON_BLOCK_TOPIC => GossipKind::BeaconBlock,
BEACON_AGGREGATE_AND_PROOF_TOPIC => GossipKind::BeaconAggregateAndProof, BEACON_AGGREGATE_AND_PROOF_TOPIC => GossipKind::BeaconAggregateAndProof,
BEACON_BLOCK_AND_BLOBS_SIDECAR_TOPIC => GossipKind::BeaconBlocksAndBlobsSidecar,
SIGNED_CONTRIBUTION_AND_PROOF_TOPIC => GossipKind::SignedContributionAndProof, SIGNED_CONTRIBUTION_AND_PROOF_TOPIC => GossipKind::SignedContributionAndProof,
VOLUNTARY_EXIT_TOPIC => GossipKind::VoluntaryExit, VOLUNTARY_EXIT_TOPIC => GossipKind::VoluntaryExit,
PROPOSER_SLASHING_TOPIC => GossipKind::ProposerSlashing, PROPOSER_SLASHING_TOPIC => GossipKind::ProposerSlashing,
@ -183,11 +185,8 @@ impl GossipTopic {
BLS_TO_EXECUTION_CHANGE_TOPIC => GossipKind::BlsToExecutionChange, BLS_TO_EXECUTION_CHANGE_TOPIC => GossipKind::BlsToExecutionChange,
LIGHT_CLIENT_FINALITY_UPDATE => GossipKind::LightClientFinalityUpdate, LIGHT_CLIENT_FINALITY_UPDATE => GossipKind::LightClientFinalityUpdate,
LIGHT_CLIENT_OPTIMISTIC_UPDATE => GossipKind::LightClientOptimisticUpdate, LIGHT_CLIENT_OPTIMISTIC_UPDATE => GossipKind::LightClientOptimisticUpdate,
topic => match committee_topic_index(topic) { topic => match subnet_topic_index(topic) {
Some(subnet) => match subnet { Some(kind) => kind,
Subnet::Attestation(s) => GossipKind::Attestation(s),
Subnet::SyncCommittee(s) => GossipKind::SyncCommitteeMessage(s),
},
None => return Err(format!("Unknown topic: {}", topic)), None => return Err(format!("Unknown topic: {}", topic)),
}, },
}; };
@ -232,7 +231,6 @@ impl std::fmt::Display for GossipTopic {
let kind = match self.kind { let kind = match self.kind {
GossipKind::BeaconBlock => BEACON_BLOCK_TOPIC.into(), GossipKind::BeaconBlock => BEACON_BLOCK_TOPIC.into(),
GossipKind::BeaconBlocksAndBlobsSidecar => BEACON_BLOCK_AND_BLOBS_SIDECAR_TOPIC.into(),
GossipKind::BeaconAggregateAndProof => BEACON_AGGREGATE_AND_PROOF_TOPIC.into(), GossipKind::BeaconAggregateAndProof => BEACON_AGGREGATE_AND_PROOF_TOPIC.into(),
GossipKind::VoluntaryExit => VOLUNTARY_EXIT_TOPIC.into(), GossipKind::VoluntaryExit => VOLUNTARY_EXIT_TOPIC.into(),
GossipKind::ProposerSlashing => PROPOSER_SLASHING_TOPIC.into(), GossipKind::ProposerSlashing => PROPOSER_SLASHING_TOPIC.into(),
@ -242,6 +240,9 @@ impl std::fmt::Display for GossipTopic {
GossipKind::SyncCommitteeMessage(index) => { GossipKind::SyncCommitteeMessage(index) => {
format!("{}{}", SYNC_COMMITTEE_PREFIX_TOPIC, *index) format!("{}{}", SYNC_COMMITTEE_PREFIX_TOPIC, *index)
} }
GossipKind::BlobSidecar(blob_index) => {
format!("{}{}", BLOB_SIDECAR_PREFIX, blob_index)
}
GossipKind::BlsToExecutionChange => BLS_TO_EXECUTION_CHANGE_TOPIC.into(), GossipKind::BlsToExecutionChange => BLS_TO_EXECUTION_CHANGE_TOPIC.into(),
GossipKind::LightClientFinalityUpdate => LIGHT_CLIENT_FINALITY_UPDATE.into(), GossipKind::LightClientFinalityUpdate => LIGHT_CLIENT_FINALITY_UPDATE.into(),
GossipKind::LightClientOptimisticUpdate => LIGHT_CLIENT_OPTIMISTIC_UPDATE.into(), GossipKind::LightClientOptimisticUpdate => LIGHT_CLIENT_OPTIMISTIC_UPDATE.into(),
@ -273,22 +274,18 @@ pub fn subnet_from_topic_hash(topic_hash: &TopicHash) -> Option<Subnet> {
GossipTopic::decode(topic_hash.as_str()).ok()?.subnet_id() GossipTopic::decode(topic_hash.as_str()).ok()?.subnet_id()
} }
// Determines if a string is an attestation or sync committee topic. // Determines if the topic name is of an indexed topic.
fn committee_topic_index(topic: &str) -> Option<Subnet> { fn subnet_topic_index(topic: &str) -> Option<GossipKind> {
if topic.starts_with(BEACON_ATTESTATION_PREFIX) { if let Some(index) = topic.strip_prefix(BEACON_ATTESTATION_PREFIX) {
return Some(Subnet::Attestation(SubnetId::new( return Some(GossipKind::Attestation(SubnetId::new(
topic index.parse::<u64>().ok()?,
.trim_start_matches(BEACON_ATTESTATION_PREFIX)
.parse::<u64>()
.ok()?,
))); )));
} else if topic.starts_with(SYNC_COMMITTEE_PREFIX_TOPIC) { } else if let Some(index) = topic.strip_prefix(SYNC_COMMITTEE_PREFIX_TOPIC) {
return Some(Subnet::SyncCommittee(SyncSubnetId::new( return Some(GossipKind::SyncCommitteeMessage(SyncSubnetId::new(
topic index.parse::<u64>().ok()?,
.trim_start_matches(SYNC_COMMITTEE_PREFIX_TOPIC)
.parse::<u64>()
.ok()?,
))); )));
} else if let Some(index) = topic.strip_prefix(BLOB_SIDECAR_PREFIX) {
return Some(GossipKind::BlobSidecar(index.parse::<u64>().ok()?));
} }
None None
} }

View File

@ -101,6 +101,7 @@ pub mod sqlite;
pub mod blob_sidecar; pub mod blob_sidecar;
pub mod blobs_sidecar; pub mod blobs_sidecar;
pub mod signed_blob;
pub mod signed_block_and_blobs; pub mod signed_block_and_blobs;
pub mod transaction; pub mod transaction;
@ -181,6 +182,7 @@ pub use crate::signed_beacon_block::{
SignedBlindedBeaconBlock, SignedBlindedBeaconBlock,
}; };
pub use crate::signed_beacon_block_header::SignedBeaconBlockHeader; pub use crate::signed_beacon_block_header::SignedBeaconBlockHeader;
pub use crate::signed_blob::*;
pub use crate::signed_block_and_blobs::SignedBeaconBlockAndBlobsSidecar; pub use crate::signed_block_and_blobs::SignedBeaconBlockAndBlobsSidecar;
pub use crate::signed_block_and_blobs::SignedBeaconBlockAndBlobsSidecarDecode; pub use crate::signed_block_and_blobs::SignedBeaconBlockAndBlobsSidecarDecode;
pub use crate::signed_bls_to_execution_change::SignedBlsToExecutionChange; pub use crate::signed_bls_to_execution_change::SignedBlsToExecutionChange;

View File

@ -0,0 +1,24 @@
use crate::{test_utils::TestRandom, BlobSidecar, EthSpec, Signature};
use serde_derive::{Deserialize, Serialize};
use ssz_derive::{Decode, Encode};
use test_random_derive::TestRandom;
use tree_hash_derive::TreeHash;
#[derive(
Debug,
Clone,
PartialEq,
Serialize,
Deserialize,
Encode,
Decode,
TestRandom,
TreeHash,
arbitrary::Arbitrary,
)]
#[serde(bound = "T: EthSpec")]
#[arbitrary(bound = "T: EthSpec")]
pub struct SignedBlobSidecar<T: EthSpec> {
pub blob: BlobSidecar<T>,
pub signature: Signature,
}