From a7896a58cc18042a6ed567a8ec2288c682509fd0 Mon Sep 17 00:00:00 2001 From: Divma Date: Thu, 26 May 2022 02:05:17 +0000 Subject: [PATCH] move backfill sync jobs from highest priority to lowest (#3215) ## Issue Addressed #3212 ## Proposed Changes Move chain segments coming from back-fill syncing from highest priority to lowest ## Additional Info If this does not solve the issue, next steps would be lowering the batch size for back-fill sync, and as last resort throttling the processing of these chain segments --- .../network/src/beacon_processor/mod.rs | 20 ++++++++++++++++--- beacon_node/network/src/metrics.rs | 4 ++++ 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index 3e25bd144..76903705f 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -869,6 +869,7 @@ impl BeaconProcessor { // Using a FIFO queue since blocks need to be imported sequentially. let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN); let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN); + let mut backfill_chain_segment = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN); let mut gossip_block_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN); let mut delayed_block_queue = FifoQueue::new(MAX_DELAYED_BLOCK_QUEUE_LEN); @@ -1110,6 +1111,9 @@ impl BeaconProcessor { // Check exits last since our validators don't get rewards from them. } else if let Some(item) = gossip_voluntary_exit_queue.pop() { self.spawn_worker(item, toolbox); + // Handle backfill sync chain segments. + } else if let Some(item) = backfill_chain_segment.pop() { + self.spawn_worker(item, toolbox); // This statement should always be the final else statement. } else { // Let the journal know that a worker is freed and there's nothing else @@ -1195,9 +1199,15 @@ impl BeaconProcessor { sync_contribution_queue.push(work) } Work::RpcBlock { .. } => rpc_block_queue.push(work, work_id, &self.log), - Work::ChainSegment { .. } => { - chain_segment_queue.push(work, work_id, &self.log) - } + Work::ChainSegment { ref process_id, .. } => match process_id { + ChainSegmentProcessId::RangeBatchId { .. } + | ChainSegmentProcessId::ParentLookup { .. } => { + chain_segment_queue.push(work, work_id, &self.log) + } + ChainSegmentProcessId::BackSyncBatchId { .. } => { + backfill_chain_segment.push(work, work_id, &self.log) + } + }, Work::Status { .. } => status_queue.push(work, work_id, &self.log), Work::BlocksByRangeRequest { .. } => { bbrange_queue.push(work, work_id, &self.log) @@ -1247,6 +1257,10 @@ impl BeaconProcessor { &metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_QUEUE_TOTAL, chain_segment_queue.len() as i64, ); + metrics::set_gauge( + &metrics::BEACON_PROCESSOR_BACKFILL_CHAIN_SEGMENT_QUEUE_TOTAL, + backfill_chain_segment.len() as i64, + ); metrics::set_gauge( &metrics::BEACON_PROCESSOR_EXIT_QUEUE_TOTAL, gossip_voluntary_exit_queue.len() as i64, diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index 02c491cb0..cc0165131 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -161,6 +161,10 @@ lazy_static! { "beacon_processor_chain_segment_queue_total", "Count of chain segments from the rpc waiting to be verified." ); + pub static ref BEACON_PROCESSOR_BACKFILL_CHAIN_SEGMENT_QUEUE_TOTAL: Result = try_create_int_gauge( + "beacon_processor_backfill_chain_segment_queue_total", + "Count of backfill chain segments from the rpc waiting to be verified." + ); pub static ref BEACON_PROCESSOR_CHAIN_SEGMENT_SUCCESS_TOTAL: Result = try_create_int_counter( "beacon_processor_chain_segment_success_total", "Total number of chain segments successfully processed."