fix merge

realbigsean 2023-07-14 16:15:28 -04:00
parent 42f54ee561
commit 405e95b0ce
10 changed files with 49 additions and 61 deletions

View File

@@ -5,8 +5,8 @@ use crate::{
     sync::SyncMessage,
 };
+use beacon_chain::blob_verification::AsBlock;
 use beacon_chain::blob_verification::{BlobError, GossipVerifiedBlob};
-use beacon_chain::block_verification_types::AsBlock;
 use beacon_chain::store::Error;
 use beacon_chain::{
     attestation_verification::{self, Error as AttnError, VerifiedAttestation},

View File

@@ -2,7 +2,7 @@ use crate::{
     service::NetworkMessage,
     sync::{manager::BlockProcessType, SyncMessage},
 };
-use beacon_chain::block_verification_types::RpcBlock;
+use beacon_chain::blob_verification::BlockWrapper;
 use beacon_chain::{
     builder::Witness, eth1_chain::CachingEth1Backend, test_utils::BeaconChainHarness, BeaconChain,
 };
@@ -13,6 +13,7 @@ use beacon_processor::{
     MAX_SCHEDULED_WORK_QUEUE_LEN, MAX_WORK_EVENT_QUEUE_LEN,
 };
 use environment::null_logger;
+use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest};
 use lighthouse_network::{
     rpc::{BlocksByRangeRequest, BlocksByRootRequest, LightClientBootstrapRequest, StatusMessage},
     Client, MessageId, NetworkGlobals, PeerId, PeerRequestId,
@@ -26,7 +27,6 @@ use store::MemoryStore;
 use task_executor::test_utils::TestRuntime;
 use task_executor::TaskExecutor;
 use tokio::sync::mpsc::{self, error::TrySendError};
-use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest};
 use types::*;

 pub use sync_methods::ChainSegmentProcessId;
@@ -229,9 +229,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
         })
     }

