diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 283dcf96c..08a88f5db 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -352,7 +352,7 @@ pub struct BeaconChain { /// in recent epochs. pub(crate) observed_sync_aggregators: RwLock>, /// Maintains a record of which validators have proposed blocks for each slot. - pub(crate) observed_block_producers: RwLock>, + pub observed_block_producers: RwLock>, /// Maintains a record of which validators have submitted voluntary exits. pub(crate) observed_voluntary_exits: Mutex>, /// Maintains a record of which validators we've seen proposer slashings for. diff --git a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs index 9f04d9972..427be6d51 100644 --- a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs +++ b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs @@ -56,7 +56,7 @@ pub const QUEUED_ATTESTATION_DELAY: Duration = Duration::from_secs(12); pub const QUEUED_LIGHT_CLIENT_UPDATE_DELAY: Duration = Duration::from_secs(12); /// For how long to queue rpc blocks before sending them back for reprocessing. -pub const QUEUED_RPC_BLOCK_DELAY: Duration = Duration::from_secs(3); +pub const QUEUED_RPC_BLOCK_DELAY: Duration = Duration::from_secs(4); /// Set an arbitrary upper-bound on the number of queued blocks to avoid DoS attacks. The fact that /// we signature-verify blocks before putting them in the queue *should* protect against this, but @@ -521,7 +521,7 @@ impl ReprocessQueue { return; } - // Queue the block for 1/4th of a slot + // Queue the block for 1/3rd of a slot self.rpc_block_delay_queue .insert(rpc_block, QUEUED_RPC_BLOCK_DELAY); } diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs index e8182a1d5..61ecc30d4 100644 --- a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs @@ -83,6 +83,52 @@ impl Worker { return; } }; + // Check if a block from this proposer is already known. If so, defer processing until later + // to avoid wasting time processing duplicates. + let proposal_already_known = self + .chain + .observed_block_producers + .read() + .proposer_has_been_observed(block.message()) + .map_err(|e| { + error!( + self.log, + "Failed to check observed proposers"; + "error" => ?e, + "source" => "rpc", + "block_root" => %block_root + ); + }) + .unwrap_or(true); + if proposal_already_known { + debug!( + self.log, + "Delaying processing of duplicate RPC block"; + "block_root" => ?block_root, + "proposer" => block.message().proposer_index(), + "slot" => block.slot() + ); + + // Send message to work reprocess queue to retry the block + let reprocess_msg = ReprocessQueueMessage::RpcBlock(QueuedRpcBlock { + block_root, + block: block.clone(), + process_type, + seen_timestamp, + should_process: true, + }); + + if reprocess_tx.try_send(reprocess_msg).is_err() { + error!( + self.log, + "Failed to inform block import"; + "source" => "rpc", + "block_root" => %block_root + ); + } + return; + } + let slot = block.slot(); let parent_root = block.message().parent_root(); let result = self