# move backfill sync jobs from highest priority to lowest (#3215)
## Issue Addressed

#3212

## Proposed Changes

Move chain segments coming from backfill sync from the highest-priority queue to the lowest-priority queue.

## Additional Info

If this does not solve the issue, the next steps would be lowering the batch size for backfill sync and, as a last resort, throttling the processing of these chain segments.
Parent: `fd55373b88`
Commit: `a7896a58cc`
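The change is conceptually small: the beacon processor drains its work queues in a fixed priority order each time a worker becomes free, and backfill chain segments are now checked last, just before the idle branch. Below is a minimal, self-contained sketch of that idea; the queue names, the `VecDeque` stand-in for Lighthouse's `FifoQueue`, and the exact ordering are illustrative and do not reproduce the full priority list.

```rust
use std::collections::VecDeque;

/// Illustrative stand-ins for the beacon processor's work items.
#[derive(Debug)]
enum Work {
    ChainSegment(&'static str),
    GossipBlock(&'static str),
    BackfillChainSegment(&'static str),
}

fn main() {
    // FIFO queues, listed from highest to lowest priority. This PR moves the
    // backfill queue to the bottom of this ordering.
    let mut chain_segments = VecDeque::from([Work::ChainSegment("range batch")]);
    let mut gossip_blocks = VecDeque::from([Work::GossipBlock("gossip block")]);
    let mut backfill_segments = VecDeque::from([Work::BackfillChainSegment("backfill batch")]);

    // Each iteration models a worker becoming free: take the first item found
    // when checking queues in priority order. Backfill segments are only
    // processed once every higher-priority queue is empty.
    while let Some(work) = chain_segments
        .pop_front()
        .or_else(|| gossip_blocks.pop_front())
        .or_else(|| backfill_segments.pop_front())
    {
        println!("spawning worker for {:?}", work);
    }
}
```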
```diff
@@ -869,6 +869,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
         // Using a FIFO queue since blocks need to be imported sequentially.
         let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN);
         let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
+        let mut backfill_chain_segment = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
         let mut gossip_block_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN);
         let mut delayed_block_queue = FifoQueue::new(MAX_DELAYED_BLOCK_QUEUE_LEN);
 
```
```diff
@@ -1110,6 +1111,9 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
                 // Check exits last since our validators don't get rewards from them.
                 } else if let Some(item) = gossip_voluntary_exit_queue.pop() {
                     self.spawn_worker(item, toolbox);
+                // Handle backfill sync chain segments.
+                } else if let Some(item) = backfill_chain_segment.pop() {
+                    self.spawn_worker(item, toolbox);
                 // This statement should always be the final else statement.
                 } else {
                     // Let the journal know that a worker is freed and there's nothing else
```
```diff
@@ -1195,9 +1199,15 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
                         sync_contribution_queue.push(work)
                     }
                     Work::RpcBlock { .. } => rpc_block_queue.push(work, work_id, &self.log),
-                    Work::ChainSegment { .. } => {
-                        chain_segment_queue.push(work, work_id, &self.log)
-                    }
+                    Work::ChainSegment { ref process_id, .. } => match process_id {
+                        ChainSegmentProcessId::RangeBatchId { .. }
+                        | ChainSegmentProcessId::ParentLookup { .. } => {
+                            chain_segment_queue.push(work, work_id, &self.log)
+                        }
+                        ChainSegmentProcessId::BackSyncBatchId { .. } => {
+                            backfill_chain_segment.push(work, work_id, &self.log)
+                        }
+                    },
                     Work::Status { .. } => status_queue.push(work, work_id, &self.log),
                     Work::BlocksByRangeRequest { .. } => {
                         bbrange_queue.push(work, work_id, &self.log)
```
```diff
@@ -1247,6 +1257,10 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
             &metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_QUEUE_TOTAL,
             chain_segment_queue.len() as i64,
         );
+        metrics::set_gauge(
+            &metrics::BEACON_PROCESSOR_BACKFILL_CHAIN_SEGMENT_QUEUE_TOTAL,
+            backfill_chain_segment.len() as i64,
+        );
         metrics::set_gauge(
             &metrics::BEACON_PROCESSOR_EXIT_QUEUE_TOTAL,
             gossip_voluntary_exit_queue.len() as i64,
```
```diff
@@ -161,6 +161,10 @@ lazy_static! {
        "beacon_processor_chain_segment_queue_total",
        "Count of chain segments from the rpc waiting to be verified."
    );
+   pub static ref BEACON_PROCESSOR_BACKFILL_CHAIN_SEGMENT_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
+       "beacon_processor_backfill_chain_segment_queue_total",
+       "Count of backfill chain segments from the rpc waiting to be verified."
+   );
    pub static ref BEACON_PROCESSOR_CHAIN_SEGMENT_SUCCESS_TOTAL: Result<IntCounter> = try_create_int_counter(
        "beacon_processor_chain_segment_success_total",
        "Total number of chain segments successfully processed."
```
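The metrics hunk above registers the new gauge through Lighthouse's `try_create_int_gauge` helper. For a self-contained picture of what such a gauge looks like when scraped, here is a sketch using the `prometheus` crate directly; treating the helper as a thin wrapper over gauge creation and registration is an assumption, since the helper itself is not part of this diff.

```rust
use prometheus::{Encoder, IntGauge, Registry, TextEncoder};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let registry = Registry::new();

    // Same name and help text as the gauge added in this commit.
    let backfill_queue_len = IntGauge::new(
        "beacon_processor_backfill_chain_segment_queue_total",
        "Count of backfill chain segments from the rpc waiting to be verified.",
    )?;
    registry.register(Box::new(backfill_queue_len.clone()))?;

    // The processor would set this to the current backfill queue length.
    backfill_queue_len.set(3);

    // Render the metrics in the text format exposed by the HTTP endpoint.
    let mut buf = Vec::new();
    TextEncoder::new().encode(&registry.gather(), &mut buf)?;
    println!("{}", String::from_utf8(buf)?);
    Ok(())
}
```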