-    pub fn send_banana(){
-    }
+    pub fn send_banana() {}

     /// Create a new `Work` event for some sync committee signature.
     pub fn send_gossip_sync_signature(
@@ -413,7 +411,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
     pub fn send_rpc_beacon_block(
         self: &Arc<Self>,
         block_root: Hash256,
-        block: RpcBlock<T::EthSpec>,
+        block: BlockWrapper<T::EthSpec>,
         seen_timestamp: Duration,
         process_type: BlockProcessType,
     ) -> Result<(), Error<T::EthSpec>> {
@@ -454,7 +452,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
     pub fn send_chain_segment(
         self: &Arc<Self>,
         process_id: ChainSegmentProcessId,
-        blocks: Vec<RpcBlock<T::EthSpec>>,
+        blocks: Vec<BlockWrapper<T::EthSpec>>,
     ) -> Result<(), Error<T::EthSpec>> {
         let is_backfill = matches!(&process_id, ChainSegmentProcessId::BackSyncBatchId { .. });
         let processor = self.clone();
@@ -563,12 +561,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
     ) -> Result<(), Error<T::EthSpec>> {
         let processor = self.clone();
         let process_fn = move |send_idle_on_drop| {
-            processor.handle_blobs_by_range_request(
-                send_idle_on_drop,
-                peer_id,
-                request_id,
-                request,
-            )
+            processor.handle_blobs_by_range_request(send_idle_on_drop, peer_id, request_id, request)
         };

         self.try_send(BeaconWorkEvent {
@@ -586,12 +579,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
     ) -> Result<(), Error<T::EthSpec>> {
         let processor = self.clone();
         let process_fn = move |send_idle_on_drop| {
-            processor.handle_blobs_by_root_request(
-                send_idle_on_drop,
-                peer_id,
-                request_id,
-                request,
-            )
+            processor.handle_blobs_by_root_request(send_idle_on_drop, peer_id, request_id, request)
         };

         self.try_send(BeaconWorkEvent {

View File

@@ -6,8 +6,7 @@ use crate::sync::{
     manager::{BlockProcessType, SyncMessage},
     ChainId,
 };
-use beacon_chain::data_availability_checker::MaybeAvailableBlock;
-use beacon_chain::block_verification_types::{AsBlock, RpcBlock};
+use beacon_chain::blob_verification::{AsBlock, BlockWrapper, MaybeAvailableBlock};
 use beacon_chain::data_availability_checker::AvailabilityCheckError;
 use beacon_chain::{
     observed_block_producers::Error as ObserveError, validator_monitor::get_block_delay_ms,
@@ -55,7 +54,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
     pub fn generate_rpc_beacon_block_process_fn(
         self: Arc<Self>,
         block_root: Hash256,
-        block: RpcBlock<T::EthSpec>,
+        block: BlockWrapper<T::EthSpec>,
         seen_timestamp: Duration,
         process_type: BlockProcessType,
     ) -> AsyncFn {
@@ -79,7 +78,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
     pub fn generate_rpc_beacon_block_fns(
         self: Arc<Self>,
         block_root: Hash256,
-        block: RpcBlock<T::EthSpec>,
+        block: BlockWrapper<T::EthSpec>,
         seen_timestamp: Duration,
         process_type: BlockProcessType,
     ) -> (AsyncFn, BlockingFn) {
@@ -107,7 +106,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
     pub async fn process_rpc_block(
         self: Arc<NetworkBeaconProcessor<T>>,
         block_root: Hash256,
-        block: RpcBlock<T::EthSpec>,
+        block: BlockWrapper<T::EthSpec>,
         seen_timestamp: Duration,
         process_type: BlockProcessType,
         reprocess_tx: mpsc::Sender<ReprocessQueueMessage>,
@@ -270,7 +269,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
         process_type: BlockProcessType,
     ) -> AsyncFn {
         let process_fn = async move {
-            self.clone().process_rpc_blobs(block_root, block, seen_timestamp, process_type)
+            self.clone()
+                .process_rpc_blobs(block_root, block, seen_timestamp, process_type)
                 .await;
         };
         Box::pin(process_fn)
@@ -306,7 +306,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
         });
     }

-    pub fn send_delayed_lookup(&self, block_root: Hash256){
+    pub fn send_delayed_lookup(&self, block_root: Hash256) {
         self.send_sync_message(SyncMessage::MissingGossipBlockComponentsDelayed(block_root))
     }
@@ -315,7 +315,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
     pub async fn process_chain_segment(
         &self,
         sync_type: ChainSegmentProcessId,
-        downloaded_blocks: Vec<RpcBlock<T::EthSpec>>,
+        downloaded_blocks: Vec<BlockWrapper<T::EthSpec>>,
         notify_execution_layer: NotifyExecutionLayer,
     ) {
         let result = match sync_type {
@@ -440,7 +440,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
     /// Helper function to process blocks batches which only consumes the chain and blocks to process.
     async fn process_blocks<'a>(
         &self,
-        downloaded_blocks: impl Iterator<Item = &'a RpcBlock<T::EthSpec>>,
+        downloaded_blocks: impl Iterator<Item = &'a BlockWrapper<T::EthSpec>>,
         notify_execution_layer: NotifyExecutionLayer,
     ) -> (usize, Result<(), ChainSegmentFailed>) {
         let blocks: Vec<_> = downloaded_blocks.cloned().collect();
@@ -473,7 +473,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
     /// Helper function to process backfill block batches which only consumes the chain and blocks to process.
     fn process_backfill_blocks(
         &self,
-        downloaded_blocks: Vec<RpcBlock<T::EthSpec>>,
+        downloaded_blocks: Vec<BlockWrapper<T::EthSpec>>,
     ) -> (usize, Result<(), ChainSegmentFailed>) {
         let total_blocks = downloaded_blocks.len();
         let available_blocks = match downloaded_blocks
@@ -481,7 +481,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
             .map(|block| {
                 self.chain
                     .data_availability_checker
-                    .check_rpc_block_availability(block)
+                    .check_availability(block)
             })
             .collect::<Result<Vec<_>, _>>()
         {

View File

@@ -336,14 +336,16 @@ impl TestRig {
     }

     pub fn enqueue_blobs_by_range_request(&self, count: u64) {
-        self.network_beacon_processor.send_blobs_by_range_request(
-            PeerId::random(),
-            (ConnectionId::new(42), SubstreamId::new(24)),
-            BlobsByRangeRequest {
-                start_slot: 0,
-                count,
-            },
-        ).unwrap();
+        self.network_beacon_processor
+            .send_blobs_by_range_request(
+                PeerId::random(),
+                (ConnectionId::new(42), SubstreamId::new(24)),
+                BlobsByRangeRequest {
+                    start_slot: 0,
+                    count,
+                },
+            )
+            .unwrap();
     }

     pub fn enqueue_backfill_batch(&self) {

View File

@@ -209,18 +209,12 @@ impl<T: BeaconChainTypes> Router<T> {
                     .send_blocks_by_roots_request(peer_id, request_id, request),
             ),
             Request::BlobsByRange(request) => self.handle_beacon_processor_send_result(
-                self.network_beacon_processor.send_blobs_by_range_request(
-                    peer_id,
-                    request_id,
-                    request,
-                ),
+                self.network_beacon_processor
+                    .send_blobs_by_range_request(peer_id, request_id, request),
             ),
             Request::BlobsByRoot(request) => self.handle_beacon_processor_send_result(
-                self.network_beacon_processor.send_blobs_by_roots_request(
-                    peer_id,
-                    request_id,
-                    request,
-                ),
+                self.network_beacon_processor
+                    .send_blobs_by_roots_request(peer_id, request_id, request),
             ),
             Request::LightClientBootstrap(request) => self.handle_beacon_processor_send_result(
                 self.network_beacon_processor
@@ -311,7 +305,7 @@ impl<T: BeaconChainTypes> Router<T> {
                         blob_index,
                         signed_blob,
                         timestamp_now(),
-                    )
+                    ),
                 )
             }
             PubsubMessage::VoluntaryExit(exit) => {

View File

@@ -3,6 +3,7 @@
 mod tests {
     use crate::persisted_dht::load_dht;
     use crate::{NetworkConfig, NetworkService};
+    use beacon_chain::test_utils::EphemeralHarnessType;
     use beacon_processor::{
         BeaconProcessorSend, MAX_SCHEDULED_WORK_QUEUE_LEN, MAX_WORK_EVENT_QUEUE_LEN,
     };
@@ -12,7 +13,6 @@ mod tests {
     use std::str::FromStr;
     use std::sync::Arc;
     use tokio::{runtime::Runtime, sync::mpsc};
-    use beacon_chain::test_utils::EphemeralHarnessType;
     use types::MinimalEthSpec as E;

     type BeaconChainHarness = beacon_chain::test_utils::BeaconChainHarness<EphemeralHarnessType<E>>;

View File

@@ -1,12 +1,12 @@
+use crate::network_beacon_processor::NetworkBeaconProcessor;
 use beacon_chain::{BeaconChain, BeaconChainTypes};
-use slog::{crit, };
+use slog::crit;
 use slot_clock::SlotClock;
 use std::sync::Arc;
 use tokio::sync::mpsc;
 use tokio::time::interval_at;
 use tokio::time::Instant;
-use types::{ Hash256};
-use crate::network_beacon_processor::NetworkBeaconProcessor;
+use types::Hash256;

 #[derive(Debug)]
 pub enum DelayedLookupMessage {

View File

@@ -10,7 +10,7 @@ use super::{
 use crate::metrics;
 use crate::network_beacon_processor::ChainSegmentProcessId;
 use crate::sync::block_lookups::single_block_lookup::LookupId;
-use beacon_chain::block_verification_types::{AsBlock, RpcBlock};
+use beacon_chain::blob_verification::{AsBlock, BlockWrapper};
 use beacon_chain::data_availability_checker::{AvailabilityCheckError, DataAvailabilityChecker};
 use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError};
 use lighthouse_network::rpc::RPCError;
@@ -34,7 +34,7 @@ mod single_block_lookup;
 #[cfg(test)]
 mod tests;

-pub type DownloadedBlocks<T> = (Hash256, RpcBlock<T>);
+pub type DownloadedBlocks<T> = (Hash256, BlockWrapper<T>);
 pub type RootBlockTuple<T> = (Hash256, Arc<SignedBeaconBlock<T>>);
 pub type RootBlobsTuple<T> = (Hash256, FixedBlobSidecarList<T>);
@@ -381,7 +381,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
         if !has_pending_parent_request {
             let rpc_block = request_ref
                 .get_downloaded_block()
-                .unwrap_or(RpcBlock::new_without_blobs(block));
+                .unwrap_or(BlockWrapper::Block(block));
             // This is the correct block, send it for processing
             match self.send_block_for_processing(
                 block_root,
@@ -910,7 +910,11 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
             BlockError::ParentUnknown(block) => {
                 let slot = block.slot();
                 let parent_root = block.parent_root();
-                request_ref.add_unknown_parent_components(block.into());
+                let (block, blobs) = block.deconstruct();
+                request_ref.add_unknown_parent_components(UnknownParentComponents::new(
+                    Some(block),
+                    blobs,
+                ));
                 self.search_parent(slot, root, parent_root, peer_id.to_peer_id(), cx);
                 ShouldRemoveLookup::False
             }
@@ -1222,7 +1226,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
     fn send_block_for_processing(
         &mut self,
         block_root: Hash256,
-        block: RpcBlock<T::EthSpec>,
+        block: BlockWrapper<T::EthSpec>,
         duration: Duration,
         process_type: BlockProcessType,
         cx: &mut SyncNetworkContext<T>,

View File

@@ -127,7 +127,7 @@ pub enum SyncMessage<T: EthSpec> {
     },

     /// A block with an unknown parent has been received.
-    UnknownParentBlock(PeerId, RpcBlock<T>, Hash256),
+    UnknownParentBlock(PeerId, BlockWrapper<T>, Hash256),

     /// A blob with an unknown parent has been received.
     UnknownParentBlob(PeerId, Arc<BlobSidecar<T>>),

View File

@@ -396,7 +396,7 @@ mod tests {
     use tokio::sync::mpsc;
     use beacon_chain::test_utils::{BeaconChainHarness, EphemeralHarnessType};
-    use slot_clock::{TestingSlotClock, };
+    use slot_clock::TestingSlotClock;
     use std::collections::HashSet;
     use std::sync::Arc;
     use store::MemoryStore;