Prevent double import of blocks (#2647)

## Issue Addressed

Resolves #2611 

## Proposed Changes

Adds a duplicate block root cache to the `BeaconProcessor`. Adds the block root to the cache before calling `process_gossip_block` and `process_rpc_block`. Since `process_rpc_block` is called only for single block lookups, we don't have to worry about batched block imports.

The block is imported from the source(gossip/rpc) that arrives first. The block that arrives second is not imported to avoid the db access issue.
There are 2 cases:
1. Block that arrives second is from rpc: In this case, we return an optimistic `BlockError::BlockIsAlreadyKnown` to sync.
2. Block that arrives second is from gossip: In this case, we only do gossip verification and forwarding but don't import the block into the the beacon chain.

## Additional info
Splits up `process_gossip_block` function to `process_gossip_unverified_block` and `process_gossip_verified_block`.
This commit is contained in:
Pawan Dhananjay 2021-10-28 03:36:14 +00:00
parent 2dc6163043
commit 88063398f6
5 changed files with 164 additions and 36 deletions

View File

@ -39,6 +39,7 @@
//! task. //! task.
use crate::{metrics, service::NetworkMessage, sync::SyncMessage}; use crate::{metrics, service::NetworkMessage, sync::SyncMessage};
use beacon_chain::parking_lot::Mutex;
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, GossipVerifiedBlock}; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, GossipVerifiedBlock};
use futures::stream::{Stream, StreamExt}; use futures::stream::{Stream, StreamExt};
use futures::task::Poll; use futures::task::Poll;
@ -47,13 +48,13 @@ use lighthouse_network::{
Client, MessageId, NetworkGlobals, PeerId, PeerRequestId, Client, MessageId, NetworkGlobals, PeerId, PeerRequestId,
}; };
use slog::{crit, debug, error, trace, warn, Logger}; use slog::{crit, debug, error, trace, warn, Logger};
use std::cmp;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::fmt; use std::fmt;
use std::pin::Pin; use std::pin::Pin;
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
use std::task::Context; use std::task::Context;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use std::{cmp, collections::HashSet};
use task_executor::TaskExecutor; use task_executor::TaskExecutor;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use types::{ use types::{
@ -284,6 +285,55 @@ impl<T> LifoQueue<T> {
} }
} }
/// A handle that sends a message on the provided channel to a receiver when it gets dropped.
///
/// The receiver task is responsible for removing the provided `entry` from the `DuplicateCache`
/// and perform any other necessary cleanup.
pub struct DuplicateCacheHandle {
entry: Hash256,
cache: DuplicateCache,
}
impl Drop for DuplicateCacheHandle {
fn drop(&mut self) {
self.cache.remove(&self.entry);
}
}
/// A simple cache for detecting duplicate block roots across multiple threads.
#[derive(Clone, Default)]
pub struct DuplicateCache {
inner: Arc<Mutex<HashSet<Hash256>>>,
}
impl DuplicateCache {
/// Checks if the given block_root exists and inserts it into the cache if
/// it doesn't exist.
///
/// Returns a `Some(DuplicateCacheHandle)` if the block_root was successfully
/// inserted and `None` if the block root already existed in the cache.
///
/// The handle removes the entry from the cache when it is dropped. This ensures that any unclean
/// shutdowns in the worker tasks does not leave inconsistent state in the cache.
pub fn check_and_insert(&self, block_root: Hash256) -> Option<DuplicateCacheHandle> {
let mut inner = self.inner.lock();
if inner.insert(block_root) {
Some(DuplicateCacheHandle {
entry: block_root,
cache: self.clone(),
})
} else {
None
}
}
/// Remove the given block_root from the cache.
pub fn remove(&self, block_root: &Hash256) {
let mut inner = self.inner.lock();
inner.remove(block_root);
}
}
/// An event to be processed by the manager task. /// An event to be processed by the manager task.
pub struct WorkEvent<T: BeaconChainTypes> { pub struct WorkEvent<T: BeaconChainTypes> {
drop_during_sync: bool, drop_during_sync: bool,
@ -787,6 +837,7 @@ pub struct BeaconProcessor<T: BeaconChainTypes> {
pub executor: TaskExecutor, pub executor: TaskExecutor,
pub max_workers: usize, pub max_workers: usize,
pub current_workers: usize, pub current_workers: usize,
pub importing_blocks: DuplicateCache,
pub log: Logger, pub log: Logger,
} }
@ -1302,6 +1353,8 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
log: self.log.clone(), log: self.log.clone(),
}; };
let duplicate_cache = self.importing_blocks.clone();
trace!( trace!(
self.log, self.log,
"Spawning beacon processor worker"; "Spawning beacon processor worker";
@ -1373,7 +1426,8 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
peer_id, peer_id,
peer_client, peer_client,
*block, *block,
work_reprocessing_tx, work_reprocessing_tx.clone(),
duplicate_cache,
seen_timestamp, seen_timestamp,
), ),
/* /*
@ -1455,7 +1509,12 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
* Verification for beacon blocks received during syncing via RPC. * Verification for beacon blocks received during syncing via RPC.
*/ */
Work::RpcBlock { block, result_tx } => { Work::RpcBlock { block, result_tx } => {
worker.process_rpc_block(*block, result_tx, work_reprocessing_tx) worker.process_rpc_block(
*block,
result_tx,
work_reprocessing_tx.clone(),
duplicate_cache,
);
} }
/* /*
* Verification for a chain segment (multiple blocks). * Verification for a chain segment (multiple blocks).

View File

@ -201,6 +201,7 @@ impl TestRig {
executor, executor,
max_workers: cmp::max(1, num_cpus::get()), max_workers: cmp::max(1, num_cpus::get()),
current_workers: 0, current_workers: 0,
importing_blocks: Default::default(),
log: log.clone(), log: log.clone(),
} }
.spawn_manager(beacon_processor_rx, Some(work_journal_tx)); .spawn_manager(beacon_processor_rx, Some(work_journal_tx));

View File

@ -25,6 +25,7 @@ use super::{
}, },
Worker, Worker,
}; };
use crate::beacon_processor::DuplicateCache;
/// An attestation that has been validated by the `BeaconChain`. /// An attestation that has been validated by the `BeaconChain`.
/// ///
@ -190,7 +191,7 @@ impl<T: BeaconChainTypes> Worker<T> {
/// Creates a log if there is an internal error. /// Creates a log if there is an internal error.
/// Propagates the result of the validation for the given message to the network. If the result /// Propagates the result of the validation for the given message to the network. If the result
/// is valid the message gets forwarded to other peers. /// is valid the message gets forwarded to other peers.
fn propagate_validation_result( pub(crate) fn propagate_validation_result(
&self, &self,
message_id: MessageId, message_id: MessageId,
propagation_source: PeerId, propagation_source: PeerId,
@ -618,6 +619,7 @@ impl<T: BeaconChainTypes> Worker<T> {
/// be downloaded. /// be downloaded.
/// ///
/// Raises a log if there are errors. /// Raises a log if there are errors.
#[allow(clippy::too_many_arguments)]
pub fn process_gossip_block( pub fn process_gossip_block(
self, self,
message_id: MessageId, message_id: MessageId,
@ -625,8 +627,50 @@ impl<T: BeaconChainTypes> Worker<T> {
peer_client: Client, peer_client: Client,
block: SignedBeaconBlock<T::EthSpec>, block: SignedBeaconBlock<T::EthSpec>,
reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>, reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>,
duplicate_cache: DuplicateCache,
seen_duration: Duration, seen_duration: Duration,
) { ) {
if let Some(gossip_verified_block) = self.process_gossip_unverified_block(
message_id,
peer_id,
peer_client,
block,
reprocess_tx.clone(),
seen_duration,
) {
let block_root = gossip_verified_block.block_root;
if let Some(handle) = duplicate_cache.check_and_insert(block_root) {
self.process_gossip_verified_block(
peer_id,
gossip_verified_block,
reprocess_tx,
seen_duration,
);
// Drop the handle to remove the entry from the cache
drop(handle);
} else {
debug!(
self.log,
"RPC block is being imported";
"block_root" => %block_root,
);
}
}
}
/// Process the beacon block received from the gossip network and
/// if it passes gossip propagation criteria, tell the network thread to forward it.
///
/// Returns the `GossipVerifiedBlock` if verification passes and raises a log if there are errors.
pub fn process_gossip_unverified_block(
&self,
message_id: MessageId,
peer_id: PeerId,
peer_client: Client,
block: SignedBeaconBlock<T::EthSpec>,
reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>,
seen_duration: Duration,
) -> Option<GossipVerifiedBlock<T>> {
let block_delay = let block_delay =
get_block_delay_ms(seen_duration, block.message(), &self.chain.slot_clock); get_block_delay_ms(seen_duration, block.message(), &self.chain.slot_clock);
// Log metrics to track delay from other nodes on the network. // Log metrics to track delay from other nodes on the network.
@ -687,7 +731,7 @@ impl<T: BeaconChainTypes> Worker<T> {
"root" => ?block.canonical_root() "root" => ?block.canonical_root()
); );
self.send_sync_message(SyncMessage::UnknownBlock(peer_id, block)); self.send_sync_message(SyncMessage::UnknownBlock(peer_id, block));
return; return None;
} }
Err(e @ BlockError::FutureSlot { .. }) Err(e @ BlockError::FutureSlot { .. })
| Err(e @ BlockError::WouldRevertFinalizedSlot { .. }) | Err(e @ BlockError::WouldRevertFinalizedSlot { .. })
@ -700,7 +744,7 @@ impl<T: BeaconChainTypes> Worker<T> {
// Prevent recurring behaviour by penalizing the peer slightly. // Prevent recurring behaviour by penalizing the peer slightly.
self.gossip_penalize_peer(peer_id, PeerAction::HighToleranceError); self.gossip_penalize_peer(peer_id, PeerAction::HighToleranceError);
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
return; return None;
} }
Err(e @ BlockError::StateRootMismatch { .. }) Err(e @ BlockError::StateRootMismatch { .. })
| Err(e @ BlockError::IncorrectBlockProposer { .. }) | Err(e @ BlockError::IncorrectBlockProposer { .. })
@ -720,7 +764,7 @@ impl<T: BeaconChainTypes> Worker<T> {
"error" => %e); "error" => %e);
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
self.gossip_penalize_peer(peer_id, PeerAction::LowToleranceError); self.gossip_penalize_peer(peer_id, PeerAction::LowToleranceError);
return; return None;
} }
}; };
@ -786,13 +830,9 @@ impl<T: BeaconChainTypes> Worker<T> {
"location" => "block gossip" "location" => "block gossip"
) )
} }
None
} }
Ok(_) => self.process_gossip_verified_block( Ok(_) => Some(verified_block),
peer_id,
verified_block,
reprocess_tx,
seen_duration,
),
Err(e) => { Err(e) => {
error!( error!(
self.log, self.log,
@ -801,7 +841,8 @@ impl<T: BeaconChainTypes> Worker<T> {
"block_slot" => %block_slot, "block_slot" => %block_slot,
"block_root" => ?block_root, "block_root" => ?block_root,
"location" => "block gossip" "location" => "block gossip"
) );
None
} }
} }
} }

View File

@ -1,6 +1,6 @@
use super::{super::work_reprocessing_queue::ReprocessQueueMessage, Worker}; use super::{super::work_reprocessing_queue::ReprocessQueueMessage, Worker};
use crate::beacon_processor::worker::FUTURE_SLOT_TOLERANCE; use crate::beacon_processor::worker::FUTURE_SLOT_TOLERANCE;
use crate::beacon_processor::BlockResultSender; use crate::beacon_processor::{BlockResultSender, DuplicateCache};
use crate::metrics; use crate::metrics;
use crate::sync::manager::{SyncMessage, SyncRequestType}; use crate::sync::manager::{SyncMessage, SyncRequestType};
use crate::sync::{BatchProcessResult, ChainId}; use crate::sync::{BatchProcessResult, ChainId};
@ -33,7 +33,11 @@ impl<T: BeaconChainTypes> Worker<T> {
block: SignedBeaconBlock<T::EthSpec>, block: SignedBeaconBlock<T::EthSpec>,
result_tx: BlockResultSender<T::EthSpec>, result_tx: BlockResultSender<T::EthSpec>,
reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>, reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>,
duplicate_cache: DuplicateCache,
) { ) {
let block_root = block.canonical_root();
// Checks if the block is already being imported through another source
if let Some(handle) = duplicate_cache.check_and_insert(block_root) {
let slot = block.slot(); let slot = block.slot();
let block_result = self.chain.process_block(block); let block_result = self.chain.process_block(block);
@ -63,6 +67,28 @@ impl<T: BeaconChainTypes> Worker<T> {
if result_tx.send(block_result).is_err() { if result_tx.send(block_result).is_err() {
crit!(self.log, "Failed return sync block result"); crit!(self.log, "Failed return sync block result");
} }
// Drop the handle to remove the entry from the cache
drop(handle);
} else {
debug!(
self.log,
"Gossip block is being imported";
"block_root" => %block_root,
);
// The gossip block that is being imported should eventually
// trigger reprocessing of queued attestations once it is imported.
// If the gossip block fails import, then it will be downscored
// appropriately in `process_gossip_block`.
// Here, we assume that the block will eventually be imported and
// send a `BlockIsAlreadyKnown` message to sync.
if result_tx
.send(Err(BlockError::BlockIsAlreadyKnown))
.is_err()
{
crit!(self.log, "Failed return sync block result");
}
}
} }
/// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync /// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync

View File

@ -65,6 +65,7 @@ impl<T: BeaconChainTypes> Processor<T> {
executor, executor,
max_workers: cmp::max(1, num_cpus::get()), max_workers: cmp::max(1, num_cpus::get()),
current_workers: 0, current_workers: 0,
importing_blocks: Default::default(),
log: log.clone(), log: log.clone(),
} }
.spawn_manager(beacon_processor_receive, None); .spawn_manager(beacon_processor_receive, None);