# Sync Bug fixes (#1950)

## Issue Addressed

Two issues related to empty batches:

- The chain's target was not being advanced when a batch was successful and empty and the chain did not have an optimistic batch.
- Finalized chains were not being switched. We now switch finalized chains, requiring the currently syncing chain to have done a minimum amount of work first.
Commit: `6f890c398e` (parent: `21617aa87f`)
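To make the second fix concrete, here is a minimal standalone sketch (not the `ChainCollection` code itself) of the new switching rule: the syncing finalized chain is only dropped in favour of a better-connected chain once it has validated more than `MIN_FINALIZED_CHAIN_VALIDATED_EPOCHS` epochs. The constant's value matches the diff below; the function and its parameters are illustrative.

```rust
/// Minimum work we require a finalized chain to do before picking a chain with more peers.
const MIN_FINALIZED_CHAIN_VALIDATED_EPOCHS: u64 = 10;

/// Returns true if range sync should stop the currently syncing finalized chain
/// and switch to the candidate chain with the most available peers.
fn should_switch_finalized_chain(
    syncing_chain_peers: usize,
    max_candidate_peers: usize,
    syncing_chain_validated_epochs: u64,
) -> bool {
    // Switch only to a strictly better-connected chain, and only once the current
    // chain has already done a minimum amount of validated work.
    max_candidate_peers > syncing_chain_peers
        && syncing_chain_validated_epochs > MIN_FINALIZED_CHAIN_VALIDATED_EPOCHS
}

fn main() {
    // Better-connected candidate, but the current chain has not validated enough
    // epochs yet: keep syncing it.
    assert!(!should_switch_finalized_chain(4, 9, 6));
    // Once enough epochs are validated, the better-connected candidate wins.
    assert!(should_switch_finalized_chain(4, 9, 12));
    println!("switching rule behaves as expected");
}
```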
```diff
@@ -93,7 +93,7 @@ pub struct SyncingChain<T: BeaconChainTypes> {
     current_processing_batch: Option<BatchId>,
 
     /// Batches validated by this chain.
-    validated_batches: u8,
+    validated_batches: u64,
 
     /// A multi-threaded, non-blocking processor for applying messages to the beacon chain.
     beacon_processor_send: Sender<BeaconWorkEvent<T::EthSpec>>,
```
```diff
@@ -167,7 +167,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
 
     /// Progress in epochs made by the chain
    pub fn validated_epochs(&self) -> u64 {
-        self.validated_batches as u64 * EPOCHS_PER_BATCH
+        self.validated_batches * EPOCHS_PER_BATCH
    }
 
     /// Removes a peer from the chain.
```
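For context on the `u8` → `u64` change: the counter feeds `validated_epochs()`, which the chain collection now compares against an epoch threshold, and a long-running finalized sync can validate far more than 255 batches. A standalone illustration; the `EPOCHS_PER_BATCH` value here is only an assumption for the example.

```rust
// Illustrative only: incrementing a u8 counter past 255 panics in debug builds
// and wraps in release builds, silently resetting the progress metric.
// A u64 counter also lets validated_epochs drop the `as u64` cast.
const EPOCHS_PER_BATCH: u64 = 2; // assumed value for the example

fn validated_epochs(validated_batches: u64) -> u64 {
    validated_batches * EPOCHS_PER_BATCH
}

fn main() {
    let batches: u64 = 300; // more batches than a u8 can represent
    assert!(batches > u8::MAX as u64);
    println!("validated epochs: {}", validated_epochs(batches));
}
```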
```diff
@@ -249,10 +249,9 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
 
         match batch.download_completed() {
             Ok(received) => {
-                let awaiting_batches = batch_id.saturating_sub(
-                    self.optimistic_start
-                        .unwrap_or_else(|| self.processing_target),
-                ) / EPOCHS_PER_BATCH;
+                let awaiting_batches = batch_id
+                    .saturating_sub(self.optimistic_start.unwrap_or(self.processing_target))
+                    / EPOCHS_PER_BATCH;
                 debug!(self.log, "Completed batch received"; "epoch" => batch_id, "blocks" => received, "awaiting_batches" => awaiting_batches);
 
                 // pre-emptively request more blocks from peers whilst we process current blocks,
```
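The `awaiting_batches` change above only reshapes the expression (and swaps `unwrap_or_else` for `unwrap_or`); both versions compute the same value. A standalone worked example with illustrative numbers:

```rust
// The metric: the distance in epochs from the reference point (optimistic start
// if any, otherwise the processing target) up to the batch that just finished
// downloading, converted into a batch count.
const EPOCHS_PER_BATCH: u64 = 2; // assumed value for the example

fn awaiting_batches(batch_id: u64, optimistic_start: Option<u64>, processing_target: u64) -> u64 {
    batch_id.saturating_sub(optimistic_start.unwrap_or(processing_target)) / EPOCHS_PER_BATCH
}

fn main() {
    // No optimistic start: a batch 6 epochs ahead of the processing target means
    // 3 batches are still awaiting processing.
    assert_eq!(awaiting_batches(16, None, 10), 3);
    // saturating_sub keeps the metric at zero when the finished batch is at or
    // behind the reference point, instead of underflowing.
    assert_eq!(awaiting_batches(10, None, 16), 0);
}
```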
```diff
@@ -408,6 +407,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
                     if self.to_be_downloaded <= self.processing_target {
                         self.to_be_downloaded = self.processing_target + EPOCHS_PER_BATCH;
                     }
+                    self.request_batches(network)?;
                 }
             }
         } else {
```
```diff
@@ -462,19 +462,18 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
                     self.advance_chain(network, batch_id);
                     // we register so that on chain switching we don't try it again
                     self.attempted_optimistic_starts.insert(batch_id);
-                    self.processing_target += EPOCHS_PER_BATCH;
-                } else if let Some(epoch) = self.optimistic_start {
+                } else if self.optimistic_start == Some(batch_id) {
                     // check if this batch corresponds to an optimistic batch. In this case, we
                     // reject it as an optimistic candidate since the batch was empty
-                    if epoch == batch_id {
-                        self.reject_optimistic_batch(
-                            network,
-                            false, /* do not re-request */
-                            "batch was empty",
-                        )?;
-                    } else {
-                        self.processing_target += EPOCHS_PER_BATCH;
-                    }
+                    self.reject_optimistic_batch(
+                        network,
+                        false, /* do not re-request */
+                        "batch was empty",
+                    )?;
+                }
+
+                if batch_id == self.processing_target {
+                    self.processing_target += EPOCHS_PER_BATCH;
                 }
 
                 // check if the chain has completed syncing
```
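This hunk is the first fix from the description: the processing target now advances whenever the processed batch is the current target, instead of only inside the optimistic-start branch. A minimal standalone sketch of the new control flow on the empty-batch path (types simplified, values illustrative):

```rust
type Epoch = u64;
const EPOCHS_PER_BATCH: Epoch = 2; // assumed value for the example

/// Simplified model of handling a successfully processed empty batch.
fn on_empty_batch_success(
    processing_target: &mut Epoch,
    optimistic_start: Option<Epoch>,
    batch_id: Epoch,
) {
    if optimistic_start == Some(batch_id) {
        // the empty batch is rejected as an optimistic candidate (elided here)
        println!("rejecting optimistic batch {}", batch_id);
    }
    // Before the fix, a chain with no optimistic batch never advanced its target
    // on this path and stalled.
    if batch_id == *processing_target {
        *processing_target += EPOCHS_PER_BATCH;
    }
}

fn main() {
    let mut target: Epoch = 10;
    on_empty_batch_success(&mut target, None, 10);
    assert_eq!(target, 10 + EPOCHS_PER_BATCH);
    println!("processing target advanced to {}", target);
}
```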
```diff
@@ -1038,7 +1037,7 @@ impl<T: BeaconChainTypes> slog::KV for SyncingChain<T> {
         )?;
         serializer.emit_usize("batches", self.batches.len())?;
         serializer.emit_usize("peers", self.peers.len())?;
-        serializer.emit_u8("validated_batches", self.validated_batches)?;
+        serializer.emit_u64("validated_batches", self.validated_batches)?;
         serializer.emit_arguments("state", &format_args!("{:?}", self.state))?;
         slog::Result::Ok(())
     }
```
```diff
@@ -23,6 +23,9 @@ use types::{Epoch, Hash256, Slot};
 /// The number of head syncing chains to sync at a time.
 const PARALLEL_HEAD_CHAINS: usize = 2;
 
+/// Minimum work we require a finalized chain to do before picking a chain with more peers.
+const MIN_FINALIZED_CHAIN_VALIDATED_EPOCHS: u64 = 10;
+
 /// The state of the long range/batch sync.
 #[derive(Clone)]
 pub enum RangeSyncState {
```
```diff
@@ -252,7 +255,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
         local_head_epoch: Epoch,
     ) {
         // Find the chain with most peers and check if it is already syncing
-        if let Some((mut new_id, peers)) = self
+        if let Some((mut new_id, max_peers)) = self
             .finalized_chains
             .iter()
             .max_by_key(|(_, chain)| chain.available_peers())
```
```diff
@@ -266,7 +269,10 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
         } else {
             // chains are different, check that they don't have the same number of peers
             if let Some(syncing_chain) = self.finalized_chains.get_mut(&syncing_id) {
-                if syncing_chain.available_peers() > peers {
+                if max_peers > syncing_chain.available_peers()
+                    && syncing_chain.validated_epochs()
+                        > MIN_FINALIZED_CHAIN_VALIDATED_EPOCHS
+                {
                     syncing_chain.stop_syncing();
                     old_id = Some(Some(syncing_id));
                 } else {
```
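A brief note on this last hunk: `max_peers` is the peer count returned by the `max_by_key` search above, and the syncing chain is itself one of the finalized chains that search considers, so the old comparison `syncing_chain.available_peers() > peers` could effectively never hold and finalized chains were never switched. The rewritten condition switches to the better-connected chain, but only after the currently syncing chain has validated more than `MIN_FINALIZED_CHAIN_VALIDATED_EPOCHS` epochs, so range sync is not bounced between finalized chains before the current one has had a chance to show progress.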