Reprocess blob sidecar messages

This commit is contained in:
Pawan Dhananjay 2022-10-05 02:52:23 -05:00
parent c55b28bf10
commit 21bf3d37cd
No known key found for this signature in database
GPG Key ID: 647E56278D7E9B4C
4 changed files with 288 additions and 19 deletions

View File

@ -62,6 +62,14 @@ pub enum BlobError {
/// be equal to the given sidecar. /// be equal to the given sidecar.
RepeatSidecar { proposer: u64, slot: Slot }, RepeatSidecar { proposer: u64, slot: Slot },
/// The `blobs_sidecar.message.beacon_block_root` block is unknown.
///
/// ## Peer scoring
///
/// The attestation points to a block we have not yet imported. It's unclear if the attestation
/// is valid or not.
UnknownHeadBlock { beacon_block_root: Hash256 },
/// There was an error whilst processing the sync contribution. It is not known if it is valid or invalid. /// There was an error whilst processing the sync contribution. It is not known if it is valid or invalid.
/// ///
/// ## Peer scoring /// ## Peer scoring

View File

@ -80,6 +80,8 @@ mod worker;
use crate::beacon_processor::work_reprocessing_queue::QueuedGossipBlock; use crate::beacon_processor::work_reprocessing_queue::QueuedGossipBlock;
pub use worker::{ChainSegmentProcessId, GossipAggregatePackage, GossipAttestationPackage}; pub use worker::{ChainSegmentProcessId, GossipAggregatePackage, GossipAttestationPackage};
use self::work_reprocessing_queue::QueuedBlobsSidecar;
/// The maximum size of the channel for work events to the `BeaconProcessor`. /// The maximum size of the channel for work events to the `BeaconProcessor`.
/// ///
/// Setting this too low will cause consensus messages to be dropped. /// Setting this too low will cause consensus messages to be dropped.
@ -116,6 +118,8 @@ const MAX_GOSSIP_BLOCK_QUEUE_LEN: usize = 1_024;
//FIXME(sean) verify //FIXME(sean) verify
const MAX_GOSSIP_BLOB_QUEUE_LEN: usize = 1_024; const MAX_GOSSIP_BLOB_QUEUE_LEN: usize = 1_024;
//FIXME(sean) verify
const MAX_BLOBS_SIDECAR_REPROCESS_QUEUE_LEN: usize = 1_024;
/// The maximum number of queued `SignedBeaconBlock` objects received prior to their slot (but /// The maximum number of queued `SignedBeaconBlock` objects received prior to their slot (but
/// within acceptable clock disparity) that will be queued before we start dropping them. /// within acceptable clock disparity) that will be queued before we start dropping them.
@ -206,6 +210,7 @@ pub const BLOCKS_BY_ROOTS_REQUEST: &str = "blocks_by_roots_request";
pub const BLOBS_BY_RANGE_REQUEST: &str = "blobs_by_range_request"; pub const BLOBS_BY_RANGE_REQUEST: &str = "blobs_by_range_request";
pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation"; pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation";
pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate"; pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate";
pub const UNKNOWN_BLOBS_SIDECAR: &str = "unknown_blobs_sidecar";
/// A simple first-in-first-out queue with a maximum length. /// A simple first-in-first-out queue with a maximum length.
struct FifoQueue<T> { struct FifoQueue<T> {
@ -413,7 +418,7 @@ impl<T: BeaconChainTypes> WorkEvent<T> {
pub fn gossip_blobs_sidecar( pub fn gossip_blobs_sidecar(
message_id: MessageId, message_id: MessageId,
peer_id: PeerId, peer_id: PeerId,
peer_client: Client, _peer_client: Client,
blobs: Arc<SignedBlobsSidecar<T::EthSpec>>, blobs: Arc<SignedBlobsSidecar<T::EthSpec>>,
seen_timestamp: Duration, seen_timestamp: Duration,
) -> Self { ) -> Self {
@ -422,7 +427,6 @@ impl<T: BeaconChainTypes> WorkEvent<T> {
work: Work::GossipBlobsSidecar { work: Work::GossipBlobsSidecar {
message_id, message_id,
peer_id, peer_id,
peer_client,
blobs, blobs,
seen_timestamp, seen_timestamp,
}, },
@ -670,6 +674,20 @@ impl<T: BeaconChainTypes> std::convert::From<ReadyWork<T>> for WorkEvent<T> {
seen_timestamp, seen_timestamp,
}, },
}, },
ReadyWork::BlobsSidecar(QueuedBlobsSidecar {
peer_id,
message_id,
blobs_sidecar,
seen_timestamp,
}) => Self {
drop_during_sync: true,
work: Work::UnknownBlobsSidecar {
message_id,
peer_id,
blobs: blobs_sidecar,
seen_timestamp,
},
},
} }
} }
} }
@ -722,7 +740,12 @@ pub enum Work<T: BeaconChainTypes> {
GossipBlobsSidecar { GossipBlobsSidecar {
message_id: MessageId, message_id: MessageId,
peer_id: PeerId, peer_id: PeerId,
peer_client: Client, blobs: Arc<SignedBlobsSidecar<T::EthSpec>>,
seen_timestamp: Duration,
},
UnknownBlobsSidecar {
message_id: MessageId,
peer_id: PeerId,
blobs: Arc<SignedBlobsSidecar<T::EthSpec>>, blobs: Arc<SignedBlobsSidecar<T::EthSpec>>,
seen_timestamp: Duration, seen_timestamp: Duration,
}, },
@ -815,6 +838,7 @@ impl<T: BeaconChainTypes> Work<T> {
Work::BlobsByRangeRequest { .. } => BLOBS_BY_RANGE_REQUEST, Work::BlobsByRangeRequest { .. } => BLOBS_BY_RANGE_REQUEST,
Work::UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION, Work::UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION,
Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE, Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE,
Work::UnknownBlobsSidecar { .. } => UNKNOWN_BLOBS_SIDECAR,
} }
} }
} }
@ -931,6 +955,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
LifoQueue::new(MAX_AGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN); LifoQueue::new(MAX_AGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN);
let mut unknown_block_attestation_queue = let mut unknown_block_attestation_queue =
LifoQueue::new(MAX_UNAGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN); LifoQueue::new(MAX_UNAGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN);
let mut unknown_blobs_sidecar_queue = LifoQueue::new(MAX_BLOBS_SIDECAR_REPROCESS_QUEUE_LEN);
let mut sync_message_queue = LifoQueue::new(MAX_SYNC_MESSAGE_QUEUE_LEN); let mut sync_message_queue = LifoQueue::new(MAX_SYNC_MESSAGE_QUEUE_LEN);
let mut sync_contribution_queue = LifoQueue::new(MAX_SYNC_CONTRIBUTION_QUEUE_LEN); let mut sync_contribution_queue = LifoQueue::new(MAX_SYNC_CONTRIBUTION_QUEUE_LEN);
@ -1312,6 +1337,9 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
Work::UnknownBlockAggregate { .. } => { Work::UnknownBlockAggregate { .. } => {
unknown_block_aggregate_queue.push(work) unknown_block_aggregate_queue.push(work)
} }
Work::UnknownBlobsSidecar { .. } => {
unknown_blobs_sidecar_queue.push(work)
}
} }
} }
} }
@ -1531,20 +1559,16 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
Work::GossipBlobsSidecar { Work::GossipBlobsSidecar {
message_id, message_id,
peer_id, peer_id,
peer_client,
blobs, blobs,
seen_timestamp, seen_timestamp,
} => task_spawner.spawn_async(async move { } => task_spawner.spawn_async(async move {
worker worker.process_gossip_blob(
.process_gossip_blob(
message_id, message_id,
peer_id, peer_id,
peer_client,
blobs, blobs,
work_reprocessing_tx, Some(work_reprocessing_tx),
seen_timestamp, seen_timestamp,
) )
.await
}), }),
/* /*
* Import for blocks that we received earlier than their intended slot. * Import for blocks that we received earlier than their intended slot.
@ -1731,6 +1755,14 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
seen_timestamp, seen_timestamp,
) )
}), }),
Work::UnknownBlobsSidecar {
message_id,
peer_id,
blobs,
seen_timestamp,
} => task_spawner.spawn_blocking(move || {
worker.process_gossip_blob(message_id, peer_id, blobs, None, seen_timestamp)
}),
}; };
} }
} }

View File

@ -30,7 +30,10 @@ use task_executor::TaskExecutor;
use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::time::error::Error as TimeError; use tokio::time::error::Error as TimeError;
use tokio_util::time::delay_queue::{DelayQueue, Key as DelayKey}; use tokio_util::time::delay_queue::{DelayQueue, Key as DelayKey};
use types::{Attestation, EthSpec, Hash256, SignedAggregateAndProof, SignedBeaconBlock, SubnetId}; use types::{
Attestation, EthSpec, Hash256, SignedAggregateAndProof, SignedBeaconBlock, SignedBlobsSidecar,
SubnetId,
};
const TASK_NAME: &str = "beacon_processor_reprocess_queue"; const TASK_NAME: &str = "beacon_processor_reprocess_queue";
const GOSSIP_BLOCKS: &str = "gossip_blocks"; const GOSSIP_BLOCKS: &str = "gossip_blocks";
@ -44,6 +47,10 @@ const ADDITIONAL_QUEUED_BLOCK_DELAY: Duration = Duration::from_millis(5);
/// For how long to queue aggregated and unaggregated attestations for re-processing. /// For how long to queue aggregated and unaggregated attestations for re-processing.
pub const QUEUED_ATTESTATION_DELAY: Duration = Duration::from_secs(12); pub const QUEUED_ATTESTATION_DELAY: Duration = Duration::from_secs(12);
/// For how long to queue blob sidecars for re-processing.
/// TODO: rethink duration
pub const QUEUED_BLOBS_SIDECARS_DELAY: Duration = Duration::from_secs(6);
/// For how long to queue rpc blocks before sending them back for reprocessing. /// For how long to queue rpc blocks before sending them back for reprocessing.
pub const QUEUED_RPC_BLOCK_DELAY: Duration = Duration::from_secs(3); pub const QUEUED_RPC_BLOCK_DELAY: Duration = Duration::from_secs(3);
@ -55,6 +62,10 @@ const MAXIMUM_QUEUED_BLOCKS: usize = 16;
/// How many attestations we keep before new ones get dropped. /// How many attestations we keep before new ones get dropped.
const MAXIMUM_QUEUED_ATTESTATIONS: usize = 16_384; const MAXIMUM_QUEUED_ATTESTATIONS: usize = 16_384;
/// TODO: fix number
/// How many blobs we keep before new ones get dropped.
const MAXIMUM_QUEUED_BLOB_SIDECARS: usize = 16_384;
/// Messages that the scheduler can receive. /// Messages that the scheduler can receive.
pub enum ReprocessQueueMessage<T: BeaconChainTypes> { pub enum ReprocessQueueMessage<T: BeaconChainTypes> {
/// A block that has been received early and we should queue for later processing. /// A block that has been received early and we should queue for later processing.
@ -69,6 +80,8 @@ pub enum ReprocessQueueMessage<T: BeaconChainTypes> {
UnknownBlockUnaggregate(QueuedUnaggregate<T::EthSpec>), UnknownBlockUnaggregate(QueuedUnaggregate<T::EthSpec>),
/// An aggregated attestation that references an unknown block. /// An aggregated attestation that references an unknown block.
UnknownBlockAggregate(QueuedAggregate<T::EthSpec>), UnknownBlockAggregate(QueuedAggregate<T::EthSpec>),
/// A blob sidecar that references an unknown block.
UnknownBlobSidecar(QueuedBlobsSidecar<T::EthSpec>),
} }
/// Events sent by the scheduler once they are ready for re-processing. /// Events sent by the scheduler once they are ready for re-processing.
@ -77,6 +90,7 @@ pub enum ReadyWork<T: BeaconChainTypes> {
RpcBlock(QueuedRpcBlock<T::EthSpec>), RpcBlock(QueuedRpcBlock<T::EthSpec>),
Unaggregate(QueuedUnaggregate<T::EthSpec>), Unaggregate(QueuedUnaggregate<T::EthSpec>),
Aggregate(QueuedAggregate<T::EthSpec>), Aggregate(QueuedAggregate<T::EthSpec>),
BlobsSidecar(QueuedBlobsSidecar<T::EthSpec>),
} }
/// An Attestation for which the corresponding block was not seen while processing, queued for /// An Attestation for which the corresponding block was not seen while processing, queued for
@ -118,6 +132,15 @@ pub struct QueuedRpcBlock<T: EthSpec> {
pub should_process: bool, pub should_process: bool,
} }
/// A blob sidecar for which the corresponding block was not seen while processing, queued for
/// later.
pub struct QueuedBlobsSidecar<T: EthSpec> {
pub peer_id: PeerId,
pub message_id: MessageId,
pub blobs_sidecar: Arc<SignedBlobsSidecar<T>>,
pub seen_timestamp: Duration,
}
/// Unifies the different messages processed by the block delay queue. /// Unifies the different messages processed by the block delay queue.
enum InboundEvent<T: BeaconChainTypes> { enum InboundEvent<T: BeaconChainTypes> {
/// A gossip block that was queued for later processing and is ready for import. /// A gossip block that was queued for later processing and is ready for import.
@ -127,6 +150,8 @@ enum InboundEvent<T: BeaconChainTypes> {
ReadyRpcBlock(QueuedRpcBlock<T::EthSpec>), ReadyRpcBlock(QueuedRpcBlock<T::EthSpec>),
/// An aggregated or unaggregated attestation is ready for re-processing. /// An aggregated or unaggregated attestation is ready for re-processing.
ReadyAttestation(QueuedAttestationId), ReadyAttestation(QueuedAttestationId),
/// A blob sidecar is ready for re-processing.
ReadyBlobsSidecar(QueuedBlobsSidecarId),
/// A `DelayQueue` returned an error. /// A `DelayQueue` returned an error.
DelayQueueError(TimeError, &'static str), DelayQueueError(TimeError, &'static str),
/// A message sent to the `ReprocessQueue` /// A message sent to the `ReprocessQueue`
@ -147,6 +172,7 @@ struct ReprocessQueue<T: BeaconChainTypes> {
rpc_block_delay_queue: DelayQueue<QueuedRpcBlock<T::EthSpec>>, rpc_block_delay_queue: DelayQueue<QueuedRpcBlock<T::EthSpec>>,
/// Queue to manage scheduled attestations. /// Queue to manage scheduled attestations.
attestations_delay_queue: DelayQueue<QueuedAttestationId>, attestations_delay_queue: DelayQueue<QueuedAttestationId>,
blobs_sidecar_delay_queue: DelayQueue<QueuedBlobsSidecarId>,
/* Queued items */ /* Queued items */
/// Queued blocks. /// Queued blocks.
@ -155,15 +181,19 @@ struct ReprocessQueue<T: BeaconChainTypes> {
queued_aggregates: FnvHashMap<usize, (QueuedAggregate<T::EthSpec>, DelayKey)>, queued_aggregates: FnvHashMap<usize, (QueuedAggregate<T::EthSpec>, DelayKey)>,
/// Queued attestations. /// Queued attestations.
queued_unaggregates: FnvHashMap<usize, (QueuedUnaggregate<T::EthSpec>, DelayKey)>, queued_unaggregates: FnvHashMap<usize, (QueuedUnaggregate<T::EthSpec>, DelayKey)>,
queued_blob_sidecars: FnvHashMap<usize, (QueuedBlobsSidecar<T::EthSpec>, DelayKey)>,
/// Attestations (aggregated and unaggregated) per root. /// Attestations (aggregated and unaggregated) per root.
awaiting_attestations_per_root: HashMap<Hash256, Vec<QueuedAttestationId>>, awaiting_attestations_per_root: HashMap<Hash256, Vec<QueuedAttestationId>>,
awaiting_blobs_sidecars_per_root: HashMap<Hash256, Vec<QueuedBlobsSidecarId>>,
/* Aux */ /* Aux */
/// Next attestation id, used for both aggregated and unaggregated attestations /// Next attestation id, used for both aggregated and unaggregated attestations
next_attestation: usize, next_attestation: usize,
next_sidecar: usize,
early_block_debounce: TimeLatch, early_block_debounce: TimeLatch,
rpc_block_debounce: TimeLatch, rpc_block_debounce: TimeLatch,
attestation_delay_debounce: TimeLatch, attestation_delay_debounce: TimeLatch,
blobs_sidecar_debounce: TimeLatch,
} }
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
@ -172,6 +202,9 @@ enum QueuedAttestationId {
Unaggregate(usize), Unaggregate(usize),
} }
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct QueuedBlobsSidecarId(usize);
impl<T: EthSpec> QueuedAggregate<T> { impl<T: EthSpec> QueuedAggregate<T> {
pub fn beacon_block_root(&self) -> &Hash256 { pub fn beacon_block_root(&self) -> &Hash256 {
&self.attestation.message.aggregate.data.beacon_block_root &self.attestation.message.aggregate.data.beacon_block_root
@ -235,6 +268,21 @@ impl<T: BeaconChainTypes> Stream for ReprocessQueue<T> {
Poll::Ready(None) | Poll::Pending => (), Poll::Ready(None) | Poll::Pending => (),
} }
match self.blobs_sidecar_delay_queue.poll_expired(cx) {
Poll::Ready(Some(Ok(id))) => {
return Poll::Ready(Some(InboundEvent::ReadyBlobsSidecar(id.into_inner())));
}
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(InboundEvent::DelayQueueError(
e,
"blobs_sidecar_queue",
)));
}
// `Poll::Ready(None)` means that there are no more entries in the delay queue and we
// will continue to get this result until something else is added into the queue.
Poll::Ready(None) | Poll::Pending => (),
}
// Last empty the messages channel. // Last empty the messages channel.
match self.work_reprocessing_rx.poll_recv(cx) { match self.work_reprocessing_rx.poll_recv(cx) {
Poll::Ready(Some(message)) => return Poll::Ready(Some(InboundEvent::Msg(message))), Poll::Ready(Some(message)) => return Poll::Ready(Some(InboundEvent::Msg(message))),
@ -264,14 +312,19 @@ pub fn spawn_reprocess_scheduler<T: BeaconChainTypes>(
gossip_block_delay_queue: DelayQueue::new(), gossip_block_delay_queue: DelayQueue::new(),
rpc_block_delay_queue: DelayQueue::new(), rpc_block_delay_queue: DelayQueue::new(),
attestations_delay_queue: DelayQueue::new(), attestations_delay_queue: DelayQueue::new(),
blobs_sidecar_delay_queue: DelayQueue::new(),
queued_gossip_block_roots: HashSet::new(), queued_gossip_block_roots: HashSet::new(),
queued_aggregates: FnvHashMap::default(), queued_aggregates: FnvHashMap::default(),
queued_unaggregates: FnvHashMap::default(), queued_unaggregates: FnvHashMap::default(),
queued_blob_sidecars: FnvHashMap::default(),
awaiting_attestations_per_root: HashMap::new(), awaiting_attestations_per_root: HashMap::new(),
awaiting_blobs_sidecars_per_root: HashMap::new(),
next_attestation: 0, next_attestation: 0,
next_sidecar: 0,
early_block_debounce: TimeLatch::default(), early_block_debounce: TimeLatch::default(),
rpc_block_debounce: TimeLatch::default(), rpc_block_debounce: TimeLatch::default(),
attestation_delay_debounce: TimeLatch::default(), attestation_delay_debounce: TimeLatch::default(),
blobs_sidecar_debounce: TimeLatch::default(),
}; };
executor.spawn( executor.spawn(
@ -473,6 +526,39 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
self.next_attestation += 1; self.next_attestation += 1;
} }
InboundEvent::Msg(UnknownBlobSidecar(queued_blob_sidecar)) => {
if self.blobs_sidecar_delay_queue.len() >= MAXIMUM_QUEUED_BLOB_SIDECARS {
if self.blobs_sidecar_debounce.elapsed() {
error!(
log,
"Blobs sidecar queue is full";
"queue_size" => MAXIMUM_QUEUED_BLOB_SIDECARS,
"msg" => "check system clock"
);
}
// Drop the attestation.
return;
}
let id = QueuedBlobsSidecarId(self.next_sidecar);
// Register the delay.
let delay_key = self
.blobs_sidecar_delay_queue
.insert(id, QUEUED_BLOBS_SIDECARS_DELAY);
// Register this sidecar for the corresponding root.
self.awaiting_blobs_sidecars_per_root
.entry(queued_blob_sidecar.blobs_sidecar.message.beacon_block_root)
.or_default()
.push(id);
// Store the blob sidecar and its info.
self.queued_blob_sidecars
.insert(self.next_sidecar, (queued_blob_sidecar, delay_key));
self.next_sidecar += 1;
}
InboundEvent::Msg(BlockImported(root)) => { InboundEvent::Msg(BlockImported(root)) => {
// Unqueue the attestations we have for this root, if any. // Unqueue the attestations we have for this root, if any.
if let Some(queued_ids) = self.awaiting_attestations_per_root.remove(&root) { if let Some(queued_ids) = self.awaiting_attestations_per_root.remove(&root) {
@ -517,6 +603,43 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
} }
} }
} }
// Unqueue the blob sidecars we have for this root, if any.
// TODO: merge the 2 data structures.
if let Some(queued_ids) = self.awaiting_blobs_sidecars_per_root.remove(&root) {
for id in queued_ids {
// metrics::inc_counter(
// &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_ATTESTATIONS,
// );
if let Some((work, delay_key)) = self
.queued_blob_sidecars
.remove(&id.0)
.map(|(blobs_sidecar, delay_key)| {
(ReadyWork::BlobsSidecar(blobs_sidecar), delay_key)
})
{
// Remove the delay.
self.blobs_sidecar_delay_queue.remove(&delay_key);
// Send the work.
if self.ready_work_tx.try_send(work).is_err() {
error!(
log,
"Failed to send scheduled blob sidecar";
);
}
} else {
// There is a mismatch between the blob sidecar ids registered for this
// root and the queued blob sidecars. This should never happen.
error!(
log,
"Unknown queued blob sidecar for block root";
"block_root" => ?root,
"id" => ?id,
);
}
}
}
} }
// A block that was queued for later processing is now ready to be processed. // A block that was queued for later processing is now ready to be processed.
InboundEvent::ReadyGossipBlock(ready_block) => { InboundEvent::ReadyGossipBlock(ready_block) => {
@ -591,6 +714,40 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
} }
} }
} }
InboundEvent::ReadyBlobsSidecar(queued_blobs_sidecar_id) => {
// metrics::inc_counter(
// &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_ATTESTATIONS,
// );
if let Some((root, work)) = self
.queued_blob_sidecars
.remove(&queued_blobs_sidecar_id.0)
.map(|(blobs_sidecar, _delay_key)| {
(
blobs_sidecar.blobs_sidecar.message.beacon_block_root,
ReadyWork::BlobsSidecar(blobs_sidecar),
)
})
{
if self.ready_work_tx.try_send(work).is_err() {
error!(
log,
"Failed to send scheduled attestation";
);
}
if let Some(queued_blob_sidecars) =
self.awaiting_blobs_sidecars_per_root.get_mut(&root)
{
if let Some(index) = queued_blob_sidecars
.iter()
.position(|&id| id == queued_blobs_sidecar_id)
{
queued_blob_sidecars.swap_remove(index);
}
}
}
}
} }
metrics::set_gauge_vec( metrics::set_gauge_vec(

View File

@ -1,3 +1,4 @@
use crate::beacon_processor::work_reprocessing_queue::QueuedBlobsSidecar;
use crate::{metrics, service::NetworkMessage, sync::SyncMessage}; use crate::{metrics, service::NetworkMessage, sync::SyncMessage};
use beacon_chain::store::Error; use beacon_chain::store::Error;
@ -696,13 +697,12 @@ impl<T: BeaconChainTypes> Worker<T> {
} }
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub async fn process_gossip_blob( pub fn process_gossip_blob(
self, self,
message_id: MessageId, message_id: MessageId,
peer_id: PeerId, peer_id: PeerId,
peer_client: Client,
blob: Arc<SignedBlobsSidecar<T::EthSpec>>, blob: Arc<SignedBlobsSidecar<T::EthSpec>>,
reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>, reprocess_tx: Option<mpsc::Sender<ReprocessQueueMessage<T>>>,
seen_timestamp: Duration, seen_timestamp: Duration,
) { ) {
match self.chain.verify_blobs_sidecar_for_gossip(&blob) { match self.chain.verify_blobs_sidecar_for_gossip(&blob) {
@ -714,8 +714,9 @@ impl<T: BeaconChainTypes> Worker<T> {
Err(error) => self.handle_blobs_verification_failure( Err(error) => self.handle_blobs_verification_failure(
peer_id, peer_id,
message_id, message_id,
Some(reprocess_tx), reprocess_tx,
error, error,
blob,
seen_timestamp, seen_timestamp,
), ),
}; };
@ -2233,7 +2234,78 @@ impl<T: BeaconChainTypes> Worker<T> {
message_id: MessageId, message_id: MessageId,
reprocess_tx: Option<mpsc::Sender<ReprocessQueueMessage<T>>>, reprocess_tx: Option<mpsc::Sender<ReprocessQueueMessage<T>>>,
error: BlobError, error: BlobError,
blobs_sidecar: Arc<SignedBlobsSidecar<T::EthSpec>>,
seen_timestamp: Duration, seen_timestamp: Duration,
) { ) {
// TODO: metrics
match &error {
BlobError::FutureSlot { .. } => {
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
}
BlobError::PastSlot { .. } => {
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
}
BlobError::BeaconChainError(e) => {
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
}
BlobError::BlobOutOfRange { blob_index } => {
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
}
BlobError::InvalidKZGCommitment => {
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
}
BlobError::ProposalSignatureInvalid => {
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
}
BlobError::RepeatSidecar { proposer, slot } => {
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
}
BlobError::UnknownHeadBlock { beacon_block_root } => {
debug!(
self.log,
"Blob sidecar for unknown block";
"peer_id" => %peer_id,
"block" => ?beacon_block_root
);
if let Some(sender) = reprocess_tx {
// We don't know the block, get the sync manager to handle the block lookup, and
// send the attestation to be scheduled for re-processing.
self.sync_tx
.send(SyncMessage::UnknownBlockHash(peer_id, *beacon_block_root))
.unwrap_or_else(|_| {
warn!(
self.log,
"Failed to send to sync service";
"msg" => "UnknownBlockHash"
)
});
let msg = ReprocessQueueMessage::UnknownBlobSidecar(QueuedBlobsSidecar {
peer_id,
message_id,
blobs_sidecar,
seen_timestamp,
});
if sender.try_send(msg).is_err() {
error!(
self.log,
"Failed to send blob sidecar for re-processing";
)
}
} else {
// We shouldn't make any further attempts to process this attestation.
//
// Don't downscore the peer since it's not clear if we requested this head
// block from them or not.
self.propagate_validation_result(
message_id,
peer_id,
MessageAcceptance::Ignore,
);
}
return;
}
}
} }
} }