diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs
index c1b374f67..913b95fb8 100644
--- a/beacon_node/network/src/sync/range_sync/chain.rs
+++ b/beacon_node/network/src/sync/range_sync/chain.rs
@@ -93,7 +93,7 @@ pub struct SyncingChain {
     current_processing_batch: Option,
 
     /// Batches validated by this chain.
-    validated_batches: u8,
+    validated_batches: u64,
 
     /// A multi-threaded, non-blocking processor for applying messages to the beacon chain.
     beacon_processor_send: Sender>,
@@ -167,7 +167,7 @@ impl SyncingChain {
 
     /// Progress in epochs made by the chain
     pub fn validated_epochs(&self) -> u64 {
-        self.validated_batches as u64 * EPOCHS_PER_BATCH
+        self.validated_batches * EPOCHS_PER_BATCH
     }
 
     /// Removes a peer from the chain.
@@ -249,10 +249,9 @@ impl SyncingChain {
 
         match batch.download_completed() {
             Ok(received) => {
-                let awaiting_batches = batch_id.saturating_sub(
-                    self.optimistic_start
-                        .unwrap_or_else(|| self.processing_target),
-                ) / EPOCHS_PER_BATCH;
+                let awaiting_batches = batch_id
+                    .saturating_sub(self.optimistic_start.unwrap_or(self.processing_target))
+                    / EPOCHS_PER_BATCH;
                 debug!(self.log, "Completed batch received"; "epoch" => batch_id, "blocks" => received, "awaiting_batches" => awaiting_batches);
 
                 // pre-emptively request more blocks from peers whilst we process current blocks,
@@ -408,6 +407,7 @@ impl SyncingChain {
                     if self.to_be_downloaded <= self.processing_target {
                         self.to_be_downloaded = self.processing_target + EPOCHS_PER_BATCH;
                     }
+                    self.request_batches(network)?;
                 }
             }
         } else {
@@ -462,19 +462,18 @@ impl SyncingChain {
             self.advance_chain(network, batch_id);
             // we register so that on chain switching we don't try it again
             self.attempted_optimistic_starts.insert(batch_id);
-            self.processing_target += EPOCHS_PER_BATCH;
-        } else if let Some(epoch) = self.optimistic_start {
+        } else if self.optimistic_start == Some(batch_id) {
             // check if this batch corresponds to an optimistic batch. In this case, we
             // reject it as an optimistic candidate since the batch was empty
-            if epoch == batch_id {
-                self.reject_optimistic_batch(
-                    network,
-                    false, /* do not re-request */
-                    "batch was empty",
-                )?;
-            } else {
-                self.processing_target += EPOCHS_PER_BATCH;
-            }
+            self.reject_optimistic_batch(
+                network,
+                false, /* do not re-request */
+                "batch was empty",
+            )?;
+        }
+
+        if batch_id == self.processing_target {
+            self.processing_target += EPOCHS_PER_BATCH;
         }
 
         // check if the chain has completed syncing
@@ -1038,7 +1037,7 @@ impl slog::KV for SyncingChain {
         )?;
         serializer.emit_usize("batches", self.batches.len())?;
         serializer.emit_usize("peers", self.peers.len())?;
-        serializer.emit_u8("validated_batches", self.validated_batches)?;
+        serializer.emit_u64("validated_batches", self.validated_batches)?;
         serializer.emit_arguments("state", &format_args!("{:?}", self.state))?;
         slog::Result::Ok(())
     }
diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs
index fade0a4e7..180ceb949 100644
--- a/beacon_node/network/src/sync/range_sync/chain_collection.rs
+++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs
@@ -23,6 +23,9 @@ use types::{Epoch, Hash256, Slot};
 /// The number of head syncing chains to sync at a time.
 const PARALLEL_HEAD_CHAINS: usize = 2;
 
+/// Minimum work we require a finalized chain to do before picking a chain with more peers.
+const MIN_FINALIZED_CHAIN_VALIDATED_EPOCHS: u64 = 10;
+
 /// The state of the long range/batch sync.
 #[derive(Clone)]
 pub enum RangeSyncState {
@@ -252,7 +255,7 @@ impl ChainCollection {
         local_head_epoch: Epoch,
     ) {
         // Find the chain with most peers and check if it is already syncing
-        if let Some((mut new_id, peers)) = self
+        if let Some((mut new_id, max_peers)) = self
             .finalized_chains
             .iter()
             .max_by_key(|(_, chain)| chain.available_peers())
@@ -266,7 +269,10 @@ impl ChainCollection {
             } else {
                 // chains are different, check that they don't have the same number of peers
                 if let Some(syncing_chain) = self.finalized_chains.get_mut(&syncing_id) {
-                    if syncing_chain.available_peers() > peers {
+                    if max_peers > syncing_chain.available_peers()
+                        && syncing_chain.validated_epochs()
+                            > MIN_FINALIZED_CHAIN_VALIDATED_EPOCHS
+                    {
                         syncing_chain.stop_syncing();
                         old_id = Some(Some(syncing_id));
                     } else {
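
// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the diff above): the finalized-chain
// switching rule introduced in chain_collection.rs, restated over hypothetical
// stand-in types. Only `MIN_FINALIZED_CHAIN_VALIDATED_EPOCHS` and the shape of
// the comparison come from the diff; `ChainStats` and `should_switch` are
// illustrative names, not Lighthouse APIs.

const MIN_FINALIZED_CHAIN_VALIDATED_EPOCHS: u64 = 10;

/// Hypothetical stand-in for the counters the real `SyncingChain` exposes via
/// `available_peers()` and `validated_epochs()`.
struct ChainStats {
    available_peers: usize,
    validated_epochs: u64,
}

/// Returns true if the currently syncing finalized chain should be stopped in
/// favour of a candidate chain: the candidate must have strictly more peers,
/// and the syncing chain must already have validated enough epochs that
/// abandoning its progress is acceptable.
fn should_switch(syncing: &ChainStats, candidate: &ChainStats) -> bool {
    candidate.available_peers > syncing.available_peers
        && syncing.validated_epochs > MIN_FINALIZED_CHAIN_VALIDATED_EPOCHS
}

fn main() {
    let syncing = ChainStats { available_peers: 3, validated_epochs: 4 };
    let candidate = ChainStats { available_peers: 8, validated_epochs: 0 };
    // A better-peered chain does not preempt a chain that has barely started.
    assert!(!should_switch(&syncing, &candidate));

    let progressed = ChainStats { available_peers: 3, validated_epochs: 64 };
    // Once enough epochs are validated, the better-peered chain may take over.
    assert!(should_switch(&progressed, &candidate));
}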