From 42f54ee561519884e5dbfd7d40bc62e5a5a30a87 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Fri, 14 Jul 2023 16:01:57 -0400 Subject: [PATCH] fix merge conflict issues --- beacon_node/beacon_processor/src/lib.rs | 49 +++----- beacon_node/beacon_processor/src/metrics.rs | 10 ++ .../src/work_reprocessing_queue.rs | 4 +- beacon_node/execution_layer/src/lib.rs | 8 +- beacon_node/network/src/metrics.rs | 10 -- .../gossip_methods.rs | 7 +- .../src/network_beacon_processor/mod.rs | 108 +++++++++++++++++- .../network_beacon_processor/rpc_methods.rs | 6 +- .../network_beacon_processor/sync_methods.rs | 52 ++++++--- .../src/network_beacon_processor/tests.rs | 47 ++++---- beacon_node/network/src/router.rs | 39 ++++--- beacon_node/network/src/service/tests.rs | 4 +- .../src/sync/block_lookups/delayed_lookup.rs | 19 ++- .../network/src/sync/block_lookups/mod.rs | 76 ++++++------ .../network/src/sync/block_lookups/tests.rs | 11 +- beacon_node/network/src/sync/manager.rs | 22 ++-- .../network/src/sync/network_context.rs | 5 +- .../network/src/sync/range_sync/range.rs | 5 +- 18 files changed, 304 insertions(+), 178 deletions(-) diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index f8360dd37..2e0299cad 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -38,26 +38,12 @@ //! checks the queues to see if there are more parcels of work that can be spawned in a new worker //! task. -use crate::sync::manager::BlockProcessType; -use crate::{metrics, service::NetworkMessage, sync::SyncMessage}; -use beacon_chain::blob_verification::BlockWrapper; -use beacon_chain::parking_lot::Mutex; -use beacon_chain::{BeaconChain, BeaconChainTypes, GossipVerifiedBlock, NotifyExecutionLayer}; -use derivative::Derivative; use crate::work_reprocessing_queue::{ - spawn_reprocess_scheduler, QueuedAggregate, QueuedBackfillBatch, QueuedGossipBlock, - QueuedLightClientUpdate, QueuedRpcBlock, QueuedUnaggregate, ReadyWork, ReprocessQueueMessage, + QueuedBackfillBatch, QueuedGossipBlock, ReprocessQueueMessage, }; use futures::stream::{Stream, StreamExt}; use futures::task::Poll; -use lighthouse_network::NetworkGlobals; -use lighthouse_network::{MessageId, PeerId}; -use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest}; -use lighthouse_network::rpc::LightClientBootstrapRequest; -use lighthouse_network::{ - rpc::{BlocksByRangeRequest, BlocksByRootRequest, StatusMessage}, - Client, MessageId, NetworkGlobals, PeerId, PeerRequestId, -}; +use lighthouse_network::{MessageId, NetworkGlobals, PeerId}; use logging::TimeLatch; use parking_lot::Mutex; use slog::{crit, debug, error, trace, warn, Logger}; @@ -73,22 +59,14 @@ use std::time::Duration; use task_executor::TaskExecutor; use tokio::sync::mpsc; use tokio::sync::mpsc::error::TrySendError; -use types::{Attestation, EthSpec, Hash256, SignedAggregateAndProof, Slot, SubnetId}; +use types::{Attestation, Hash256, SignedAggregateAndProof, SubnetId}; +use types::{EthSpec, Slot}; use work_reprocessing_queue::IgnoredRpcBlock; -use types::blob_sidecar::FixedBlobSidecarList; -use types::{ - Attestation, AttesterSlashing, Hash256, LightClientFinalityUpdate, LightClientOptimisticUpdate, - ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedBlobSidecar, - SignedBlsToExecutionChange, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, - SyncCommitteeMessage, SyncSubnetId, -}; use work_reprocessing_queue::{ spawn_reprocess_scheduler, QueuedAggregate, QueuedLightClientUpdate, QueuedRpcBlock, QueuedUnaggregate, ReadyWork, }; -use worker::{Toolbox, Worker}; - mod metrics; pub mod work_reprocessing_queue; @@ -240,7 +218,7 @@ pub const GOSSIP_LIGHT_CLIENT_FINALITY_UPDATE: &str = "light_client_finality_upd pub const GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic_update"; pub const RPC_BLOCK: &str = "rpc_block"; pub const IGNORED_RPC_BLOCK: &str = "ignored_rpc_block"; -pub const RPC_BLOB: &str = "rpc_blob"; +pub const RPC_BLOBS: &str = "rpc_blob"; pub const CHAIN_SEGMENT: &str = "chain_segment"; pub const CHAIN_SEGMENT_BACKFILL: &str = "chain_segment_backfill"; pub const STATUS_PROCESSING: &str = "status_processing"; @@ -594,6 +572,7 @@ impl Work { Work::GossipLightClientFinalityUpdate(_) => GOSSIP_LIGHT_CLIENT_FINALITY_UPDATE, Work::GossipLightClientOptimisticUpdate(_) => GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE, Work::RpcBlock { .. } => RPC_BLOCK, + Work::RpcBlobs { .. } => RPC_BLOBS, Work::IgnoredRpcBlock { .. } => IGNORED_RPC_BLOCK, Work::ChainSegment { .. } => CHAIN_SEGMENT, Work::ChainSegmentBackfill(_) => CHAIN_SEGMENT_BACKFILL, @@ -1359,12 +1338,18 @@ impl BeaconProcessor { beacon_block_root: _, process_fn, } => task_spawner.spawn_async(process_fn), - Work::RpcBlock { process_fn } | Work::RpcBlob { process_fn } => task_spawner.spawn_async(process_fn), + Work::RpcBlock { process_fn } | Work::RpcBlobs { process_fn } => { + task_spawner.spawn_async(process_fn) + } Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn), - Work::GossipBlock(work)| Work::GossipBlob(work) => task_spawner.spawn_async(async move { - work.await; - }), - Work::BlobsByRangeRequest(work) | Work::BlobsByRootsRequest(work)| Work::BlocksByRangeRequest(work) | Work::BlocksByRootsRequest(work) => { + Work::GossipBlock(work) | Work::GossipSignedBlobSidecar(work) => task_spawner + .spawn_async(async move { + work.await; + }), + Work::BlobsByRangeRequest(work) + | Work::BlobsByRootsRequest(work) + | Work::BlocksByRangeRequest(work) + | Work::BlocksByRootsRequest(work) => { task_spawner.spawn_blocking_with_manual_send_idle(work) } Work::ChainSegmentBackfill(process_fn) => task_spawner.spawn_async(process_fn), diff --git a/beacon_node/beacon_processor/src/metrics.rs b/beacon_node/beacon_processor/src/metrics.rs index 65ab0bd8f..dbe6d59ee 100644 --- a/beacon_node/beacon_processor/src/metrics.rs +++ b/beacon_node/beacon_processor/src/metrics.rs @@ -46,6 +46,11 @@ lazy_static::lazy_static! { "beacon_processor_gossip_block_queue_total", "Count of blocks from gossip waiting to be verified." ); + // Gossip blobs. + pub static ref BEACON_PROCESSOR_GOSSIP_BLOB_QUEUE_TOTAL: Result = try_create_int_gauge( + "beacon_processor_gossip_blob_queue_total", + "Count of blocks from gossip waiting to be verified." + ); // Gossip Exits. pub static ref BEACON_PROCESSOR_EXIT_QUEUE_TOTAL: Result = try_create_int_gauge( "beacon_processor_exit_queue_total", @@ -71,6 +76,11 @@ lazy_static::lazy_static! { "beacon_processor_rpc_block_queue_total", "Count of blocks from the rpc waiting to be verified." ); + // Rpc blobs. + pub static ref BEACON_PROCESSOR_RPC_BLOB_QUEUE_TOTAL: Result = try_create_int_gauge( + "beacon_processor_rpc_blob_queue_total", + "Count of blobs from the rpc waiting to be verified." + ); // Chain segments. pub static ref BEACON_PROCESSOR_CHAIN_SEGMENT_QUEUE_TOTAL: Result = try_create_int_gauge( "beacon_processor_chain_segment_queue_total", diff --git a/beacon_node/beacon_processor/src/work_reprocessing_queue.rs b/beacon_node/beacon_processor/src/work_reprocessing_queue.rs index 358efa657..608f634d5 100644 --- a/beacon_node/beacon_processor/src/work_reprocessing_queue.rs +++ b/beacon_node/beacon_processor/src/work_reprocessing_queue.rs @@ -454,7 +454,7 @@ impl ReprocessQueue { if block_slot <= now && self .ready_work_tx - .try_send(ReadyWork::GossipBlock(early_block)) + .try_send(ReadyWork::Block(early_block)) .is_err() { error!( @@ -757,7 +757,7 @@ impl ReprocessQueue { if self .ready_work_tx - .try_send(ReadyWork::GossipBlock(ready_block)) + .try_send(ReadyWork::Block(ready_block)) .is_err() { error!( diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index e790da45f..10734d2e3 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -40,11 +40,13 @@ use tokio::{ }; use tokio_stream::wrappers::WatchStream; use tree_hash::TreeHash; -use types::{AbstractExecPayload, BeaconStateError, ExecPayload}; +use types::beacon_block_body::KzgCommitments; +use types::blob_sidecar::Blobs; +use types::{ + AbstractExecPayload, BeaconStateError, ExecPayload, ExecutionPayloadDeneb, VersionedHash, +}; use types::{ BlindedPayload, BlockType, ChainSpec, Epoch, ExecutionPayloadCapella, ExecutionPayloadMerge, - ForkVersionedResponse, ProposerPreparationData, PublicKeyBytes, Signature, SignedBeaconBlock, - Slot, }; use types::{KzgProofs, Withdrawals}; use types::{ProposerPreparationData, PublicKeyBytes, Signature, Slot, Transaction, Uint256}; diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index 17d0915e6..73b98e210 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -67,11 +67,6 @@ lazy_static! { "beacon_processor_gossip_block_early_seconds", "Whenever a gossip block is received early this metrics is set to how early that block was." ); - // Gossip blobs. - pub static ref BEACON_PROCESSOR_GOSSIP_BLOB_QUEUE_TOTAL: Result = try_create_int_gauge( - "beacon_processor_gossip_blob_queue_total", - "Count of blocks from gossip waiting to be verified." - ); pub static ref BEACON_PROCESSOR_GOSSIP_BLOB_VERIFIED_TOTAL: Result = try_create_int_counter( "beacon_processor_gossip_blob_verified_total", "Total number of gossip blob verified for propagation." @@ -125,11 +120,6 @@ lazy_static! { "beacon_processor_rpc_block_imported_total", "Total number of gossip blocks imported to fork choice, etc." ); - // Rpc blobs. - pub static ref BEACON_PROCESSOR_RPC_BLOB_QUEUE_TOTAL: Result = try_create_int_gauge( - "beacon_processor_rpc_blob_queue_total", - "Count of blobs from the rpc waiting to be verified." - ); pub static ref BEACON_PROCESSOR_RPC_BLOB_IMPORTED_TOTAL: Result = try_create_int_counter( "beacon_processor_rpc_blob_imported_total", "Total number of gossip blobs imported." diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index cfaed2477..2255b4017 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -5,7 +5,8 @@ use crate::{ sync::SyncMessage, }; -use beacon_chain::blob_verification::{AsBlock, BlobError, GossipVerifiedBlob}; +use beacon_chain::blob_verification::{BlobError, GossipVerifiedBlob}; +use beacon_chain::block_verification_types::AsBlock; use beacon_chain::store::Error; use beacon_chain::{ attestation_verification::{self, Error as AttnError, VerifiedAttestation}, @@ -600,7 +601,7 @@ impl NetworkBeaconProcessor { // TODO: docs #[allow(clippy::too_many_arguments)] pub async fn process_gossip_blob( - self, + self: &Arc, message_id: MessageId, peer_id: PeerId, _peer_client: Client, @@ -699,7 +700,7 @@ impl NetworkBeaconProcessor { } pub async fn process_gossip_verified_blob( - self, + self: &Arc, peer_id: PeerId, verified_blob: GossipVerifiedBlob, // This value is not used presently, but it might come in handy for debugging. diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 7f0ef1fb8..166417ba9 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -2,6 +2,7 @@ use crate::{ service::NetworkMessage, sync::{manager::BlockProcessType, SyncMessage}, }; +use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::{ builder::Witness, eth1_chain::CachingEth1Backend, test_utils::BeaconChainHarness, BeaconChain, }; @@ -25,9 +26,11 @@ use store::MemoryStore; use task_executor::test_utils::TestRuntime; use task_executor::TaskExecutor; use tokio::sync::mpsc::{self, error::TrySendError}; +use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest}; use types::*; pub use sync_methods::ChainSegmentProcessId; +use types::blob_sidecar::FixedBlobSidecarList; pub type Error = TrySendError>; @@ -196,6 +199,40 @@ impl NetworkBeaconProcessor { }) } + /// Create a new `Work` event for some blob sidecar. + pub fn send_gossip_blob_sidecar( + self: &Arc, + message_id: MessageId, + peer_id: PeerId, + peer_client: Client, + blob_index: u64, + blob: SignedBlobSidecar, + seen_timestamp: Duration, + ) -> Result<(), Error> { + let processor = self.clone(); + let process_fn = async move { + processor + .process_gossip_blob( + message_id, + peer_id, + peer_client, + blob_index, + blob, + seen_timestamp, + ) + .await + }; + + self.try_send(BeaconWorkEvent { + drop_during_sync: false, + work: Work::GossipSignedBlobSidecar(Box::pin(process_fn)), + }) + } + + pub fn send_banana(){ + + } + /// Create a new `Work` event for some sync committee signature. pub fn send_gossip_sync_signature( self: &Arc, @@ -376,7 +413,7 @@ impl NetworkBeaconProcessor { pub fn send_rpc_beacon_block( self: &Arc, block_root: Hash256, - block: Arc>, + block: RpcBlock, seen_timestamp: Duration, process_type: BlockProcessType, ) -> Result<(), Error> { @@ -392,11 +429,32 @@ impl NetworkBeaconProcessor { }) } + /// Create a new `Work` event for some block, where the result from computation (if any) is + /// sent to the other side of `result_tx`. + pub fn send_rpc_blobs( + self: &Arc, + block_root: Hash256, + blobs: FixedBlobSidecarList, + seen_timestamp: Duration, + process_type: BlockProcessType, + ) -> Result<(), Error> { + let process_fn = self.clone().generate_rpc_blobs_process_fn( + block_root, + blobs, + seen_timestamp, + process_type, + ); + self.try_send(BeaconWorkEvent { + drop_during_sync: false, + work: Work::RpcBlobs { process_fn }, + }) + } + /// Create a new work event to import `blocks` as a beacon chain segment. pub fn send_chain_segment( self: &Arc, process_id: ChainSegmentProcessId, - blocks: Vec>>, + blocks: Vec>, ) -> Result<(), Error> { let is_backfill = matches!(&process_id, ChainSegmentProcessId::BackSyncBatchId { .. }); let processor = self.clone(); @@ -496,6 +554,52 @@ impl NetworkBeaconProcessor { }) } + /// Create a new work event to process `BlobsByRangeRequest`s from the RPC network. + pub fn send_blobs_by_range_request( + self: &Arc, + peer_id: PeerId, + request_id: PeerRequestId, + request: BlobsByRangeRequest, + ) -> Result<(), Error> { + let processor = self.clone(); + let process_fn = move |send_idle_on_drop| { + processor.handle_blobs_by_range_request( + send_idle_on_drop, + peer_id, + request_id, + request, + ) + }; + + self.try_send(BeaconWorkEvent { + drop_during_sync: false, + work: Work::BlobsByRangeRequest(Box::new(process_fn)), + }) + } + + /// Create a new work event to process `BlobsByRootRequest`s from the RPC network. + pub fn send_blobs_by_roots_request( + self: &Arc, + peer_id: PeerId, + request_id: PeerRequestId, + request: BlobsByRootRequest, + ) -> Result<(), Error> { + let processor = self.clone(); + let process_fn = move |send_idle_on_drop| { + processor.handle_blobs_by_root_request( + send_idle_on_drop, + peer_id, + request_id, + request, + ) + }; + + self.try_send(BeaconWorkEvent { + drop_during_sync: false, + work: Work::BlobsByRootsRequest(Box::new(process_fn)), + }) + } + /// Create a new work event to process `LightClientBootstrap`s from the RPC network. pub fn send_lightclient_bootstrap_request( self: &Arc, diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index d55d4bb62..c653305c5 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -13,8 +13,8 @@ use lighthouse_network::rpc::*; use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo}; use slog::{debug, error, trace, warn}; use slot_clock::SlotClock; -use std::sync::Arc; use std::collections::{hash_map::Entry, HashMap}; +use std::sync::Arc; use task_executor::TaskExecutor; use tokio_stream::StreamExt; use types::blob_sidecar::BlobIdentifier; @@ -217,7 +217,7 @@ impl NetworkBeaconProcessor { } /// Handle a `BlobsByRoot` request from the peer. pub fn handle_blobs_by_root_request( - self, + self: Arc, send_on_drop: SendOnDrop, peer_id: PeerId, request_id: PeerRequestId, @@ -616,7 +616,7 @@ impl NetworkBeaconProcessor { /// Handle a `BlobsByRange` request from the peer. pub fn handle_blobs_by_range_request( - self, + self: Arc, send_on_drop: SendOnDrop, peer_id: PeerId, request_id: PeerRequestId, diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 7ea551ae0..3a09373c5 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -1,16 +1,18 @@ -use std::time::Duration; - use crate::metrics; use crate::network_beacon_processor::{NetworkBeaconProcessor, FUTURE_SLOT_TOLERANCE}; +use crate::sync::manager::ResponseType; use crate::sync::BatchProcessResult; use crate::sync::{ manager::{BlockProcessType, SyncMessage}, ChainId, }; +use beacon_chain::data_availability_checker::MaybeAvailableBlock; +use beacon_chain::block_verification_types::{AsBlock, RpcBlock}; +use beacon_chain::data_availability_checker::AvailabilityCheckError; use beacon_chain::{ observed_block_producers::Error as ObserveError, validator_monitor::get_block_delay_ms, - BeaconChainError, BeaconChainTypes, BlockError, ChainSegmentResult, HistoricalBlockError, - NotifyExecutionLayer, + AvailabilityProcessingStatus, BeaconChainError, BeaconChainTypes, BlockError, + ChainSegmentResult, HistoricalBlockError, NotifyExecutionLayer, }; use beacon_processor::{ work_reprocessing_queue::{QueuedRpcBlock, ReprocessQueueMessage}, @@ -19,6 +21,8 @@ use beacon_processor::{ use lighthouse_network::PeerAction; use slog::{debug, error, info, warn}; use slot_clock::SlotClock; +use std::sync::Arc; +use std::time::Duration; use std::time::{SystemTime, UNIX_EPOCH}; use tokio::sync::mpsc; use types::blob_sidecar::FixedBlobSidecarList; @@ -44,14 +48,14 @@ struct ChainSegmentFailed { } impl NetworkBeaconProcessor { - /// Returns an async closure which processes a beacon block recieved via RPC. + /// Returns an async closure which processes a beacon block received via RPC. /// /// This separate function was required to prevent a cycle during compiler /// type checking. pub fn generate_rpc_beacon_block_process_fn( self: Arc, block_root: Hash256, - block: Arc>, + block: RpcBlock, seen_timestamp: Duration, process_type: BlockProcessType, ) -> AsyncFn { @@ -75,7 +79,7 @@ impl NetworkBeaconProcessor { pub fn generate_rpc_beacon_block_fns( self: Arc, block_root: Hash256, - block: BlockWrapper, + block: RpcBlock, seen_timestamp: Duration, process_type: BlockProcessType, ) -> (AsyncFn, BlockingFn) { @@ -103,7 +107,7 @@ impl NetworkBeaconProcessor { pub async fn process_rpc_block( self: Arc>, block_root: Hash256, - block: Arc>, + block: RpcBlock, seen_timestamp: Duration, process_type: BlockProcessType, reprocess_tx: mpsc::Sender, @@ -254,8 +258,26 @@ impl NetworkBeaconProcessor { drop(handle); } + /// Returns an async closure which processes a list of blobs received via RPC. + /// + /// This separate function was required to prevent a cycle during compiler + /// type checking. + pub fn generate_rpc_blobs_process_fn( + self: Arc, + block_root: Hash256, + block: FixedBlobSidecarList, + seen_timestamp: Duration, + process_type: BlockProcessType, + ) -> AsyncFn { + let process_fn = async move { + self.clone().process_rpc_blobs(block_root, block, seen_timestamp, process_type) + .await; + }; + Box::pin(process_fn) + } + pub async fn process_rpc_blobs( - self, + self: Arc>, block_root: Hash256, blobs: FixedBlobSidecarList, _seen_timestamp: Duration, @@ -284,12 +306,16 @@ impl NetworkBeaconProcessor { }); } + pub fn send_delayed_lookup(&self, block_root: Hash256){ + self.send_sync_message(SyncMessage::MissingGossipBlockComponentsDelayed(block_root)) + } + /// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync /// thread if more blocks are needed to process it. pub async fn process_chain_segment( &self, sync_type: ChainSegmentProcessId, - downloaded_blocks: Vec>, + downloaded_blocks: Vec>, notify_execution_layer: NotifyExecutionLayer, ) { let result = match sync_type { @@ -414,7 +440,7 @@ impl NetworkBeaconProcessor { /// Helper function to process blocks batches which only consumes the chain and blocks to process. async fn process_blocks<'a>( &self, - downloaded_blocks: impl Iterator>, + downloaded_blocks: impl Iterator>, notify_execution_layer: NotifyExecutionLayer, ) -> (usize, Result<(), ChainSegmentFailed>) { let blocks: Vec<_> = downloaded_blocks.cloned().collect(); @@ -447,7 +473,7 @@ impl NetworkBeaconProcessor { /// Helper function to process backfill block batches which only consumes the chain and blocks to process. fn process_backfill_blocks( &self, - downloaded_blocks: Vec>, + downloaded_blocks: Vec>, ) -> (usize, Result<(), ChainSegmentFailed>) { let total_blocks = downloaded_blocks.len(); let available_blocks = match downloaded_blocks @@ -455,7 +481,7 @@ impl NetworkBeaconProcessor { .map(|block| { self.chain .data_availability_checker - .check_availability(block) + .check_rpc_block_availability(block) }) .collect::, _>>() { diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 1273929bd..39d3575d6 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -10,13 +10,16 @@ use crate::{ use beacon_chain::test_utils::{ test_spec, AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType, }; -use beacon_chain::{BeaconChain, ChainConfig, MAXIMUM_GOSSIP_CLOCK_DISPARITY}; +use beacon_chain::{BeaconChain, ChainConfig, WhenSlotSkipped, MAXIMUM_GOSSIP_CLOCK_DISPARITY}; use beacon_processor::{work_reprocessing_queue::*, *}; +use lighthouse_network::discovery::ConnectionId; +use lighthouse_network::rpc::methods::BlobsByRangeRequest; +use lighthouse_network::rpc::SubstreamId; use lighthouse_network::{ discv5::enr::{CombinedKey, EnrBuilder}, rpc::methods::{MetaData, MetaDataV2}, types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield}, - Client, MessageId, NetworkGlobals, PeerId, + Client, MessageId, NetworkGlobals, PeerId, Response, }; use slot_clock::SlotClock; use std::cmp; @@ -24,9 +27,11 @@ use std::iter::Iterator; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; +use types::blob_sidecar::FixedBlobSidecarList; use types::{ - Attestation, AttesterSlashing, Epoch, EthSpec, Hash256, MainnetEthSpec, ProposerSlashing, - SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, SubnetId, + Attestation, AttesterSlashing, Epoch, Hash256, MainnetEthSpec, ProposerSlashing, + SignedAggregateAndProof, SignedBeaconBlock, SignedBlobSidecarList, SignedVoluntaryExit, Slot, + SubnetId, }; type E = MainnetEthSpec; @@ -275,15 +280,15 @@ impl TestRig { pub fn enqueue_gossip_blob(&self, blob_index: usize) { if let Some(blobs) = self.next_blobs.as_ref() { let blob = blobs.get(blob_index).unwrap(); - self.beacon_processor_tx - .try_send(WorkEvent::gossip_signed_blob_sidecar( + self.network_beacon_processor + .send_gossip_blob_sidecar( junk_message_id(), junk_peer_id(), Client::default(), - blob_index as u64, + blob.message.index, blob.clone(), Duration::from_secs(0), - )) + ) .unwrap(); } } @@ -319,26 +324,26 @@ impl TestRig { .map(|b| Some(b.message)) .collect::>(), ); - let event = WorkEvent::rpc_blobs( - self.next_block.canonical_root(), - blobs, - std::time::Duration::default(), - BlockProcessType::SingleBlock { id: 1 }, - ); - self.beacon_processor_tx.try_send(event).unwrap(); + self.network_beacon_processor + .send_rpc_blobs( + self.next_block.canonical_root(), + blobs, + std::time::Duration::default(), + BlockProcessType::SingleBlock { id: 1 }, + ) + .unwrap(); } } pub fn enqueue_blobs_by_range_request(&self, count: u64) { - let event = WorkEvent::blobs_by_range_request( + self.network_beacon_processor.send_blobs_by_range_request( PeerId::random(), (ConnectionId::new(42), SubstreamId::new(24)), BlobsByRangeRequest { start_slot: 0, count, }, - ); - self.beacon_processor_tx.try_send(event).unwrap(); + ).unwrap(); } pub fn enqueue_backfill_batch(&self) { @@ -733,7 +738,7 @@ async fn attestation_to_unknown_block_processed(import_method: BlockImportMethod events.push(RPC_BLOCK); if num_blobs > 0 { rig.enqueue_single_lookup_rpc_blobs(); - events.push(RPC_BLOB); + events.push(RPC_BLOBS); } } }; @@ -816,7 +821,7 @@ async fn aggregate_attestation_to_unknown_block(import_method: BlockImportMethod events.push(RPC_BLOCK); if num_blobs > 0 { rig.enqueue_single_lookup_rpc_blobs(); - events.push(RPC_BLOB); + events.push(RPC_BLOBS); } } }; @@ -996,7 +1001,7 @@ async fn test_rpc_block_reprocessing() { rig.enqueue_single_lookup_rpc_blobs(); if rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0) > 0 { - rig.assert_event_journal(&[RPC_BLOB, WORKER_FREED, NOTHING_TO_DO]) + rig.assert_event_journal(&[RPC_BLOBS, WORKER_FREED, NOTHING_TO_DO]) .await; } diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 5dcdb592e..5a954d05a 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -208,11 +208,19 @@ impl Router { self.network_beacon_processor .send_blocks_by_roots_request(peer_id, request_id, request), ), - Request::BlobsByRange(request) => self.send_beacon_processor_work( - BeaconWorkEvent::blobs_by_range_request(peer_id, request_id, request), + Request::BlobsByRange(request) => self.handle_beacon_processor_send_result( + self.network_beacon_processor.send_blobs_by_range_request( + peer_id, + request_id, + request, + ), ), - Request::BlobsByRoot(request) => self.send_beacon_processor_work( - BeaconWorkEvent::blobs_by_root_request(peer_id, request_id, request), + Request::BlobsByRoot(request) => self.handle_beacon_processor_send_result( + self.network_beacon_processor.send_blobs_by_roots_request( + peer_id, + request_id, + request, + ), ), Request::LightClientBootstrap(request) => self.handle_beacon_processor_send_result( self.network_beacon_processor @@ -291,19 +299,20 @@ impl Router { self.network_globals.client(&peer_id), block, timestamp_now(), - )) - } + ), + ), PubsubMessage::BlobSidecar(data) => { let (blob_index, signed_blob) = *data; - let peer_client = self.network_globals.client(&peer_id); - self.send_beacon_processor_work(BeaconWorkEvent::gossip_signed_blob_sidecar( - message_id, - peer_id, - peer_client, - blob_index, - signed_blob, - timestamp_now(), - )) + self.handle_beacon_processor_send_result( + self.network_beacon_processor.send_gossip_blob_sidecar( + message_id, + peer_id, + self.network_globals.client(&peer_id), + blob_index, + signed_blob, + timestamp_now(), + ) + ) } PubsubMessage::VoluntaryExit(exit) => { debug!(self.log, "Received a voluntary exit"; "peer_id" => %peer_id); diff --git a/beacon_node/network/src/service/tests.rs b/beacon_node/network/src/service/tests.rs index 7f697e614..10110aa89 100644 --- a/beacon_node/network/src/service/tests.rs +++ b/beacon_node/network/src/service/tests.rs @@ -3,7 +3,6 @@ mod tests { use crate::persisted_dht::load_dht; use crate::{NetworkConfig, NetworkService}; - use beacon_chain::test_utils::BeaconChainHarness; use beacon_processor::{ BeaconProcessorSend, MAX_SCHEDULED_WORK_QUEUE_LEN, MAX_WORK_EVENT_QUEUE_LEN, }; @@ -13,8 +12,7 @@ mod tests { use std::str::FromStr; use std::sync::Arc; use tokio::{runtime::Runtime, sync::mpsc}; - use types::MinimalEthSpec; - use tokio::runtime::Runtime; + use beacon_chain::test_utils::EphemeralHarnessType; use types::MinimalEthSpec as E; type BeaconChainHarness = beacon_chain::test_utils::BeaconChainHarness>; diff --git a/beacon_node/network/src/sync/block_lookups/delayed_lookup.rs b/beacon_node/network/src/sync/block_lookups/delayed_lookup.rs index a2a44ecdd..c492470b4 100644 --- a/beacon_node/network/src/sync/block_lookups/delayed_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/delayed_lookup.rs @@ -1,12 +1,12 @@ -use crate::sync::SyncMessage; use beacon_chain::{BeaconChain, BeaconChainTypes}; -use slog::{crit, warn}; +use slog::{crit, }; use slot_clock::SlotClock; use std::sync::Arc; use tokio::sync::mpsc; use tokio::time::interval_at; use tokio::time::Instant; -use types::Hash256; +use types::{ Hash256}; +use crate::network_beacon_processor::NetworkBeaconProcessor; #[derive(Debug)] pub enum DelayedLookupMessage { @@ -35,7 +35,7 @@ pub fn spawn_delayed_lookup_service( executor: &task_executor::TaskExecutor, beacon_chain: Arc>, mut delayed_lookups_recv: mpsc::Receiver, - sync_send: mpsc::UnboundedSender>, + beacon_processor: Arc>, log: slog::Logger, ) { executor.spawn( @@ -52,8 +52,8 @@ pub fn spawn_delayed_lookup_service( } else { delay - seconds_from_current_slot_start }; - tokio::time::Instant::now() + duration_until_start - } + Instant::now() + duration_until_start + } _ => { crit!(log, "Failed to read slot clock, delayed lookup service timing will be inaccurate.\ @@ -69,11 +69,8 @@ pub fn spawn_delayed_lookup_service( while let Ok(msg) = delayed_lookups_recv.try_recv() { match msg { DelayedLookupMessage::MissingComponents(block_root) => { - if let Err(e) = sync_send - .send(SyncMessage::MissingGossipBlockComponentsDelayed(block_root)) - { - warn!(log, "Failed to send delayed lookup message"; "error" => ?e); - } + beacon_processor + .send_delayed_lookup(block_root) } } } diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 21b7a8dbc..ff095c719 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -1,21 +1,3 @@ -use std::collections::hash_map::Entry; -use std::collections::HashMap; -use std::time::Duration; - -use crate::network_beacon_processor::ChainSegmentProcessId; -use beacon_chain::{BeaconChainTypes, BlockError}; -use fnv::FnvHashMap; -use lighthouse_network::{PeerAction, PeerId}; -use lru_cache::LRUTimeCache; -use slog::{debug, error, trace, warn, Logger}; -use smallvec::SmallVec; -use std::collections::HashMap; -use std::fmt::Debug; -use std::sync::Arc; -use store::{Hash256, SignedBeaconBlock}; - -use crate::metrics; - use self::parent_lookup::PARENT_FAIL_TOLERANCE; use self::parent_lookup::{ParentLookup, ParentVerifyError}; use self::single_block_lookup::{LookupVerifyError, SingleBlockLookup}; @@ -25,10 +7,26 @@ use super::{ manager::{BlockProcessType, Id}, network_context::SyncNetworkContext, }; -use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent}; use crate::metrics; +use crate::network_beacon_processor::ChainSegmentProcessId; use crate::sync::block_lookups::single_block_lookup::LookupId; +use beacon_chain::block_verification_types::{AsBlock, RpcBlock}; +use beacon_chain::data_availability_checker::{AvailabilityCheckError, DataAvailabilityChecker}; +use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError}; +use lighthouse_network::rpc::RPCError; +use lighthouse_network::{PeerAction, PeerId}; +use lru_cache::LRUTimeCache; pub use single_block_lookup::UnknownParentComponents; +use slog::{debug, error, trace, warn, Logger}; +use smallvec::SmallVec; +use std::collections::HashMap; +use std::fmt::Debug; +use std::sync::Arc; +use std::time::Duration; +use store::{Hash256, SignedBeaconBlock}; +use strum::Display; +use types::blob_sidecar::FixedBlobSidecarList; +use types::{BlobSidecar, Slot}; pub(crate) mod delayed_lookup; mod parent_lookup; @@ -36,7 +34,7 @@ mod single_block_lookup; #[cfg(test)] mod tests; -pub type DownloadedBlocks = (Hash256, BlockWrapper); +pub type DownloadedBlocks = (Hash256, RpcBlock); pub type RootBlockTuple = (Hash256, Arc>); pub type RootBlobsTuple = (Hash256, FixedBlobSidecarList); @@ -381,13 +379,13 @@ impl BlockLookups { }; if !has_pending_parent_request { - let block_wrapper = request_ref + let rpc_block = request_ref .get_downloaded_block() - .unwrap_or(BlockWrapper::Block(block)); + .unwrap_or(RpcBlock::new_without_blobs(block)); // This is the correct block, send it for processing match self.send_block_for_processing( block_root, - block_wrapper, + rpc_block, seen_timestamp, BlockProcessType::SingleBlock { id }, cx, @@ -563,14 +561,13 @@ impl BlockLookups { match parent_lookup.verify_block(block, &mut self.failed_chains) { Ok(Some((block_root, block))) => { parent_lookup.add_current_request_block(block); - if let Some(block_wrapper) = - parent_lookup.current_parent_request.get_downloaded_block() + if let Some(rpc_block) = parent_lookup.current_parent_request.get_downloaded_block() { let chain_hash = parent_lookup.chain_hash(); if self .send_block_for_processing( block_root, - block_wrapper, + rpc_block, seen_timestamp, BlockProcessType::ParentLookup { chain_hash }, cx, @@ -644,13 +641,12 @@ impl BlockLookups { Ok(Some((block_root, blobs))) => { parent_lookup.add_current_request_blobs(blobs); let chain_hash = parent_lookup.chain_hash(); - if let Some(block_wrapper) = - parent_lookup.current_parent_request.get_downloaded_block() + if let Some(rpc_block) = parent_lookup.current_parent_request.get_downloaded_block() { if self .send_block_for_processing( block_root, - block_wrapper, + rpc_block, seen_timestamp, BlockProcessType::ParentLookup { chain_hash }, cx, @@ -914,11 +910,7 @@ impl BlockLookups { BlockError::ParentUnknown(block) => { let slot = block.slot(); let parent_root = block.parent_root(); - let (block, blobs) = block.deconstruct(); - request_ref.add_unknown_parent_block(block); - if let Some(blobs) = blobs { - request_ref.add_unknown_parent_blobs(blobs); - } + request_ref.add_unknown_parent_components(block.into()); self.search_parent(slot, root, parent_root, peer_id.to_peer_id(), cx); ShouldRemoveLookup::False } @@ -1077,7 +1069,6 @@ impl BlockLookups { blocks.push(child_block); }; let process_id = ChainSegmentProcessId::ParentLookup(chain_hash); - let work = WorkEvent::chain_segment(process_id, blocks); match beacon_processor.send_chain_segment(process_id, blocks) { Ok(_) => { @@ -1171,7 +1162,7 @@ impl BlockLookups { .enumerate() .find(|(_, req)| req.block_request_state.requested_block_root == chain_hash) { - if let Some((lookup_id, block_wrapper)) = + if let Some((lookup_id, rpc_block)) = self.single_block_lookups.get_mut(index).and_then(|lookup| { lookup .get_downloaded_block() @@ -1191,7 +1182,7 @@ impl BlockLookups { if self .send_block_for_processing( chain_hash, - block_wrapper, + rpc_block, Duration::from_secs(0), //TODO(sean) pipe this through BlockProcessType::SingleBlock { id }, cx, @@ -1231,7 +1222,7 @@ impl BlockLookups { fn send_block_for_processing( &mut self, block_root: Hash256, - block: BlockWrapper, + block: RpcBlock, duration: Duration, process_type: BlockProcessType, cx: &mut SyncNetworkContext, @@ -1274,11 +1265,12 @@ impl BlockLookups { if blob_count == 0 { return Ok(()); } - match cx.processor_channel_if_enabled() { - Some(beacon_processor_send) => { + match cx.beacon_processor_if_enabled() { + Some(beacon_processor) => { trace!(self.log, "Sending blobs for processing"; "block" => ?block_root, "process_type" => ?process_type); - let event = WorkEvent::rpc_blobs(block_root, blobs, duration, process_type); - if let Err(e) = beacon_processor_send.try_send(event) { + if let Err(e) = + beacon_processor.send_rpc_blobs(block_root, blobs, duration, process_type) + { error!( self.log, "Failed to send sync blobs to processor"; diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 8d2dc58be..c8f19b169 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -1,19 +1,20 @@ #![cfg(feature = "spec-minimal")] -use std::sync::Arc; - use crate::network_beacon_processor::NetworkBeaconProcessor; use crate::service::RequestId; use crate::sync::manager::RequestId as SyncId; use crate::NetworkMessage; +use std::sync::Arc; use super::*; use beacon_chain::builder::Witness; use beacon_chain::eth1_chain::CachingEth1Backend; +use beacon_chain::test_utils::{build_log, BeaconChainHarness, EphemeralHarnessType}; use beacon_processor::WorkEvent; +use execution_layer::BlobsBundleV1; +use lighthouse_network::rpc::RPCResponseErrorCode; use lighthouse_network::{NetworkGlobals, Request}; -use slog::{Drain, Level}; -use slot_clock::ManualSlotClock; +use slot_clock::{ManualSlotClock, SlotClock, TestingSlotClock}; use store::MemoryStore; use tokio::sync::mpsc; use types::{ @@ -212,7 +213,7 @@ impl TestRig { }, ResponseType::Blob => match self.beacon_processor_rx.try_recv() { Ok(work) => { - assert_eq!(work.work_type(), beacon_processor::RPC_BLOB); + assert_eq!(work.work_type(), beacon_processor::RPC_BLOBS); } other => panic!("Expected blob process, found {:?}", other), }, diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 405f24b6e..8bdf57e2a 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -127,7 +127,7 @@ pub enum SyncMessage { }, /// A block with an unknown parent has been received. - UnknownParentBlock(PeerId, BlockWrapper, Hash256), + UnknownParentBlock(PeerId, RpcBlock, Hash256), /// A blob with an unknown parent has been received. UnknownParentBlob(PeerId, Arc>), @@ -242,13 +242,20 @@ pub fn spawn( MAX_REQUEST_BLOCKS >= T::EthSpec::slots_per_epoch() * EPOCHS_PER_BATCH, "Max blocks that can be requested in a single batch greater than max allowed blocks in a single request" ); + let (delayed_lookups_send, delayed_lookups_recv) = + mpsc::channel::(DELAY_QUEUE_CHANNEL_SIZE); // create an instance of the SyncManager let network_globals = beacon_processor.network_globals.clone(); let mut sync_manager = SyncManager { chain: beacon_chain.clone(), input_channel: sync_recv, - network: SyncNetworkContext::new(network_send, beacon_processor, log.clone()), + network: SyncNetworkContext::new( + network_send, + beacon_processor.clone(), + beacon_chain.clone(), + log.clone(), + ), range_sync: RangeSync::new(beacon_chain.clone(), log.clone()), backfill_sync: BackFillSync::new(beacon_chain.clone(), network_globals, log.clone()), block_lookups: BlockLookups::new( @@ -260,12 +267,11 @@ pub fn spawn( }; let log_clone = log.clone(); - let sync_send_clone = sync_send.clone(); delayed_lookup::spawn_delayed_lookup_service( &executor, beacon_chain, delayed_lookups_recv, - sync_send, + beacon_processor, log, ); @@ -792,7 +798,7 @@ impl SyncManager { } fn should_search_for_block(&mut self, block_slot: Slot, peer_id: &PeerId) -> bool { - if !self.network_globals.sync_state.read().is_synced() { + if !self.network_globals().sync_state.read().is_synced() { let head_slot = self.chain.canonical_head.cached_head().head_slot(); // if the block is far in the future, ignore it. If its within the slot tolerance of @@ -806,13 +812,13 @@ impl SyncManager { } } - self.network_globals.peers.read().is_connected(peer_id) + self.network_globals().peers.read().is_connected(peer_id) && self.network.is_execution_engine_online() } fn synced_and_connected(&mut self, peer_id: &PeerId) -> bool { - self.network_globals.sync_state.read().is_synced() - && self.network_globals.peers.read().is_connected(peer_id) + self.network_globals().sync_state.read().is_synced() + && self.network_globals().peers.read().is_connected(peer_id) && self.network.is_execution_engine_online() } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 963871e1f..7c162f478 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -4,8 +4,7 @@ use super::block_sidecar_coupling::BlocksAndBlobsRequestInfo; use super::manager::{Id, RequestId as SyncRequestId}; use super::range_sync::{BatchId, ByRangeRequestType, ChainId}; -use crate::beacon_processor::BeaconProcessorSend; -duse crate::network_beacon_processor::NetworkBeaconProcessor; +use crate::network_beacon_processor::NetworkBeaconProcessor; use crate::service::{NetworkMessage, RequestId}; use crate::status::ToStatusMessage; use crate::sync::block_lookups::{BlobRequestId, BlockRequestId}; @@ -97,6 +96,8 @@ impl SyncNetworkContext { request_id: 1, range_requests: FnvHashMap::default(), backfill_requests: FnvHashMap::default(), + range_blocks_and_blobs_requests: FnvHashMap::default(), + backfill_blocks_and_blobs_requests: FnvHashMap::default(), network_beacon_processor, chain, log, diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index ca40b80ac..09a85208d 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -395,11 +395,11 @@ mod tests { use slog::{o, Drain}; use tokio::sync::mpsc; - use slot_clock::ManualSlotClock; + use beacon_chain::test_utils::{BeaconChainHarness, EphemeralHarnessType}; + use slot_clock::{TestingSlotClock, }; use std::collections::HashSet; use std::sync::Arc; use store::MemoryStore; - use tokio::sync::mpsc; use types::{Hash256, MinimalEthSpec as E}; #[derive(Debug)] @@ -612,7 +612,6 @@ mod tests { let chain = harness.chain; let fake_store = Arc::new(FakeStorage::default()); - let (beacon_processor_tx, beacon_processor_rx) = mpsc::channel(10); let range_sync = RangeSync::::new( fake_store.clone(), log.new(o!("component" => "range")),