fix merge conflict issues

This commit is contained in:
realbigsean 2023-07-14 16:01:57 -04:00
parent a6f48f5ecb
commit 42f54ee561
No known key found for this signature in database
GPG Key ID: BE1B3DB104F6C788
18 changed files with 304 additions and 178 deletions

View File

@ -38,26 +38,12 @@
//! checks the queues to see if there are more parcels of work that can be spawned in a new worker //! checks the queues to see if there are more parcels of work that can be spawned in a new worker
//! task. //! task.
use crate::sync::manager::BlockProcessType;
use crate::{metrics, service::NetworkMessage, sync::SyncMessage};
use beacon_chain::blob_verification::BlockWrapper;
use beacon_chain::parking_lot::Mutex;
use beacon_chain::{BeaconChain, BeaconChainTypes, GossipVerifiedBlock, NotifyExecutionLayer};
use derivative::Derivative;
use crate::work_reprocessing_queue::{ use crate::work_reprocessing_queue::{
spawn_reprocess_scheduler, QueuedAggregate, QueuedBackfillBatch, QueuedGossipBlock, QueuedBackfillBatch, QueuedGossipBlock, ReprocessQueueMessage,
QueuedLightClientUpdate, QueuedRpcBlock, QueuedUnaggregate, ReadyWork, ReprocessQueueMessage,
}; };
use futures::stream::{Stream, StreamExt}; use futures::stream::{Stream, StreamExt};
use futures::task::Poll; use futures::task::Poll;
use lighthouse_network::NetworkGlobals; use lighthouse_network::{MessageId, NetworkGlobals, PeerId};
use lighthouse_network::{MessageId, PeerId};
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest};
use lighthouse_network::rpc::LightClientBootstrapRequest;
use lighthouse_network::{
rpc::{BlocksByRangeRequest, BlocksByRootRequest, StatusMessage},
Client, MessageId, NetworkGlobals, PeerId, PeerRequestId,
};
use logging::TimeLatch; use logging::TimeLatch;
use parking_lot::Mutex; use parking_lot::Mutex;
use slog::{crit, debug, error, trace, warn, Logger}; use slog::{crit, debug, error, trace, warn, Logger};
@ -73,22 +59,14 @@ use std::time::Duration;
use task_executor::TaskExecutor; use task_executor::TaskExecutor;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError; use tokio::sync::mpsc::error::TrySendError;
use types::{Attestation, EthSpec, Hash256, SignedAggregateAndProof, Slot, SubnetId}; use types::{Attestation, Hash256, SignedAggregateAndProof, SubnetId};
use types::{EthSpec, Slot};
use work_reprocessing_queue::IgnoredRpcBlock; use work_reprocessing_queue::IgnoredRpcBlock;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{
Attestation, AttesterSlashing, Hash256, LightClientFinalityUpdate, LightClientOptimisticUpdate,
ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedBlobSidecar,
SignedBlsToExecutionChange, SignedContributionAndProof, SignedVoluntaryExit, SubnetId,
SyncCommitteeMessage, SyncSubnetId,
};
use work_reprocessing_queue::{ use work_reprocessing_queue::{
spawn_reprocess_scheduler, QueuedAggregate, QueuedLightClientUpdate, QueuedRpcBlock, spawn_reprocess_scheduler, QueuedAggregate, QueuedLightClientUpdate, QueuedRpcBlock,
QueuedUnaggregate, ReadyWork, QueuedUnaggregate, ReadyWork,
}; };
use worker::{Toolbox, Worker};
mod metrics; mod metrics;
pub mod work_reprocessing_queue; pub mod work_reprocessing_queue;
@ -240,7 +218,7 @@ pub const GOSSIP_LIGHT_CLIENT_FINALITY_UPDATE: &str = "light_client_finality_upd
pub const GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic_update"; pub const GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic_update";
pub const RPC_BLOCK: &str = "rpc_block"; pub const RPC_BLOCK: &str = "rpc_block";
pub const IGNORED_RPC_BLOCK: &str = "ignored_rpc_block"; pub const IGNORED_RPC_BLOCK: &str = "ignored_rpc_block";
pub const RPC_BLOB: &str = "rpc_blob"; pub const RPC_BLOBS: &str = "rpc_blob";
pub const CHAIN_SEGMENT: &str = "chain_segment"; pub const CHAIN_SEGMENT: &str = "chain_segment";
pub const CHAIN_SEGMENT_BACKFILL: &str = "chain_segment_backfill"; pub const CHAIN_SEGMENT_BACKFILL: &str = "chain_segment_backfill";
pub const STATUS_PROCESSING: &str = "status_processing"; pub const STATUS_PROCESSING: &str = "status_processing";
@ -594,6 +572,7 @@ impl<E: EthSpec> Work<E> {
Work::GossipLightClientFinalityUpdate(_) => GOSSIP_LIGHT_CLIENT_FINALITY_UPDATE, Work::GossipLightClientFinalityUpdate(_) => GOSSIP_LIGHT_CLIENT_FINALITY_UPDATE,
Work::GossipLightClientOptimisticUpdate(_) => GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE, Work::GossipLightClientOptimisticUpdate(_) => GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE,
Work::RpcBlock { .. } => RPC_BLOCK, Work::RpcBlock { .. } => RPC_BLOCK,
Work::RpcBlobs { .. } => RPC_BLOBS,
Work::IgnoredRpcBlock { .. } => IGNORED_RPC_BLOCK, Work::IgnoredRpcBlock { .. } => IGNORED_RPC_BLOCK,
Work::ChainSegment { .. } => CHAIN_SEGMENT, Work::ChainSegment { .. } => CHAIN_SEGMENT,
Work::ChainSegmentBackfill(_) => CHAIN_SEGMENT_BACKFILL, Work::ChainSegmentBackfill(_) => CHAIN_SEGMENT_BACKFILL,
@ -1359,12 +1338,18 @@ impl<E: EthSpec> BeaconProcessor<E> {
beacon_block_root: _, beacon_block_root: _,
process_fn, process_fn,
} => task_spawner.spawn_async(process_fn), } => task_spawner.spawn_async(process_fn),
Work::RpcBlock { process_fn } | Work::RpcBlob { process_fn } => task_spawner.spawn_async(process_fn), Work::RpcBlock { process_fn } | Work::RpcBlobs { process_fn } => {
task_spawner.spawn_async(process_fn)
}
Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn), Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn),
Work::GossipBlock(work)| Work::GossipBlob(work) => task_spawner.spawn_async(async move { Work::GossipBlock(work) | Work::GossipSignedBlobSidecar(work) => task_spawner
.spawn_async(async move {
work.await; work.await;
}), }),
Work::BlobsByRangeRequest(work) | Work::BlobsByRootsRequest(work)| Work::BlocksByRangeRequest(work) | Work::BlocksByRootsRequest(work) => { Work::BlobsByRangeRequest(work)
| Work::BlobsByRootsRequest(work)
| Work::BlocksByRangeRequest(work)
| Work::BlocksByRootsRequest(work) => {
task_spawner.spawn_blocking_with_manual_send_idle(work) task_spawner.spawn_blocking_with_manual_send_idle(work)
} }
Work::ChainSegmentBackfill(process_fn) => task_spawner.spawn_async(process_fn), Work::ChainSegmentBackfill(process_fn) => task_spawner.spawn_async(process_fn),

