more network stuff

This commit is contained in:
Marius van der Wijden 2022-09-17 16:39:40 +02:00
parent aeb52ff186
commit f9209e2d08
3 changed files with 56 additions and 3 deletions

View File

@ -45,6 +45,7 @@ use beacon_chain::{BeaconChain, BeaconChainTypes, GossipVerifiedBlock};
use derivative::Derivative; use derivative::Derivative;
use futures::stream::{Stream, StreamExt}; use futures::stream::{Stream, StreamExt};
use futures::task::Poll; use futures::task::Poll;
use lighthouse_network::rpc::methods::BlobsByRangeRequest;
use lighthouse_network::{ use lighthouse_network::{
rpc::{BlocksByRangeRequest, BlocksByRootRequest, StatusMessage}, rpc::{BlocksByRangeRequest, BlocksByRootRequest, StatusMessage},
Client, MessageId, NetworkGlobals, PeerId, PeerRequestId, Client, MessageId, NetworkGlobals, PeerId, PeerRequestId,
@ -197,6 +198,7 @@ pub const CHAIN_SEGMENT: &str = "chain_segment";
pub const STATUS_PROCESSING: &str = "status_processing"; pub const STATUS_PROCESSING: &str = "status_processing";
pub const BLOCKS_BY_RANGE_REQUEST: &str = "blocks_by_range_request"; pub const BLOCKS_BY_RANGE_REQUEST: &str = "blocks_by_range_request";
pub const BLOCKS_BY_ROOTS_REQUEST: &str = "blocks_by_roots_request"; pub const BLOCKS_BY_ROOTS_REQUEST: &str = "blocks_by_roots_request";
pub const BLOBS_BY_RANGE_REQUEST: &str = "blobs_by_range_request";
pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation"; pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation";
pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate"; pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate";
@ -577,6 +579,21 @@ impl<T: BeaconChainTypes> WorkEvent<T> {
} }
} }
pub fn blobs_by_range_request(
peer_id: PeerId,
request_id: PeerRequestId,
request: BlobsByRangeRequest,
) -> Self {
Self {
drop_during_sync: false,
work: Work::BlobsByRangeRequest {
peer_id,
request_id,
request,
},
}
}
/// Get a `str` representation of the type of work this `WorkEvent` contains. /// Get a `str` representation of the type of work this `WorkEvent` contains.
pub fn work_type(&self) -> &'static str { pub fn work_type(&self) -> &'static str {
self.work.str_id() self.work.str_id()
@ -757,6 +774,11 @@ pub enum Work<T: BeaconChainTypes> {
request_id: PeerRequestId, request_id: PeerRequestId,
request: BlocksByRootRequest, request: BlocksByRootRequest,
}, },
BlobsByRangeRequest {
peer_id: PeerId,
request_id: PeerRequestId,
request: BlobsByRangeRequest,
}
} }
impl<T: BeaconChainTypes> Work<T> { impl<T: BeaconChainTypes> Work<T> {
@ -780,6 +802,7 @@ impl<T: BeaconChainTypes> Work<T> {
Work::Status { .. } => STATUS_PROCESSING, Work::Status { .. } => STATUS_PROCESSING,
Work::BlocksByRangeRequest { .. } => BLOCKS_BY_RANGE_REQUEST, Work::BlocksByRangeRequest { .. } => BLOCKS_BY_RANGE_REQUEST,
Work::BlocksByRootsRequest { .. } => BLOCKS_BY_ROOTS_REQUEST, Work::BlocksByRootsRequest { .. } => BLOCKS_BY_ROOTS_REQUEST,
Work::BlobsByRangeRequest {..} => BLOBS_BY_RANGE_REQUEST,
Work::UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION, Work::UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION,
Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE, Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE,
} }
@ -924,6 +947,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
let mut status_queue = FifoQueue::new(MAX_STATUS_QUEUE_LEN); let mut status_queue = FifoQueue::new(MAX_STATUS_QUEUE_LEN);
let mut bbrange_queue = FifoQueue::new(MAX_BLOCKS_BY_RANGE_QUEUE_LEN); let mut bbrange_queue = FifoQueue::new(MAX_BLOCKS_BY_RANGE_QUEUE_LEN);
let mut bbroots_queue = FifoQueue::new(MAX_BLOCKS_BY_ROOTS_QUEUE_LEN); let mut bbroots_queue = FifoQueue::new(MAX_BLOCKS_BY_ROOTS_QUEUE_LEN);
let mut blbrange_queue = FifoQueue::new(MAX_BLOCKS_BY_ROOTS_QUEUE_LEN);
// Channels for sending work to the re-process scheduler (`work_reprocessing_tx`) and to // Channels for sending work to the re-process scheduler (`work_reprocessing_tx`) and to
// receive them back once they are ready (`ready_work_rx`). // receive them back once they are ready (`ready_work_rx`).
@ -1266,6 +1290,9 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
Work::BlocksByRootsRequest { .. } => { Work::BlocksByRootsRequest { .. } => {
bbroots_queue.push(work, work_id, &self.log) bbroots_queue.push(work, work_id, &self.log)
} }
Work::BlobsByRangeRequest { .. } => {
blbrange_queue.push(work, work_id, &self.log)
}
Work::UnknownBlockAttestation { .. } => { Work::UnknownBlockAttestation { .. } => {
unknown_block_attestation_queue.push(work) unknown_block_attestation_queue.push(work)
} }
@ -1643,6 +1670,21 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
request, request,
) )
}), }),
Work::BlobsByRangeRequest {
peer_id,
request_id,
request
} => task_spawner.spawn_blocking_with_manual_send_idle(move |send_idle_on_drop| {
worker.handle_blobs_by_range_request(
sub_executor,
send_idle_on_drop,
peer_id,
request_id,
request,
)
}),
Work::UnknownBlockAttestation { Work::UnknownBlockAttestation {
message_id, message_id,
peer_id, peer_id,

View File

@ -6,6 +6,7 @@ use beacon_chain::{BeaconChainError, BeaconChainTypes, HistoricalBlockError, Whe
use itertools::process_results; use itertools::process_results;
use lighthouse_network::rpc::StatusMessage; use lighthouse_network::rpc::StatusMessage;
use lighthouse_network::rpc::*; use lighthouse_network::rpc::*;
use lighthouse_network::rpc::methods::BlobsByRangeRequest;
use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo}; use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo};
use slog::{debug, error}; use slog::{debug, error};
use slot_clock::SlotClock; use slot_clock::SlotClock;
@ -372,4 +373,16 @@ impl<T: BeaconChainTypes> Worker<T> {
"load_blocks_by_range_blocks", "load_blocks_by_range_blocks",
); );
} }
/// Handle a `BlobsByRange` request from the peer.
pub fn handle_blobs_by_range_request(
self,
executor: TaskExecutor,
send_on_drop: SendOnDrop,
peer_id: PeerId,
request_id: PeerRequestId,
request: BlobsByRangeRequest,
) {
// TODO impl
}
} }

View File

@ -168,11 +168,9 @@ impl<T: BeaconChainTypes> Processor<T> {
request_id: PeerRequestId, request_id: PeerRequestId,
request: BlobsByRangeRequest, request: BlobsByRangeRequest,
) { ) {
/* self.send_beacon_processor_work(BeaconWorkEvent::blobs_by_range_request(
self.send_beacon_processor_work(BeaconWorkEvent::blocks_by_roots_request(
peer_id, request_id, request, peer_id, request_id, request,
)) ))
*/
} }
/// Handle a `BlocksByRange` request from the peer. /// Handle a `BlocksByRange` request from the peer.
pub fn on_blocks_by_range_request( pub fn on_blocks_by_range_request(