diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 5d88c5ca4..60fb9e419 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -376,7 +376,7 @@ pub struct BeaconChain { /// continue they can request that everything shuts down. pub shutdown_sender: Sender, pub block_waiting_for_sidecar: Mutex>>, - pub sidecar_waiting_for_block: Mutex>>, + pub sidecar_waiting_for_block: Mutex>>>, /// Logging to CLI, etc. pub(crate) log: Logger, /// Arbitrary bytes included in the blocks. @@ -2431,7 +2431,7 @@ impl BeaconChain { pub async fn process_block>( self: &Arc, unverified_block: B, - sidecar: Option>, + sidecar: Option>>, count_unrealized: CountUnrealized, ) -> Result> { // Start the Prometheus timer. @@ -2506,7 +2506,7 @@ impl BeaconChain { async fn import_execution_pending_block( self: Arc, execution_pending_block: ExecutionPendingBlock, - sidecar: Option>, + sidecar: Option>>, count_unrealized: CountUnrealized, ) -> Result> { let ExecutionPendingBlock { @@ -2585,7 +2585,7 @@ impl BeaconChain { fn import_block( &self, signed_block: Arc>, - sidecar: Option>, + sidecar: Option>>, block_root: Hash256, mut state: BeaconState, confirmed_state_roots: Vec, diff --git a/beacon_node/beacon_chain/src/snapshot_cache.rs b/beacon_node/beacon_chain/src/snapshot_cache.rs index 40b73451c..c77ef9e38 100644 --- a/beacon_node/beacon_chain/src/snapshot_cache.rs +++ b/beacon_node/beacon_chain/src/snapshot_cache.rs @@ -16,7 +16,7 @@ pub const DEFAULT_SNAPSHOT_CACHE_SIZE: usize = 4; const MINIMUM_BLOCK_DELAY_FOR_CLONE: Duration = Duration::from_secs(6); /// This snapshot is to be used for verifying a child of `self.beacon_block`. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct PreProcessingSnapshot { /// This state is equivalent to the `self.beacon_block.state_root()` state that has been /// advanced forward one slot using `per_slot_processing`. This state is "primed and ready" for diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 63b252099..0d07c6207 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -935,6 +935,7 @@ impl Worker { return } } else { + *self.chain.block_waiting_for_sidecar.lock() = Some(verified_block); // we need the sidecar but dont have it yet return }; @@ -1017,7 +1018,80 @@ impl Worker { duplicate_cache: DuplicateCache, seen_duration: Duration, ) { + let verified_block = self.chain.block_waiting_for_sidecar.lock().take(); + if let Some(verified_block) = verified_block { + let block = verified_block.block.clone(); + if verified_block.block_root() == blobs.message.beacon_block_root { + match self + .chain + .process_block(verified_block, Some(blobs), CountUnrealized::True) + .await + { + Ok(block_root) => { + metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL); + if reprocess_tx + .try_send(ReprocessQueueMessage::BlockImported(block_root)) + .is_err() + { + error!( + self.log, + "Failed to inform block import"; + "source" => "gossip", + "block_root" => ?block_root, + ) + }; + + debug!( + self.log, + "Gossipsub block processed"; + "block" => ?block_root, + "peer_id" => %peer_id + ); + + self.chain.recompute_head_at_current_slot().await; + } + Err(BlockError::ParentUnknown { .. }) => { + // Inform the sync manager to find parents for this block + // This should not occur. It should be checked by `should_forward_block` + error!( + self.log, + "Block with unknown parent attempted to be processed"; + "peer_id" => %peer_id + ); + self.send_sync_message(SyncMessage::UnknownBlock(peer_id, block)); + } + Err(ref e @ BlockError::ExecutionPayloadError(ref epe)) if !epe.penalize_peer() => { + debug!( + self.log, + "Failed to verify execution payload"; + "error" => %e + ); + } + other => { + debug!( + self.log, + "Invalid gossip beacon block"; + "outcome" => ?other, + "block root" => ?block.canonical_root(), + "block slot" => block.slot() + ); + self.gossip_penalize_peer( + peer_id, + PeerAction::MidToleranceError, + "bad_gossip_block_ssz", + ); + trace!( + self.log, + "Invalid gossip beacon block ssz"; + "ssz" => format_args!("0x{}", hex::encode(block.as_ssz_bytes())), + ); + } + }; + } + } else { + *self.chain.sidecar_waiting_for_block.lock() = Some(blobs); + } } pub fn process_gossip_voluntary_exit( diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 3c07b4073..de46843ad 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -784,7 +784,7 @@ impl, Cold: ItemStore> HotColdDB } StoreOp::PutBlobs(block_root, blobs) => { - guard_blob.put(*block_root, blobs.clone()); + guard_blob.put(*block_root, (**blobs).clone()); } StoreOp::PutState(_, _) => (), diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 5e74827c9..aac9cda93 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -156,7 +156,7 @@ pub trait ItemStore: KeyValueStore + Sync + Send + Sized + 'stati pub enum StoreOp<'a, E: EthSpec> { PutBlock(Hash256, Arc>), PutState(Hash256, &'a BeaconState), - PutBlobs(Hash256, SignedBlobsSidecar), + PutBlobs(Hash256, Arc>), PutStateSummary(Hash256, HotStateSummary), PutStateTemporaryFlag(Hash256), DeleteStateTemporaryFlag(Hash256),