some more sync boilerplate
This commit is contained in:
parent
4008da6c60
commit
ebc0ccd02a
@ -602,7 +602,7 @@ where
|
|||||||
///
|
///
|
||||||
/// If type inference errors are being raised, see the comment on the definition of `Self`.
|
/// If type inference errors are being raised, see the comment on the definition of `Self`.
|
||||||
#[allow(clippy::type_complexity)]
|
#[allow(clippy::type_complexity)]
|
||||||
pub fn build(
|
pub fn build(
|
||||||
mut self,
|
mut self,
|
||||||
) -> Result<Client<Witness<TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore>>, String>
|
) -> Result<Client<Witness<TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore>>, String>
|
||||||
{
|
{
|
||||||
|
@ -59,7 +59,8 @@ use std::task::Context;
|
|||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use std::{cmp, collections::HashSet};
|
use std::{cmp, collections::HashSet};
|
||||||
use task_executor::TaskExecutor;
|
use task_executor::TaskExecutor;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::{mpsc, oneshot};
|
||||||
|
use lighthouse_network::rpc::methods::TxBlobsByRangeRequest;
|
||||||
use types::{
|
use types::{
|
||||||
Attestation, AttesterSlashing, Hash256, ProposerSlashing, SignedAggregateAndProof,
|
Attestation, AttesterSlashing, Hash256, ProposerSlashing, SignedAggregateAndProof,
|
||||||
SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, SubnetId,
|
SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, SubnetId,
|
||||||
@ -152,6 +153,8 @@ const MAX_STATUS_QUEUE_LEN: usize = 1_024;
|
|||||||
/// will be stored before we start dropping them.
|
/// will be stored before we start dropping them.
|
||||||
const MAX_BLOCKS_BY_RANGE_QUEUE_LEN: usize = 1_024;
|
const MAX_BLOCKS_BY_RANGE_QUEUE_LEN: usize = 1_024;
|
||||||
|
|
||||||
|
const MAX_TX_BLOBS_BY_RANGE_QUEUE_LEN: usize = 1_024;
|
||||||
|
|
||||||
/// The maximum number of queued `BlocksByRootRequest` objects received from the network RPC that
|
/// The maximum number of queued `BlocksByRootRequest` objects received from the network RPC that
|
||||||
/// will be stored before we start dropping them.
|
/// will be stored before we start dropping them.
|
||||||
const MAX_BLOCKS_BY_ROOTS_QUEUE_LEN: usize = 1_024;
|
const MAX_BLOCKS_BY_ROOTS_QUEUE_LEN: usize = 1_024;
|
||||||
@ -194,6 +197,7 @@ pub const RPC_BLOCK: &str = "rpc_block";
|
|||||||
pub const CHAIN_SEGMENT: &str = "chain_segment";
|
pub const CHAIN_SEGMENT: &str = "chain_segment";
|
||||||
pub const STATUS_PROCESSING: &str = "status_processing";
|
pub const STATUS_PROCESSING: &str = "status_processing";
|
||||||
pub const BLOCKS_BY_RANGE_REQUEST: &str = "blocks_by_range_request";
|
pub const BLOCKS_BY_RANGE_REQUEST: &str = "blocks_by_range_request";
|
||||||
|
pub const TX_BLOBS_BY_RANGE_REQUEST: &str = "tx_blobs_by_range_request";
|
||||||
pub const BLOCKS_BY_ROOTS_REQUEST: &str = "blocks_by_roots_request";
|
pub const BLOCKS_BY_ROOTS_REQUEST: &str = "blocks_by_roots_request";
|
||||||
pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation";
|
pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation";
|
||||||
pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate";
|
pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate";
|
||||||
@ -541,6 +545,21 @@ impl<T: BeaconChainTypes> WorkEvent<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn tx_blob_by_range_request(
|
||||||
|
peer_id: PeerId,
|
||||||
|
request_id: PeerRequestId,
|
||||||
|
request: TxBlobsByRangeRequest,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
drop_during_sync: false,
|
||||||
|
work: Work::TxBlobsByRangeRequest {
|
||||||
|
peer_id,
|
||||||
|
request_id,
|
||||||
|
request,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Create a new work event to process `BlocksByRootRequest`s from the RPC network.
|
/// Create a new work event to process `BlocksByRootRequest`s from the RPC network.
|
||||||
pub fn blocks_by_roots_request(
|
pub fn blocks_by_roots_request(
|
||||||
peer_id: PeerId,
|
peer_id: PeerId,
|
||||||
@ -728,6 +747,11 @@ pub enum Work<T: BeaconChainTypes> {
|
|||||||
request_id: PeerRequestId,
|
request_id: PeerRequestId,
|
||||||
request: BlocksByRangeRequest,
|
request: BlocksByRangeRequest,
|
||||||
},
|
},
|
||||||
|
TxBlobsByRangeRequest {
|
||||||
|
peer_id: PeerId,
|
||||||
|
request_id: PeerRequestId,
|
||||||
|
request: TxBlobsByRangeRequest,
|
||||||
|
},
|
||||||
BlocksByRootsRequest {
|
BlocksByRootsRequest {
|
||||||
peer_id: PeerId,
|
peer_id: PeerId,
|
||||||
request_id: PeerRequestId,
|
request_id: PeerRequestId,
|
||||||
@ -754,6 +778,7 @@ impl<T: BeaconChainTypes> Work<T> {
|
|||||||
Work::ChainSegment { .. } => CHAIN_SEGMENT,
|
Work::ChainSegment { .. } => CHAIN_SEGMENT,
|
||||||
Work::Status { .. } => STATUS_PROCESSING,
|
Work::Status { .. } => STATUS_PROCESSING,
|
||||||
Work::BlocksByRangeRequest { .. } => BLOCKS_BY_RANGE_REQUEST,
|
Work::BlocksByRangeRequest { .. } => BLOCKS_BY_RANGE_REQUEST,
|
||||||
|
Work::TxBlobsByRangeRequest { .. } => TX_BLOBS_BY_RANGE_REQUEST,
|
||||||
Work::BlocksByRootsRequest { .. } => BLOCKS_BY_ROOTS_REQUEST,
|
Work::BlocksByRootsRequest { .. } => BLOCKS_BY_ROOTS_REQUEST,
|
||||||
Work::UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION,
|
Work::UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION,
|
||||||
Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE,
|
Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE,
|
||||||
@ -897,6 +922,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
|||||||
|
|
||||||
let mut status_queue = FifoQueue::new(MAX_STATUS_QUEUE_LEN);
|
let mut status_queue = FifoQueue::new(MAX_STATUS_QUEUE_LEN);
|
||||||
let mut bbrange_queue = FifoQueue::new(MAX_BLOCKS_BY_RANGE_QUEUE_LEN);
|
let mut bbrange_queue = FifoQueue::new(MAX_BLOCKS_BY_RANGE_QUEUE_LEN);
|
||||||
|
let mut txbbrange_queue = FifoQueue::new(MAX_TX_BLOBS_BY_RANGE_QUEUE_LEN);
|
||||||
let mut bbroots_queue = FifoQueue::new(MAX_BLOCKS_BY_ROOTS_QUEUE_LEN);
|
let mut bbroots_queue = FifoQueue::new(MAX_BLOCKS_BY_ROOTS_QUEUE_LEN);
|
||||||
|
|
||||||
// Channels for sending work to the re-process scheduler (`work_reprocessing_tx`) and to
|
// Channels for sending work to the re-process scheduler (`work_reprocessing_tx`) and to
|
||||||
@ -1119,6 +1145,8 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
|||||||
self.spawn_worker(item, toolbox);
|
self.spawn_worker(item, toolbox);
|
||||||
} else if let Some(item) = bbrange_queue.pop() {
|
} else if let Some(item) = bbrange_queue.pop() {
|
||||||
self.spawn_worker(item, toolbox);
|
self.spawn_worker(item, toolbox);
|
||||||
|
} else if let Some(item) = txbbrange_queue.pop() {
|
||||||
|
self.spawn_worker(item, toolbox);
|
||||||
} else if let Some(item) = bbroots_queue.pop() {
|
} else if let Some(item) = bbroots_queue.pop() {
|
||||||
self.spawn_worker(item, toolbox);
|
self.spawn_worker(item, toolbox);
|
||||||
// Check slashings after all other consensus messages so we prioritize
|
// Check slashings after all other consensus messages so we prioritize
|
||||||
@ -1234,6 +1262,9 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
|||||||
Work::BlocksByRangeRequest { .. } => {
|
Work::BlocksByRangeRequest { .. } => {
|
||||||
bbrange_queue.push(work, work_id, &self.log)
|
bbrange_queue.push(work, work_id, &self.log)
|
||||||
}
|
}
|
||||||
|
Work::TxBlobsByRangeRequest { .. } => {
|
||||||
|
txbbrange_queue.push(work, work_id, &self.log)
|
||||||
|
}
|
||||||
Work::BlocksByRootsRequest { .. } => {
|
Work::BlocksByRootsRequest { .. } => {
|
||||||
bbroots_queue.push(work, work_id, &self.log)
|
bbroots_queue.push(work, work_id, &self.log)
|
||||||
}
|
}
|
||||||
|
@ -8,6 +8,7 @@ use lighthouse_network::rpc::StatusMessage;
|
|||||||
use lighthouse_network::rpc::*;
|
use lighthouse_network::rpc::*;
|
||||||
use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo};
|
use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo};
|
||||||
use slog::{debug, error};
|
use slog::{debug, error};
|
||||||
|
use lighthouse_network::rpc::methods::TxBlobsByRangeRequest;
|
||||||
use slot_clock::SlotClock;
|
use slot_clock::SlotClock;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use task_executor::TaskExecutor;
|
use task_executor::TaskExecutor;
|
||||||
@ -122,6 +123,15 @@ impl<T: BeaconChainTypes> Worker<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn handle_tx_blobs_by_range_request(
|
||||||
|
&self,
|
||||||
|
peer_id: PeerId,
|
||||||
|
request_id: PeerRequestId,
|
||||||
|
mut req: TxBlobsByRangeRequest,
|
||||||
|
) {
|
||||||
|
//FIXME(sean)
|
||||||
|
}
|
||||||
|
|
||||||
/// Handle a `BlocksByRoot` request from the peer.
|
/// Handle a `BlocksByRoot` request from the peer.
|
||||||
pub fn handle_blocks_by_root_request(
|
pub fn handle_blocks_by_root_request(
|
||||||
self,
|
self,
|
||||||
|
@ -209,7 +209,9 @@ impl<T: BeaconChainTypes> Processor<T> {
|
|||||||
request_id: PeerRequestId,
|
request_id: PeerRequestId,
|
||||||
req: TxBlobsByRangeRequest,
|
req: TxBlobsByRangeRequest,
|
||||||
) {
|
) {
|
||||||
//FIXME(sean)
|
self.send_beacon_processor_work(BeaconWorkEvent::tx_blob_by_range_request(
|
||||||
|
peer_id, request_id, req,
|
||||||
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn on_tx_blobs_by_range_response(
|
pub fn on_tx_blobs_by_range_response(
|
||||||
@ -218,7 +220,24 @@ impl<T: BeaconChainTypes> Processor<T> {
|
|||||||
request_id: RequestId,
|
request_id: RequestId,
|
||||||
blob_wrapper: Option<Box<BlobWrapper<T::EthSpec>>>,
|
blob_wrapper: Option<Box<BlobWrapper<T::EthSpec>>>,
|
||||||
) {
|
) {
|
||||||
//FIXME(sean)
|
trace!(
|
||||||
|
self.log,
|
||||||
|
"Received TxBlobsByRange Response";
|
||||||
|
"peer" => %peer_id,
|
||||||
|
);
|
||||||
|
|
||||||
|
if let RequestId::Sync(id) = request_id {
|
||||||
|
self.send_to_sync(SyncMessage::TxBlobsByRangeResponse {
|
||||||
|
peer_id,
|
||||||
|
request_id: id,
|
||||||
|
blob_wrapper,
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
debug!(
|
||||||
|
self.log,
|
||||||
|
"All tx blobs by range responses should belong to sync"
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Handle a `BlocksByRoot` response from the peer.
|
/// Handle a `BlocksByRoot` response from the peer.
|
||||||
|
@ -53,7 +53,7 @@ use std::ops::Sub;
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use types::{EthSpec, Hash256, SignedBeaconBlock, Slot};
|
use types::{BlobWrapper, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot};
|
||||||
|
|
||||||
/// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync
|
/// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync
|
||||||
/// from a peer. If a peer is within this tolerance (forwards or backwards), it is treated as a
|
/// from a peer. If a peer is within this tolerance (forwards or backwards), it is treated as a
|
||||||
@ -88,6 +88,18 @@ pub enum SyncMessage<T: EthSpec> {
|
|||||||
/// A block has been received from the RPC.
|
/// A block has been received from the RPC.
|
||||||
RpcBlock {
|
RpcBlock {
|
||||||
request_id: RequestId,
|
request_id: RequestId,
|
||||||
|
beacon_block: Option<Box<SignedBeaconBlock<T>>>,
|
||||||
|
},
|
||||||
|
|
||||||
|
/// A [`TxBlobsByRangeResponse`] response has been received.
|
||||||
|
TxBlobsByRangeResponse {
|
||||||
|
peer_id: PeerId,
|
||||||
|
request_id: RequestId,
|
||||||
|
blob_wrapper: Option<Box<BlobWrapper<T>>>,
|
||||||
|
},
|
||||||
|
|
||||||
|
/// A [`BlocksByRoot`] response has been received.
|
||||||
|
BlocksByRootResponse {
|
||||||
peer_id: PeerId,
|
peer_id: PeerId,
|
||||||
beacon_block: Option<Arc<SignedBeaconBlock<T>>>,
|
beacon_block: Option<Arc<SignedBeaconBlock<T>>>,
|
||||||
seen_timestamp: Duration,
|
seen_timestamp: Duration,
|
||||||
|
@ -55,7 +55,7 @@ use lru_cache::LRUTimeCache;
|
|||||||
use slog::{crit, debug, trace, warn};
|
use slog::{crit, debug, trace, warn};
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot};
|
use types::{BlobWrapper, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot};
|
||||||
|
|
||||||
/// For how long we store failed finalized chains to prevent retries.
|
/// For how long we store failed finalized chains to prevent retries.
|
||||||
const FAILED_CHAINS_EXPIRY_SECONDS: u64 = 30;
|
const FAILED_CHAINS_EXPIRY_SECONDS: u64 = 30;
|
||||||
|
Loading…
Reference in New Issue
Block a user