diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index 2a811b805..450efc886 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -45,6 +45,7 @@ use beacon_chain::{BeaconChain, BeaconChainTypes, GossipVerifiedBlock}; use derivative::Derivative; use futures::stream::{Stream, StreamExt}; use futures::task::Poll; +use lighthouse_network::rpc::methods::BlobsByRangeRequest; use lighthouse_network::{ rpc::{BlocksByRangeRequest, BlocksByRootRequest, StatusMessage}, Client, MessageId, NetworkGlobals, PeerId, PeerRequestId, @@ -197,6 +198,7 @@ pub const CHAIN_SEGMENT: &str = "chain_segment"; pub const STATUS_PROCESSING: &str = "status_processing"; pub const BLOCKS_BY_RANGE_REQUEST: &str = "blocks_by_range_request"; pub const BLOCKS_BY_ROOTS_REQUEST: &str = "blocks_by_roots_request"; +pub const BLOBS_BY_RANGE_REQUEST: &str = "blobs_by_range_request"; pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation"; pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate"; @@ -577,6 +579,21 @@ impl WorkEvent { } } + pub fn blobs_by_range_request( + peer_id: PeerId, + request_id: PeerRequestId, + request: BlobsByRangeRequest, + ) -> Self { + Self { + drop_during_sync: false, + work: Work::BlobsByRangeRequest { + peer_id, + request_id, + request, + }, + } + } + /// Get a `str` representation of the type of work this `WorkEvent` contains. pub fn work_type(&self) -> &'static str { self.work.str_id() @@ -757,6 +774,11 @@ pub enum Work { request_id: PeerRequestId, request: BlocksByRootRequest, }, + BlobsByRangeRequest { + peer_id: PeerId, + request_id: PeerRequestId, + request: BlobsByRangeRequest, + } } impl Work { @@ -780,6 +802,7 @@ impl Work { Work::Status { .. } => STATUS_PROCESSING, Work::BlocksByRangeRequest { .. } => BLOCKS_BY_RANGE_REQUEST, Work::BlocksByRootsRequest { .. } => BLOCKS_BY_ROOTS_REQUEST, + Work::BlobsByRangeRequest {..} => BLOBS_BY_RANGE_REQUEST, Work::UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION, Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE, } @@ -924,6 +947,7 @@ impl BeaconProcessor { let mut status_queue = FifoQueue::new(MAX_STATUS_QUEUE_LEN); let mut bbrange_queue = FifoQueue::new(MAX_BLOCKS_BY_RANGE_QUEUE_LEN); let mut bbroots_queue = FifoQueue::new(MAX_BLOCKS_BY_ROOTS_QUEUE_LEN); + let mut blbrange_queue = FifoQueue::new(MAX_BLOCKS_BY_ROOTS_QUEUE_LEN); // Channels for sending work to the re-process scheduler (`work_reprocessing_tx`) and to // receive them back once they are ready (`ready_work_rx`). @@ -1266,6 +1290,9 @@ impl BeaconProcessor { Work::BlocksByRootsRequest { .. } => { bbroots_queue.push(work, work_id, &self.log) } + Work::BlobsByRangeRequest { .. } => { + blbrange_queue.push(work, work_id, &self.log) + } Work::UnknownBlockAttestation { .. } => { unknown_block_attestation_queue.push(work) } @@ -1643,6 +1670,21 @@ impl BeaconProcessor { request, ) }), + + Work::BlobsByRangeRequest { + peer_id, + request_id, + request + } => task_spawner.spawn_blocking_with_manual_send_idle(move |send_idle_on_drop| { + worker.handle_blobs_by_range_request( + sub_executor, + send_idle_on_drop, + peer_id, + request_id, + request, + ) + }), + Work::UnknownBlockAttestation { message_id, peer_id, diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 8ca9c35e4..2b8cbbc22 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -6,6 +6,7 @@ use beacon_chain::{BeaconChainError, BeaconChainTypes, HistoricalBlockError, Whe use itertools::process_results; use lighthouse_network::rpc::StatusMessage; use lighthouse_network::rpc::*; +use lighthouse_network::rpc::methods::BlobsByRangeRequest; use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo}; use slog::{debug, error}; use slot_clock::SlotClock; @@ -372,4 +373,16 @@ impl Worker { "load_blocks_by_range_blocks", ); } + + /// Handle a `BlobsByRange` request from the peer. + pub fn handle_blobs_by_range_request( + self, + executor: TaskExecutor, + send_on_drop: SendOnDrop, + peer_id: PeerId, + request_id: PeerRequestId, + request: BlobsByRangeRequest, + ) { + // TODO impl + } } diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 2d12cf540..e640bace5 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -168,11 +168,9 @@ impl Processor { request_id: PeerRequestId, request: BlobsByRangeRequest, ) { - /* - self.send_beacon_processor_work(BeaconWorkEvent::blocks_by_roots_request( + self.send_beacon_processor_work(BeaconWorkEvent::blobs_by_range_request( peer_id, request_id, request, )) - */ } /// Handle a `BlocksByRange` request from the peer. pub fn on_blocks_by_range_request(