View File

@ -46,6 +46,11 @@ lazy_static::lazy_static! {
"beacon_processor_gossip_block_queue_total", "beacon_processor_gossip_block_queue_total",
"Count of blocks from gossip waiting to be verified." "Count of blocks from gossip waiting to be verified."
); );
// Gossip blobs.
pub static ref BEACON_PROCESSOR_GOSSIP_BLOB_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
"beacon_processor_gossip_blob_queue_total",
"Count of blocks from gossip waiting to be verified."
);
// Gossip Exits. // Gossip Exits.
pub static ref BEACON_PROCESSOR_EXIT_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge( pub static ref BEACON_PROCESSOR_EXIT_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
"beacon_processor_exit_queue_total", "beacon_processor_exit_queue_total",
@ -71,6 +76,11 @@ lazy_static::lazy_static! {
"beacon_processor_rpc_block_queue_total", "beacon_processor_rpc_block_queue_total",
"Count of blocks from the rpc waiting to be verified." "Count of blocks from the rpc waiting to be verified."
); );
// Rpc blobs.
pub static ref BEACON_PROCESSOR_RPC_BLOB_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
"beacon_processor_rpc_blob_queue_total",
"Count of blobs from the rpc waiting to be verified."
);
// Chain segments. // Chain segments.
pub static ref BEACON_PROCESSOR_CHAIN_SEGMENT_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge( pub static ref BEACON_PROCESSOR_CHAIN_SEGMENT_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
"beacon_processor_chain_segment_queue_total", "beacon_processor_chain_segment_queue_total",

View File

@ -454,7 +454,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
if block_slot <= now if block_slot <= now
&& self && self
.ready_work_tx .ready_work_tx
.try_send(ReadyWork::GossipBlock(early_block)) .try_send(ReadyWork::Block(early_block))
.is_err() .is_err()
{ {
error!( error!(
@ -757,7 +757,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
if self if self
.ready_work_tx .ready_work_tx
.try_send(ReadyWork::GossipBlock(ready_block)) .try_send(ReadyWork::Block(ready_block))
.is_err() .is_err()
{ {
error!( error!(

View File

@ -40,11 +40,13 @@ use tokio::{
}; };
use tokio_stream::wrappers::WatchStream; use tokio_stream::wrappers::WatchStream;
use tree_hash::TreeHash; use tree_hash::TreeHash;
use types::{AbstractExecPayload, BeaconStateError, ExecPayload}; use types::beacon_block_body::KzgCommitments;
use types::blob_sidecar::Blobs;
use types::{
AbstractExecPayload, BeaconStateError, ExecPayload, ExecutionPayloadDeneb, VersionedHash,
};
use types::{ use types::{
BlindedPayload, BlockType, ChainSpec, Epoch, ExecutionPayloadCapella, ExecutionPayloadMerge, BlindedPayload, BlockType, ChainSpec, Epoch, ExecutionPayloadCapella, ExecutionPayloadMerge,
ForkVersionedResponse, ProposerPreparationData, PublicKeyBytes, Signature, SignedBeaconBlock,
Slot,
}; };
use types::{KzgProofs, Withdrawals}; use types::{KzgProofs, Withdrawals};
use types::{ProposerPreparationData, PublicKeyBytes, Signature, Slot, Transaction, Uint256}; use types::{ProposerPreparationData, PublicKeyBytes, Signature, Slot, Transaction, Uint256};

View File

@ -67,11 +67,6 @@ lazy_static! {
"beacon_processor_gossip_block_early_seconds", "beacon_processor_gossip_block_early_seconds",
"Whenever a gossip block is received early this metrics is set to how early that block was." "Whenever a gossip block is received early this metrics is set to how early that block was."
); );
// Gossip blobs.
pub static ref BEACON_PROCESSOR_GOSSIP_BLOB_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
"beacon_processor_gossip_blob_queue_total",
"Count of blocks from gossip waiting to be verified."
);
pub static ref BEACON_PROCESSOR_GOSSIP_BLOB_VERIFIED_TOTAL: Result<IntCounter> = try_create_int_counter( pub static ref BEACON_PROCESSOR_GOSSIP_BLOB_VERIFIED_TOTAL: Result<IntCounter> = try_create_int_counter(
"beacon_processor_gossip_blob_verified_total", "beacon_processor_gossip_blob_verified_total",
"Total number of gossip blob verified for propagation." "Total number of gossip blob verified for propagation."
@ -125,11 +120,6 @@ lazy_static! {
"beacon_processor_rpc_block_imported_total", "beacon_processor_rpc_block_imported_total",
"Total number of gossip blocks imported to fork choice, etc." "Total number of gossip blocks imported to fork choice, etc."
); );
// Rpc blobs.
pub static ref BEACON_PROCESSOR_RPC_BLOB_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
"beacon_processor_rpc_blob_queue_total",
"Count of blobs from the rpc waiting to be verified."
);
pub static ref BEACON_PROCESSOR_RPC_BLOB_IMPORTED_TOTAL: Result<IntCounter> = try_create_int_counter( pub static ref BEACON_PROCESSOR_RPC_BLOB_IMPORTED_TOTAL: Result<IntCounter> = try_create_int_counter(
"beacon_processor_rpc_blob_imported_total", "beacon_processor_rpc_blob_imported_total",
"Total number of gossip blobs imported." "Total number of gossip blobs imported."

View File

@ -5,7 +5,8 @@ use crate::{
sync::SyncMessage, sync::SyncMessage,
}; };
use beacon_chain::blob_verification::{AsBlock, BlobError, GossipVerifiedBlob}; use beacon_chain::blob_verification::{BlobError, GossipVerifiedBlob};
use beacon_chain::block_verification_types::AsBlock;
use beacon_chain::store::Error; use beacon_chain::store::Error;
use beacon_chain::{ use beacon_chain::{
attestation_verification::{self, Error as AttnError, VerifiedAttestation}, attestation_verification::{self, Error as AttnError, VerifiedAttestation},
@ -600,7 +601,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
// TODO: docs // TODO: docs
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub async fn process_gossip_blob( pub async fn process_gossip_blob(
self, self: &Arc<Self>,
message_id: MessageId, message_id: MessageId,
peer_id: PeerId, peer_id: PeerId,
_peer_client: Client, _peer_client: Client,
@ -699,7 +700,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
} }
pub async fn process_gossip_verified_blob( pub async fn process_gossip_verified_blob(
self, self: &Arc<Self>,
peer_id: PeerId, peer_id: PeerId,
verified_blob: GossipVerifiedBlob<T>, verified_blob: GossipVerifiedBlob<T>,
// This value is not used presently, but it might come in handy for debugging. // This value is not used presently, but it might come in handy for debugging.

View File

@ -2,6 +2,7 @@ use crate::{
service::NetworkMessage, service::NetworkMessage,
sync::{manager::BlockProcessType, SyncMessage}, sync::{manager::BlockProcessType, SyncMessage},
}; };
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::{ use beacon_chain::{
builder::Witness, eth1_chain::CachingEth1Backend, test_utils::BeaconChainHarness, BeaconChain, builder::Witness, eth1_chain::CachingEth1Backend, test_utils::BeaconChainHarness, BeaconChain,
}; };
@ -25,9 +26,11 @@ use store::MemoryStore;
use task_executor::test_utils::TestRuntime; use task_executor::test_utils::TestRuntime;
use task_executor::TaskExecutor; use task_executor::TaskExecutor;
use tokio::sync::mpsc::{self, error::TrySendError}; use tokio::sync::mpsc::{self, error::TrySendError};
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest};
use types::*; use types::*;
pub use sync_methods::ChainSegmentProcessId; pub use sync_methods::ChainSegmentProcessId;
use types::blob_sidecar::FixedBlobSidecarList;
pub type Error<T> = TrySendError<BeaconWorkEvent<T>>; pub type Error<T> = TrySendError<BeaconWorkEvent<T>>;
@ -196,6 +199,40 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}) })
} }
/// Create a new `Work` event for some blob sidecar.
pub fn send_gossip_blob_sidecar(
self: &Arc<Self>,
message_id: MessageId,
peer_id: PeerId,
peer_client: Client,
blob_index: u64,
blob: SignedBlobSidecar<T::EthSpec>,
seen_timestamp: Duration,
) -> Result<(), Error<T::EthSpec>> {
let processor = self.clone();
let process_fn = async move {
processor
.process_gossip_blob(
message_id,
peer_id,
peer_client,
blob_index,
blob,
seen_timestamp,
)
.await
};
self.try_send(BeaconWorkEvent {
drop_during_sync: false,
work: Work::GossipSignedBlobSidecar(Box::pin(process_fn)),
})
}
pub fn send_banana(){
}
/// Create a new `Work` event for some sync committee signature. /// Create a new `Work` event for some sync committee signature.
pub fn send_gossip_sync_signature( pub fn send_gossip_sync_signature(
self: &Arc<Self>, self: &Arc<Self>,
@ -376,7 +413,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
pub fn send_rpc_beacon_block( pub fn send_rpc_beacon_block(
self: &Arc<Self>, self: &Arc<Self>,
block_root: Hash256, block_root: Hash256,
block: Arc<SignedBeaconBlock<T::EthSpec>>, block: RpcBlock<T::EthSpec>,
seen_timestamp: Duration, seen_timestamp: Duration,
process_type: BlockProcessType, process_type: BlockProcessType,
) -> Result<(), Error<T::EthSpec>> { ) -> Result<(), Error<T::EthSpec>> {
@ -392,11 +429,32 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}) })
} }
/// Create a new `Work` event for some block, where the result from computation (if any) is
/// sent to the other side of `result_tx`.
pub fn send_rpc_blobs(
self: &Arc<Self>,
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
seen_timestamp: Duration,
process_type: BlockProcessType,
) -> Result<(), Error<T::EthSpec>> {
let process_fn = self.clone().generate_rpc_blobs_process_fn(
block_root,
blobs,
seen_timestamp,
process_type,
);
self.try_send(BeaconWorkEvent {
drop_during_sync: false,
work: Work::RpcBlobs { process_fn },
})
}
/// Create a new work event to import `blocks` as a beacon chain segment. /// Create a new work event to import `blocks` as a beacon chain segment.
pub fn send_chain_segment( pub fn send_chain_segment(
self: &Arc<Self>, self: &Arc<Self>,
process_id: ChainSegmentProcessId, process_id: ChainSegmentProcessId,
blocks: Vec<Arc<SignedBeaconBlock<T::EthSpec>>>, blocks: Vec<RpcBlock<T::EthSpec>>,
) -> Result<(), Error<T::EthSpec>> { ) -> Result<(), Error<T::EthSpec>> {
let is_backfill = matches!(&process_id, ChainSegmentProcessId::BackSyncBatchId { .. }); let is_backfill = matches!(&process_id, ChainSegmentProcessId::BackSyncBatchId { .. });
let processor = self.clone(); let processor = self.clone();
@ -496,6 +554,52 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}) })
} }
/// Create a new work event to process `BlobsByRangeRequest`s from the RPC network.
pub fn send_blobs_by_range_request(
self: &Arc<Self>,
peer_id: PeerId,
request_id: PeerRequestId,
request: BlobsByRangeRequest,
) -> Result<(), Error<T::EthSpec>> {
let processor = self.clone();
let process_fn = move |send_idle_on_drop| {
processor.handle_blobs_by_range_request(
send_idle_on_drop,
peer_id,
request_id,
request,
)
};
self.try_send(BeaconWorkEvent {
drop_during_sync: false,
work: Work::BlobsByRangeRequest(Box::new(process_fn)),
})
}
/// Create a new work event to process `BlobsByRootRequest`s from the RPC network.
pub fn send_blobs_by_roots_request(
self: &Arc<Self>,
peer_id: PeerId,
request_id: PeerRequestId,
request: BlobsByRootRequest,
) -> Result<(), Error<T::EthSpec>> {
let processor = self.clone();
let process_fn = move |send_idle_on_drop| {
processor.handle_blobs_by_root_request(
send_idle_on_drop,
peer_id,
request_id,
request,
)
};
self.try_send(BeaconWorkEvent {
drop_during_sync: false,
work: Work::BlobsByRootsRequest(Box::new(process_fn)),
})
}
/// Create a new work event to process `LightClientBootstrap`s from the RPC network. /// Create a new work event to process `LightClientBootstrap`s from the RPC network.
pub fn send_lightclient_bootstrap_request( pub fn send_lightclient_bootstrap_request(
self: &Arc<Self>, self: &Arc<Self>,

View File

@ -13,8 +13,8 @@ use lighthouse_network::rpc::*;
use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo}; use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo};
use slog::{debug, error, trace, warn}; use slog::{debug, error, trace, warn};
use slot_clock::SlotClock; use slot_clock::SlotClock;
use std::sync::Arc;
use std::collections::{hash_map::Entry, HashMap}; use std::collections::{hash_map::Entry, HashMap};
use std::sync::Arc;
use task_executor::TaskExecutor; use task_executor::TaskExecutor;
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
use types::blob_sidecar::BlobIdentifier; use types::blob_sidecar::BlobIdentifier;
@ -217,7 +217,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
} }
/// Handle a `BlobsByRoot` request from the peer. /// Handle a `BlobsByRoot` request from the peer.
pub fn handle_blobs_by_root_request( pub fn handle_blobs_by_root_request(
self, self: Arc<Self>,
send_on_drop: SendOnDrop, send_on_drop: SendOnDrop,
peer_id: PeerId, peer_id: PeerId,
request_id: PeerRequestId, request_id: PeerRequestId,
@ -616,7 +616,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
/// Handle a `BlobsByRange` request from the peer. /// Handle a `BlobsByRange` request from the peer.
pub fn handle_blobs_by_range_request( pub fn handle_blobs_by_range_request(
self, self: Arc<Self>,
send_on_drop: SendOnDrop, send_on_drop: SendOnDrop,
peer_id: PeerId, peer_id: PeerId,
request_id: PeerRequestId, request_id: PeerRequestId,

View File

@ -1,16 +1,18 @@
use std::time::Duration;
use crate::metrics; use crate::metrics;
use crate::network_beacon_processor::{NetworkBeaconProcessor, FUTURE_SLOT_TOLERANCE}; use crate::network_beacon_processor::{NetworkBeaconProcessor, FUTURE_SLOT_TOLERANCE};
use crate::sync::manager::ResponseType;
use crate::sync::BatchProcessResult; use crate::sync::BatchProcessResult;
use crate::sync::{ use crate::sync::{
manager::{BlockProcessType, SyncMessage}, manager::{BlockProcessType, SyncMessage},
ChainId, ChainId,
}; };
use beacon_chain::data_availability_checker::MaybeAvailableBlock;
use beacon_chain::block_verification_types::{AsBlock, RpcBlock};
use beacon_chain::data_availability_checker::AvailabilityCheckError;
use beacon_chain::{ use beacon_chain::{
observed_block_producers::Error as ObserveError, validator_monitor::get_block_delay_ms, observed_block_producers::Error as ObserveError, validator_monitor::get_block_delay_ms,
BeaconChainError, BeaconChainTypes, BlockError, ChainSegmentResult, HistoricalBlockError, AvailabilityProcessingStatus, BeaconChainError, BeaconChainTypes, BlockError,
NotifyExecutionLayer, ChainSegmentResult, HistoricalBlockError, NotifyExecutionLayer,
}; };
use beacon_processor::{ use beacon_processor::{
work_reprocessing_queue::{QueuedRpcBlock, ReprocessQueueMessage}, work_reprocessing_queue::{QueuedRpcBlock, ReprocessQueueMessage},
@ -19,6 +21,8 @@ use beacon_processor::{
use lighthouse_network::PeerAction; use lighthouse_network::PeerAction;
use slog::{debug, error, info, warn}; use slog::{debug, error, info, warn};
use slot_clock::SlotClock; use slot_clock::SlotClock;
use std::sync::Arc;
use std::time::Duration;
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use types::blob_sidecar::FixedBlobSidecarList; use types::blob_sidecar::FixedBlobSidecarList;
@ -44,14 +48,14 @@ struct ChainSegmentFailed {
} }
impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> { impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
/// Returns an async closure which processes a beacon block recieved via RPC. /// Returns an async closure which processes a beacon block received via RPC.
/// ///
/// This separate function was required to prevent a cycle during compiler /// This separate function was required to prevent a cycle during compiler
/// type checking. /// type checking.
pub fn generate_rpc_beacon_block_process_fn( pub fn generate_rpc_beacon_block_process_fn(
self: Arc<Self>, self: Arc<Self>,
block_root: Hash256, block_root: Hash256,
block: Arc<SignedBeaconBlock<T::EthSpec>>, block: RpcBlock<T::EthSpec>,
seen_timestamp: Duration, seen_timestamp: Duration,
process_type: BlockProcessType, process_type: BlockProcessType,
) -> AsyncFn { ) -> AsyncFn {
@ -75,7 +79,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
pub fn generate_rpc_beacon_block_fns( pub fn generate_rpc_beacon_block_fns(
self: Arc<Self>, self: Arc<Self>,
block_root: Hash256, block_root: Hash256,
block: BlockWrapper<T::EthSpec>, block: RpcBlock<T::EthSpec>,
seen_timestamp: Duration, seen_timestamp: Duration,
process_type: BlockProcessType, process_type: BlockProcessType,
) -> (AsyncFn, BlockingFn) { ) -> (AsyncFn, BlockingFn) {
@ -103,7 +107,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
pub async fn process_rpc_block( pub async fn process_rpc_block(
self: Arc<NetworkBeaconProcessor<T>>, self: Arc<NetworkBeaconProcessor<T>>,
block_root: Hash256, block_root: Hash256,
block: Arc<SignedBeaconBlock<T::EthSpec>>, block: RpcBlock<T::EthSpec>,
seen_timestamp: Duration, seen_timestamp: Duration,
process_type: BlockProcessType, process_type: BlockProcessType,
reprocess_tx: mpsc::Sender<ReprocessQueueMessage>, reprocess_tx: mpsc::Sender<ReprocessQueueMessage>,
@ -254,8 +258,26 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
drop(handle); drop(handle);
} }
/// Returns an async closure which processes a list of blobs received via RPC.
///
/// This separate function was required to prevent a cycle during compiler
/// type checking.
pub fn generate_rpc_blobs_process_fn(
self: Arc<Self>,
block_root: Hash256,
block: FixedBlobSidecarList<T::EthSpec>,
seen_timestamp: Duration,
process_type: BlockProcessType,
) -> AsyncFn {
let process_fn = async move {
self.clone().process_rpc_blobs(block_root, block, seen_timestamp, process_type)
.await;
};
Box::pin(process_fn)
}
pub async fn process_rpc_blobs( pub async fn process_rpc_blobs(
self, self: Arc<NetworkBeaconProcessor<T>>,
block_root: Hash256, block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>, blobs: FixedBlobSidecarList<T::EthSpec>,
_seen_timestamp: Duration, _seen_timestamp: Duration,
@ -284,12 +306,16 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}); });
} }
pub fn send_delayed_lookup(&self, block_root: Hash256){
self.send_sync_message(SyncMessage::MissingGossipBlockComponentsDelayed(block_root))
}
/// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync /// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync
/// thread if more blocks are needed to process it. /// thread if more blocks are needed to process it.
pub async fn process_chain_segment( pub async fn process_chain_segment(
&self, &self,
sync_type: ChainSegmentProcessId, sync_type: ChainSegmentProcessId,
downloaded_blocks: Vec<BlockWrapper<T::EthSpec>>, downloaded_blocks: Vec<RpcBlock<T::EthSpec>>,
notify_execution_layer: NotifyExecutionLayer, notify_execution_layer: NotifyExecutionLayer,
) { ) {
let result = match sync_type { let result = match sync_type {
@ -414,7 +440,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
/// Helper function to process blocks batches which only consumes the chain and blocks to process. /// Helper function to process blocks batches which only consumes the chain and blocks to process.
async fn process_blocks<'a>( async fn process_blocks<'a>(
&self, &self,
downloaded_blocks: impl Iterator<Item = &'a BlockWrapper<T::EthSpec>>, downloaded_blocks: impl Iterator<Item = &'a RpcBlock<T::EthSpec>>,
notify_execution_layer: NotifyExecutionLayer, notify_execution_layer: NotifyExecutionLayer,
) -> (usize, Result<(), ChainSegmentFailed>) { ) -> (usize, Result<(), ChainSegmentFailed>) {
let blocks: Vec<_> = downloaded_blocks.cloned().collect(); let blocks: Vec<_> = downloaded_blocks.cloned().collect();
@ -447,7 +473,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
/// Helper function to process backfill block batches which only consumes the chain and blocks to process. /// Helper function to process backfill block batches which only consumes the chain and blocks to process.
fn process_backfill_blocks( fn process_backfill_blocks(
&self, &self,
downloaded_blocks: Vec<BlockWrapper<T::EthSpec>>, downloaded_blocks: Vec<RpcBlock<T::EthSpec>>,
) -> (usize, Result<(), ChainSegmentFailed>) { ) -> (usize, Result<(), ChainSegmentFailed>) {
let total_blocks = downloaded_blocks.len(); let total_blocks = downloaded_blocks.len();
let available_blocks = match downloaded_blocks let available_blocks = match downloaded_blocks
@ -455,7 +481,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
.map(|block| { .map(|block| {
self.chain self.chain
.data_availability_checker .data_availability_checker
.check_availability(block) .check_rpc_block_availability(block)
}) })
.collect::<Result<Vec<_>, _>>() .collect::<Result<Vec<_>, _>>()
{ {

View File

@ -10,13 +10,16 @@ use crate::{
use beacon_chain::test_utils::{ use beacon_chain::test_utils::{
test_spec, AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType, test_spec, AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType,
}; };
use beacon_chain::{BeaconChain, ChainConfig, MAXIMUM_GOSSIP_CLOCK_DISPARITY}; use beacon_chain::{BeaconChain, ChainConfig, WhenSlotSkipped, MAXIMUM_GOSSIP_CLOCK_DISPARITY};
use beacon_processor::{work_reprocessing_queue::*, *}; use beacon_processor::{work_reprocessing_queue::*, *};
use lighthouse_network::discovery::ConnectionId;
use lighthouse_network::rpc::methods::BlobsByRangeRequest;
use lighthouse_network::rpc::SubstreamId;
use lighthouse_network::{ use lighthouse_network::{
discv5::enr::{CombinedKey, EnrBuilder}, discv5::enr::{CombinedKey, EnrBuilder},
rpc::methods::{MetaData, MetaDataV2}, rpc::methods::{MetaData, MetaDataV2},
types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield}, types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield},
Client, MessageId, NetworkGlobals, PeerId, Client, MessageId, NetworkGlobals, PeerId, Response,
}; };
use slot_clock::SlotClock; use slot_clock::SlotClock;
use std::cmp; use std::cmp;
@ -24,9 +27,11 @@ use std::iter::Iterator;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{ use types::{
Attestation, AttesterSlashing, Epoch, EthSpec, Hash256, MainnetEthSpec, ProposerSlashing, Attestation, AttesterSlashing, Epoch, Hash256, MainnetEthSpec, ProposerSlashing,
SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, SubnetId, SignedAggregateAndProof, SignedBeaconBlock, SignedBlobSidecarList, SignedVoluntaryExit, Slot,
SubnetId,
}; };
type E = MainnetEthSpec; type E = MainnetEthSpec;
@ -275,15 +280,15 @@ impl TestRig {
pub fn enqueue_gossip_blob(&self, blob_index: usize) { pub fn enqueue_gossip_blob(&self, blob_index: usize) {
if let Some(blobs) = self.next_blobs.as_ref() { if let Some(blobs) = self.next_blobs.as_ref() {
let blob = blobs.get(blob_index).unwrap(); let blob = blobs.get(blob_index).unwrap();
self.beacon_processor_tx self.network_beacon_processor
.try_send(WorkEvent::gossip_signed_blob_sidecar( .send_gossip_blob_sidecar(
junk_message_id(), junk_message_id(),
junk_peer_id(), junk_peer_id(),
Client::default(), Client::default(),
blob_index as u64, blob.message.index,
blob.clone(), blob.clone(),
Duration::from_secs(0), Duration::from_secs(0),
)) )
.unwrap(); .unwrap();
} }
} }
@ -319,26 +324,26 @@ impl TestRig {
.map(|b| Some(b.message)) .map(|b| Some(b.message))
.collect::<Vec<_>>(), .collect::<Vec<_>>(),
); );
let event = WorkEvent::rpc_blobs( self.network_beacon_processor
.send_rpc_blobs(
self.next_block.canonical_root(), self.next_block.canonical_root(),
blobs, blobs,
std::time::Duration::default(), std::time::Duration::default(),
BlockProcessType::SingleBlock { id: 1 }, BlockProcessType::SingleBlock { id: 1 },
); )
self.beacon_processor_tx.try_send(event).unwrap(); .unwrap();
} }
} }
pub fn enqueue_blobs_by_range_request(&self, count: u64) { pub fn enqueue_blobs_by_range_request(&self, count: u64) {
let event = WorkEvent::blobs_by_range_request( self.network_beacon_processor.send_blobs_by_range_request(
PeerId::random(), PeerId::random(),
(ConnectionId::new(42), SubstreamId::new(24)), (ConnectionId::new(42), SubstreamId::new(24)),
BlobsByRangeRequest { BlobsByRangeRequest {
start_slot: 0, start_slot: 0,
count, count,
}, },
); ).unwrap();
self.beacon_processor_tx.try_send(event).unwrap();
} }
pub fn enqueue_backfill_batch(&self) { pub fn enqueue_backfill_batch(&self) {
@ -733,7 +738,7 @@ async fn attestation_to_unknown_block_processed(import_method: BlockImportMethod
events.push(RPC_BLOCK); events.push(RPC_BLOCK);
if num_blobs > 0 { if num_blobs > 0 {
rig.enqueue_single_lookup_rpc_blobs(); rig.enqueue_single_lookup_rpc_blobs();
events.push(RPC_BLOB); events.push(RPC_BLOBS);
} }
} }
}; };
@ -816,7 +821,7 @@ async fn aggregate_attestation_to_unknown_block(import_method: BlockImportMethod
events.push(RPC_BLOCK); events.push(RPC_BLOCK);
if num_blobs > 0 { if num_blobs > 0 {
rig.enqueue_single_lookup_rpc_blobs(); rig.enqueue_single_lookup_rpc_blobs();
events.push(RPC_BLOB); events.push(RPC_BLOBS);
} }
} }
}; };
@ -996,7 +1001,7 @@ async fn test_rpc_block_reprocessing() {
rig.enqueue_single_lookup_rpc_blobs(); rig.enqueue_single_lookup_rpc_blobs();
if rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0) > 0 { if rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0) > 0 {
rig.assert_event_journal(&[RPC_BLOB, WORKER_FREED, NOTHING_TO_DO]) rig.assert_event_journal(&[RPC_BLOBS, WORKER_FREED, NOTHING_TO_DO])
.await; .await;
} }

View File

@ -208,11 +208,19 @@ impl<T: BeaconChainTypes> Router<T> {
self.network_beacon_processor self.network_beacon_processor
.send_blocks_by_roots_request(peer_id, request_id, request), .send_blocks_by_roots_request(peer_id, request_id, request),
), ),
Request::BlobsByRange(request) => self.send_beacon_processor_work( Request::BlobsByRange(request) => self.handle_beacon_processor_send_result(
BeaconWorkEvent::blobs_by_range_request(peer_id, request_id, request), self.network_beacon_processor.send_blobs_by_range_request(
peer_id,
request_id,
request,
),
),
Request::BlobsByRoot(request) => self.handle_beacon_processor_send_result(
self.network_beacon_processor.send_blobs_by_roots_request(
peer_id,
request_id,
request,
), ),
Request::BlobsByRoot(request) => self.send_beacon_processor_work(
BeaconWorkEvent::blobs_by_root_request(peer_id, request_id, request),
), ),
Request::LightClientBootstrap(request) => self.handle_beacon_processor_send_result( Request::LightClientBootstrap(request) => self.handle_beacon_processor_send_result(
self.network_beacon_processor self.network_beacon_processor
@ -291,19 +299,20 @@ impl<T: BeaconChainTypes> Router<T> {
self.network_globals.client(&peer_id), self.network_globals.client(&peer_id),
block, block,
timestamp_now(), timestamp_now(),
)) ),
} ),
PubsubMessage::BlobSidecar(data) => { PubsubMessage::BlobSidecar(data) => {
let (blob_index, signed_blob) = *data; let (blob_index, signed_blob) = *data;
let peer_client = self.network_globals.client(&peer_id); self.handle_beacon_processor_send_result(
self.send_beacon_processor_work(BeaconWorkEvent::gossip_signed_blob_sidecar( self.network_beacon_processor.send_gossip_blob_sidecar(
message_id, message_id,
peer_id, peer_id,
peer_client, self.network_globals.client(&peer_id),
blob_index, blob_index,
signed_blob, signed_blob,
timestamp_now(), timestamp_now(),
)) )
)
} }
PubsubMessage::VoluntaryExit(exit) => { PubsubMessage::VoluntaryExit(exit) => {
debug!(self.log, "Received a voluntary exit"; "peer_id" => %peer_id); debug!(self.log, "Received a voluntary exit"; "peer_id" => %peer_id);

View File

@ -3,7 +3,6 @@
mod tests { mod tests {
use crate::persisted_dht::load_dht; use crate::persisted_dht::load_dht;
use crate::{NetworkConfig, NetworkService}; use crate::{NetworkConfig, NetworkService};
use beacon_chain::test_utils::BeaconChainHarness;
use beacon_processor::{ use beacon_processor::{
BeaconProcessorSend, MAX_SCHEDULED_WORK_QUEUE_LEN, MAX_WORK_EVENT_QUEUE_LEN, BeaconProcessorSend, MAX_SCHEDULED_WORK_QUEUE_LEN, MAX_WORK_EVENT_QUEUE_LEN,
}; };
@ -13,8 +12,7 @@ mod tests {
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use tokio::{runtime::Runtime, sync::mpsc}; use tokio::{runtime::Runtime, sync::mpsc};
use types::MinimalEthSpec; use beacon_chain::test_utils::EphemeralHarnessType;
use tokio::runtime::Runtime;
use types::MinimalEthSpec as E; use types::MinimalEthSpec as E;
type BeaconChainHarness = beacon_chain::test_utils::BeaconChainHarness<EphemeralHarnessType<E>>; type BeaconChainHarness = beacon_chain::test_utils::BeaconChainHarness<EphemeralHarnessType<E>>;

View File

@ -1,12 +1,12 @@
use crate::sync::SyncMessage;
use beacon_chain::{BeaconChain, BeaconChainTypes}; use beacon_chain::{BeaconChain, BeaconChainTypes};
use slog::{crit, warn}; use slog::{crit, };
use slot_clock::SlotClock; use slot_clock::SlotClock;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::time::interval_at; use tokio::time::interval_at;
use tokio::time::Instant; use tokio::time::Instant;
use types::Hash256; use types::{ Hash256};
use crate::network_beacon_processor::NetworkBeaconProcessor;
#[derive(Debug)] #[derive(Debug)]
pub enum DelayedLookupMessage { pub enum DelayedLookupMessage {
@ -35,7 +35,7 @@ pub fn spawn_delayed_lookup_service<T: BeaconChainTypes>(
executor: &task_executor::TaskExecutor, executor: &task_executor::TaskExecutor,
beacon_chain: Arc<BeaconChain<T>>, beacon_chain: Arc<BeaconChain<T>>,
mut delayed_lookups_recv: mpsc::Receiver<DelayedLookupMessage>, mut delayed_lookups_recv: mpsc::Receiver<DelayedLookupMessage>,
sync_send: mpsc::UnboundedSender<SyncMessage<T::EthSpec>>, beacon_processor: Arc<NetworkBeaconProcessor<T>>,
log: slog::Logger, log: slog::Logger,
) { ) {
executor.spawn( executor.spawn(
@ -52,7 +52,7 @@ pub fn spawn_delayed_lookup_service<T: BeaconChainTypes>(
} else { } else {
delay - seconds_from_current_slot_start delay - seconds_from_current_slot_start
}; };
tokio::time::Instant::now() + duration_until_start Instant::now() + duration_until_start
} }
_ => { _ => {
crit!(log, crit!(log,
@ -69,11 +69,8 @@ pub fn spawn_delayed_lookup_service<T: BeaconChainTypes>(
while let Ok(msg) = delayed_lookups_recv.try_recv() { while let Ok(msg) = delayed_lookups_recv.try_recv() {
match msg { match msg {
DelayedLookupMessage::MissingComponents(block_root) => { DelayedLookupMessage::MissingComponents(block_root) => {
if let Err(e) = sync_send beacon_processor
.send(SyncMessage::MissingGossipBlockComponentsDelayed(block_root)) .send_delayed_lookup(block_root)
{
warn!(log, "Failed to send delayed lookup message"; "error" => ?e);
}
} }
} }
} }

View File

@ -1,21 +1,3 @@
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::time::Duration;
use crate::network_beacon_processor::ChainSegmentProcessId;
use beacon_chain::{BeaconChainTypes, BlockError};
use fnv::FnvHashMap;
use lighthouse_network::{PeerAction, PeerId};
use lru_cache::LRUTimeCache;
use slog::{debug, error, trace, warn, Logger};
use smallvec::SmallVec;
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
use store::{Hash256, SignedBeaconBlock};
use crate::metrics;
use self::parent_lookup::PARENT_FAIL_TOLERANCE; use self::parent_lookup::PARENT_FAIL_TOLERANCE;
use self::parent_lookup::{ParentLookup, ParentVerifyError}; use self::parent_lookup::{ParentLookup, ParentVerifyError};
use self::single_block_lookup::{LookupVerifyError, SingleBlockLookup}; use self::single_block_lookup::{LookupVerifyError, SingleBlockLookup};
@ -25,10 +7,26 @@ use super::{
manager::{BlockProcessType, Id}, manager::{BlockProcessType, Id},
network_context::SyncNetworkContext, network_context::SyncNetworkContext,
}; };
use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent};
use crate::metrics; use crate::metrics;
use crate::network_beacon_processor::ChainSegmentProcessId;
use crate::sync::block_lookups::single_block_lookup::LookupId; use crate::sync::block_lookups::single_block_lookup::LookupId;
use beacon_chain::block_verification_types::{AsBlock, RpcBlock};
use beacon_chain::data_availability_checker::{AvailabilityCheckError, DataAvailabilityChecker};
use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError};
use lighthouse_network::rpc::RPCError;
use lighthouse_network::{PeerAction, PeerId};
use lru_cache::LRUTimeCache;
pub use single_block_lookup::UnknownParentComponents; pub use single_block_lookup::UnknownParentComponents;
use slog::{debug, error, trace, warn, Logger};
use smallvec::SmallVec;
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
use store::{Hash256, SignedBeaconBlock};
use strum::Display;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{BlobSidecar, Slot};
pub(crate) mod delayed_lookup; pub(crate) mod delayed_lookup;
mod parent_lookup; mod parent_lookup;
@ -36,7 +34,7 @@ mod single_block_lookup;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
pub type DownloadedBlocks<T> = (Hash256, BlockWrapper<T>); pub type DownloadedBlocks<T> = (Hash256, RpcBlock<T>);
pub type RootBlockTuple<T> = (Hash256, Arc<SignedBeaconBlock<T>>); pub type RootBlockTuple<T> = (Hash256, Arc<SignedBeaconBlock<T>>);
pub type RootBlobsTuple<T> = (Hash256, FixedBlobSidecarList<T>); pub type RootBlobsTuple<T> = (Hash256, FixedBlobSidecarList<T>);
@ -381,13 +379,13 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}; };
if !has_pending_parent_request { if !has_pending_parent_request {
let block_wrapper = request_ref let rpc_block = request_ref
.get_downloaded_block() .get_downloaded_block()
.unwrap_or(BlockWrapper::Block(block)); .unwrap_or(RpcBlock::new_without_blobs(block));
// This is the correct block, send it for processing // This is the correct block, send it for processing
match self.send_block_for_processing( match self.send_block_for_processing(
block_root, block_root,
block_wrapper, rpc_block,
seen_timestamp, seen_timestamp,
BlockProcessType::SingleBlock { id }, BlockProcessType::SingleBlock { id },
cx, cx,
@ -563,14 +561,13 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
match parent_lookup.verify_block(block, &mut self.failed_chains) { match parent_lookup.verify_block(block, &mut self.failed_chains) {
Ok(Some((block_root, block))) => { Ok(Some((block_root, block))) => {
parent_lookup.add_current_request_block(block); parent_lookup.add_current_request_block(block);
if let Some(block_wrapper) = if let Some(rpc_block) = parent_lookup.current_parent_request.get_downloaded_block()
parent_lookup.current_parent_request.get_downloaded_block()
{ {
let chain_hash = parent_lookup.chain_hash(); let chain_hash = parent_lookup.chain_hash();
if self if self
.send_block_for_processing( .send_block_for_processing(
block_root, block_root,
block_wrapper, rpc_block,
seen_timestamp, seen_timestamp,
BlockProcessType::ParentLookup { chain_hash }, BlockProcessType::ParentLookup { chain_hash },
cx, cx,
@ -644,13 +641,12 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
Ok(Some((block_root, blobs))) => { Ok(Some((block_root, blobs))) => {
parent_lookup.add_current_request_blobs(blobs); parent_lookup.add_current_request_blobs(blobs);
let chain_hash = parent_lookup.chain_hash(); let chain_hash = parent_lookup.chain_hash();
if let Some(block_wrapper) = if let Some(rpc_block) = parent_lookup.current_parent_request.get_downloaded_block()
parent_lookup.current_parent_request.get_downloaded_block()
{ {
if self if self
.send_block_for_processing( .send_block_for_processing(
block_root, block_root,
block_wrapper, rpc_block,
seen_timestamp, seen_timestamp,
BlockProcessType::ParentLookup { chain_hash }, BlockProcessType::ParentLookup { chain_hash },
cx, cx,
@ -914,11 +910,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
BlockError::ParentUnknown(block) => { BlockError::ParentUnknown(block) => {
let slot = block.slot(); let slot = block.slot();
let parent_root = block.parent_root(); let parent_root = block.parent_root();
let (block, blobs) = block.deconstruct(); request_ref.add_unknown_parent_components(block.into());
request_ref.add_unknown_parent_block(block);
if let Some(blobs) = blobs {
request_ref.add_unknown_parent_blobs(blobs);
}
self.search_parent(slot, root, parent_root, peer_id.to_peer_id(), cx); self.search_parent(slot, root, parent_root, peer_id.to_peer_id(), cx);
ShouldRemoveLookup::False ShouldRemoveLookup::False
} }
@ -1077,7 +1069,6 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
blocks.push(child_block); blocks.push(child_block);
}; };
let process_id = ChainSegmentProcessId::ParentLookup(chain_hash); let process_id = ChainSegmentProcessId::ParentLookup(chain_hash);
let work = WorkEvent::chain_segment(process_id, blocks);
match beacon_processor.send_chain_segment(process_id, blocks) { match beacon_processor.send_chain_segment(process_id, blocks) {
Ok(_) => { Ok(_) => {
@ -1171,7 +1162,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
.enumerate() .enumerate()
.find(|(_, req)| req.block_request_state.requested_block_root == chain_hash) .find(|(_, req)| req.block_request_state.requested_block_root == chain_hash)
{ {
if let Some((lookup_id, block_wrapper)) = if let Some((lookup_id, rpc_block)) =
self.single_block_lookups.get_mut(index).and_then(|lookup| { self.single_block_lookups.get_mut(index).and_then(|lookup| {
lookup lookup
.get_downloaded_block() .get_downloaded_block()
@ -1191,7 +1182,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
if self if self
.send_block_for_processing( .send_block_for_processing(
chain_hash, chain_hash,
block_wrapper, rpc_block,
Duration::from_secs(0), //TODO(sean) pipe this through Duration::from_secs(0), //TODO(sean) pipe this through
BlockProcessType::SingleBlock { id }, BlockProcessType::SingleBlock { id },
cx, cx,
@ -1231,7 +1222,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
fn send_block_for_processing( fn send_block_for_processing(
&mut self, &mut self,
block_root: Hash256, block_root: Hash256,
block: BlockWrapper<T::EthSpec>, block: RpcBlock<T::EthSpec>,
duration: Duration, duration: Duration,
process_type: BlockProcessType, process_type: BlockProcessType,
cx: &mut SyncNetworkContext<T>, cx: &mut SyncNetworkContext<T>,
@ -1274,11 +1265,12 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
if blob_count == 0 { if blob_count == 0 {
return Ok(()); return Ok(());
} }
match cx.processor_channel_if_enabled() { match cx.beacon_processor_if_enabled() {
Some(beacon_processor_send) => { Some(beacon_processor) => {
trace!(self.log, "Sending blobs for processing"; "block" => ?block_root, "process_type" => ?process_type); trace!(self.log, "Sending blobs for processing"; "block" => ?block_root, "process_type" => ?process_type);
let event = WorkEvent::rpc_blobs(block_root, blobs, duration, process_type); if let Err(e) =
if let Err(e) = beacon_processor_send.try_send(event) { beacon_processor.send_rpc_blobs(block_root, blobs, duration, process_type)
{
error!( error!(
self.log, self.log,
"Failed to send sync blobs to processor"; "Failed to send sync blobs to processor";

View File

@ -1,19 +1,20 @@
#![cfg(feature = "spec-minimal")] #![cfg(feature = "spec-minimal")]
use std::sync::Arc;
use crate::network_beacon_processor::NetworkBeaconProcessor; use crate::network_beacon_processor::NetworkBeaconProcessor;
use crate::service::RequestId; use crate::service::RequestId;
use crate::sync::manager::RequestId as SyncId; use crate::sync::manager::RequestId as SyncId;
use crate::NetworkMessage; use crate::NetworkMessage;
use std::sync::Arc;
use super::*; use super::*;
use beacon_chain::builder::Witness; use beacon_chain::builder::Witness;
use beacon_chain::eth1_chain::CachingEth1Backend; use beacon_chain::eth1_chain::CachingEth1Backend;
use beacon_chain::test_utils::{build_log, BeaconChainHarness, EphemeralHarnessType};
use beacon_processor::WorkEvent; use beacon_processor::WorkEvent;
use execution_layer::BlobsBundleV1;
use lighthouse_network::rpc::RPCResponseErrorCode;
use lighthouse_network::{NetworkGlobals, Request}; use lighthouse_network::{NetworkGlobals, Request};
use slog::{Drain, Level}; use slot_clock::{ManualSlotClock, SlotClock, TestingSlotClock};
use slot_clock::ManualSlotClock;
use store::MemoryStore; use store::MemoryStore;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use types::{ use types::{
@ -212,7 +213,7 @@ impl TestRig {
}, },
ResponseType::Blob => match self.beacon_processor_rx.try_recv() { ResponseType::Blob => match self.beacon_processor_rx.try_recv() {
Ok(work) => { Ok(work) => {
assert_eq!(work.work_type(), beacon_processor::RPC_BLOB); assert_eq!(work.work_type(), beacon_processor::RPC_BLOBS);
} }
other => panic!("Expected blob process, found {:?}", other), other => panic!("Expected blob process, found {:?}", other),
}, },

View File

@ -127,7 +127,7 @@ pub enum SyncMessage<T: EthSpec> {
}, },
/// A block with an unknown parent has been received. /// A block with an unknown parent has been received.
UnknownParentBlock(PeerId, BlockWrapper<T>, Hash256), UnknownParentBlock(PeerId, RpcBlock<T>, Hash256),
/// A blob with an unknown parent has been received. /// A blob with an unknown parent has been received.
UnknownParentBlob(PeerId, Arc<BlobSidecar<T>>), UnknownParentBlob(PeerId, Arc<BlobSidecar<T>>),
@ -242,13 +242,20 @@ pub fn spawn<T: BeaconChainTypes>(
MAX_REQUEST_BLOCKS >= T::EthSpec::slots_per_epoch() * EPOCHS_PER_BATCH, MAX_REQUEST_BLOCKS >= T::EthSpec::slots_per_epoch() * EPOCHS_PER_BATCH,
"Max blocks that can be requested in a single batch greater than max allowed blocks in a single request" "Max blocks that can be requested in a single batch greater than max allowed blocks in a single request"
); );
let (delayed_lookups_send, delayed_lookups_recv) =
mpsc::channel::<DelayedLookupMessage>(DELAY_QUEUE_CHANNEL_SIZE);
// create an instance of the SyncManager // create an instance of the SyncManager
let network_globals = beacon_processor.network_globals.clone(); let network_globals = beacon_processor.network_globals.clone();
let mut sync_manager = SyncManager { let mut sync_manager = SyncManager {
chain: beacon_chain.clone(), chain: beacon_chain.clone(),
input_channel: sync_recv, input_channel: sync_recv,
network: SyncNetworkContext::new(network_send, beacon_processor, log.clone()), network: SyncNetworkContext::new(
network_send,
beacon_processor.clone(),
beacon_chain.clone(),
log.clone(),
),
range_sync: RangeSync::new(beacon_chain.clone(), log.clone()), range_sync: RangeSync::new(beacon_chain.clone(), log.clone()),
backfill_sync: BackFillSync::new(beacon_chain.clone(), network_globals, log.clone()), backfill_sync: BackFillSync::new(beacon_chain.clone(), network_globals, log.clone()),
block_lookups: BlockLookups::new( block_lookups: BlockLookups::new(
@ -260,12 +267,11 @@ pub fn spawn<T: BeaconChainTypes>(
}; };
let log_clone = log.clone(); let log_clone = log.clone();
let sync_send_clone = sync_send.clone();
delayed_lookup::spawn_delayed_lookup_service( delayed_lookup::spawn_delayed_lookup_service(
&executor, &executor,
beacon_chain, beacon_chain,
delayed_lookups_recv, delayed_lookups_recv,
sync_send, beacon_processor,
log, log,
); );
@ -792,7 +798,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
} }
fn should_search_for_block(&mut self, block_slot: Slot, peer_id: &PeerId) -> bool { fn should_search_for_block(&mut self, block_slot: Slot, peer_id: &PeerId) -> bool {
if !self.network_globals.sync_state.read().is_synced() { if !self.network_globals().sync_state.read().is_synced() {
let head_slot = self.chain.canonical_head.cached_head().head_slot(); let head_slot = self.chain.canonical_head.cached_head().head_slot();
// if the block is far in the future, ignore it. If its within the slot tolerance of // if the block is far in the future, ignore it. If its within the slot tolerance of
@ -806,13 +812,13 @@ impl<T: BeaconChainTypes> SyncManager<T> {
} }
} }
self.network_globals.peers.read().is_connected(peer_id) self.network_globals().peers.read().is_connected(peer_id)
&& self.network.is_execution_engine_online() && self.network.is_execution_engine_online()
} }
fn synced_and_connected(&mut self, peer_id: &PeerId) -> bool { fn synced_and_connected(&mut self, peer_id: &PeerId) -> bool {
self.network_globals.sync_state.read().is_synced() self.network_globals().sync_state.read().is_synced()
&& self.network_globals.peers.read().is_connected(peer_id) && self.network_globals().peers.read().is_connected(peer_id)
&& self.network.is_execution_engine_online() && self.network.is_execution_engine_online()
} }

View File

@ -4,8 +4,7 @@
use super::block_sidecar_coupling::BlocksAndBlobsRequestInfo; use super::block_sidecar_coupling::BlocksAndBlobsRequestInfo;
use super::manager::{Id, RequestId as SyncRequestId}; use super::manager::{Id, RequestId as SyncRequestId};
use super::range_sync::{BatchId, ByRangeRequestType, ChainId}; use super::range_sync::{BatchId, ByRangeRequestType, ChainId};
use crate::beacon_processor::BeaconProcessorSend; use crate::network_beacon_processor::NetworkBeaconProcessor;
duse crate::network_beacon_processor::NetworkBeaconProcessor;
use crate::service::{NetworkMessage, RequestId}; use crate::service::{NetworkMessage, RequestId};
use crate::status::ToStatusMessage; use crate::status::ToStatusMessage;
use crate::sync::block_lookups::{BlobRequestId, BlockRequestId}; use crate::sync::block_lookups::{BlobRequestId, BlockRequestId};
@ -97,6 +96,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
request_id: 1, request_id: 1,
range_requests: FnvHashMap::default(), range_requests: FnvHashMap::default(),
backfill_requests: FnvHashMap::default(), backfill_requests: FnvHashMap::default(),
range_blocks_and_blobs_requests: FnvHashMap::default(),
backfill_blocks_and_blobs_requests: FnvHashMap::default(),
network_beacon_processor, network_beacon_processor,
chain, chain,
log, log,

View File

@ -395,11 +395,11 @@ mod tests {
use slog::{o, Drain}; use slog::{o, Drain};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use slot_clock::ManualSlotClock; use beacon_chain::test_utils::{BeaconChainHarness, EphemeralHarnessType};
use slot_clock::{TestingSlotClock, };
use std::collections::HashSet; use std::collections::HashSet;
use std::sync::Arc; use std::sync::Arc;
use store::MemoryStore; use store::MemoryStore;
use tokio::sync::mpsc;
use types::{Hash256, MinimalEthSpec as E}; use types::{Hash256, MinimalEthSpec as E};
#[derive(Debug)] #[derive(Debug)]
@ -612,7 +612,6 @@ mod tests {
let chain = harness.chain; let chain = harness.chain;
let fake_store = Arc::new(FakeStorage::default()); let fake_store = Arc::new(FakeStorage::default());
let (beacon_processor_tx, beacon_processor_rx) = mpsc::channel(10);
let range_sync = RangeSync::<TestBeaconChainType, FakeStorage>::new( let range_sync = RangeSync::<TestBeaconChainType, FakeStorage>::new(
fake_store.clone(), fake_store.clone(),
log.new(o!("component" => "range")), log.new(o!("component" => "range")),