remove double backfill sync state (#2733)
## Issue Addressed

In the backfill sync the state was maintained twice: once locally and once in the network globals. This change makes it so the state is maintained only once. The only behavioral change is that when the backfill sync is paused, the global backfill state is now updated as well. I asked @AgeManning about this and he deemed it a bug, so this solves it.
parent aad397f00a
commit 99f7a7db58
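Before the diff itself, here is a minimal, self-contained sketch of the pattern the change moves to. The two-variant `BackFillState`, the `NetworkGlobals` struct, and the use of `std::sync::RwLock` are simplifications for illustration only (the real code base has more states and its own globals type); the point is that every read and write of the backfill state goes through accessors that touch only the shared globals, so there is no second, locally cached copy that can drift.

```rust
use std::sync::{Arc, RwLock};

/// Simplified stand-in for the sync state enum; the real enum has more variants.
#[derive(Clone, Debug, PartialEq)]
enum BackFillState {
    Syncing,
    Paused,
}

/// Simplified stand-in for the shared network globals.
struct NetworkGlobals {
    backfill_state: RwLock<BackFillState>,
}

struct BackFillSync {
    // Note: no local `state` field; the globals are the single source of truth.
    network_globals: Arc<NetworkGlobals>,
}

impl BackFillSync {
    /// Write the new state straight into the shared globals.
    fn set_state(&self, state: BackFillState) {
        *self.network_globals.backfill_state.write().unwrap() = state;
    }

    /// Read the current state back out of the shared globals.
    fn state(&self) -> BackFillState {
        self.network_globals.backfill_state.read().unwrap().clone()
    }

    fn pause(&mut self) {
        // Every transition goes through the accessors, so observers reading the
        // globals (e.g. a status endpoint) always see the current state.
        if self.state() == BackFillState::Syncing {
            self.set_state(BackFillState::Paused);
        }
    }
}

fn main() {
    let globals = Arc::new(NetworkGlobals {
        backfill_state: RwLock::new(BackFillState::Syncing),
    });
    let mut sync = BackFillSync {
        network_globals: globals.clone(),
    };
    sync.pause();
    assert_eq!(*globals.backfill_state.read().unwrap(), BackFillState::Paused);
}
```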
```diff
@@ -93,9 +93,6 @@ pub enum BackFillError {
 }
 
 pub struct BackFillSync<T: BeaconChainTypes> {
-    /// The current state of the backfill sync.
-    state: BackFillState,
-
     /// Keeps track of the current progress of the backfill.
     /// This only gets refreshed from the beacon chain if we enter a failed state.
     current_start: BatchId,
@@ -176,7 +173,6 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
         };
 
         let bfs = BackFillSync {
-            state,
             batches: BTreeMap::new(),
             active_requests: HashMap::new(),
             processing_target: current_start,
@@ -194,15 +190,15 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
         };
 
         // Update the global network state with the current backfill state.
-        bfs.update_global_state();
+        bfs.set_state(state);
         bfs
     }
 
     /// Pauses the backfill sync if it's currently syncing.
     pub fn pause(&mut self) {
-        if let BackFillState::Syncing = self.state {
+        if let BackFillState::Syncing = self.state() {
             debug!(self.log, "Backfill sync paused"; "processed_epochs" => self.validated_batches, "to_be_processed" => self.current_start);
-            self.state = BackFillState::Paused;
+            self.set_state(BackFillState::Paused);
         }
     }
 
@@ -214,7 +210,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
         &mut self,
         network: &mut SyncNetworkContext<T::EthSpec>,
     ) -> Result<SyncStart, BackFillError> {
-        match self.state {
+        match self.state() {
             BackFillState::Syncing => {} // already syncing ignore.
             BackFillState::Paused => {
                 if self
@@ -227,7 +223,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
                 {
                     // If there are peers to resume with, begin the resume.
                     debug!(self.log, "Resuming backfill sync"; "start_epoch" => self.current_start, "awaiting_batches" => self.batches.len(), "processing_target" => self.processing_target);
-                    self.state = BackFillState::Syncing;
+                    self.set_state(BackFillState::Syncing);
                     // Resume any previously failed batches.
                     self.resume_batches(network)?;
                     // begin requesting blocks from the peer pool, until all peers are exhausted.
@@ -248,14 +244,13 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
                     return Ok(SyncStart::NotSyncing);
                 }
 
-                self.state = BackFillState::Syncing;
+                self.set_state(BackFillState::Syncing);
 
                 // Obtain a new start slot, from the beacon chain and handle possible errors.
                 match self.reset_start_epoch() {
                     Err(ResetEpochError::SyncCompleted) => {
                         error!(self.log, "Backfill sync completed whilst in failed status");
-                        self.state = BackFillState::Completed;
-                        self.update_global_state();
+                        self.set_state(BackFillState::Completed);
                         return Err(BackFillError::InvalidSyncState(String::from(
                             "chain completed",
                         )));
@@ -265,8 +260,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
                             self.log,
                             "Backfill sync not required whilst in failed status"
                         );
-                        self.state = BackFillState::NotRequired;
-                        self.update_global_state();
+                        self.set_state(BackFillState::NotRequired);
                         return Err(BackFillError::InvalidSyncState(String::from(
                             "backfill not required",
                         )));
@@ -284,8 +278,6 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
             }
         }
 
-        self.update_global_state();
-
         Ok(SyncStart::Syncing {
             completed: (self.validated_batches
                 * BACKFILL_EPOCHS_PER_BATCH
@@ -301,7 +293,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
     /// If we are in a failed state, update a local variable to indicate we are able to restart
    /// the failed sync on the next attempt.
     pub fn fully_synced_peer_joined(&mut self) {
-        if matches!(self.state, BackFillState::Failed) {
+        if matches!(self.state(), BackFillState::Failed) {
            self.restart_failed_sync = true;
        }
    }
@@ -315,7 +307,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
         network: &mut SyncNetworkContext<T::EthSpec>,
     ) -> Result<(), BackFillError> {
         if matches!(
-            self.state,
+            self.state(),
             BackFillState::Failed | BackFillState::NotRequired
         ) {
             return Ok(());
@@ -399,7 +391,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
         // check if we have this batch
         let batch = match self.batches.get_mut(&batch_id) {
             None => {
-                if !matches!(self.state, BackFillState::Failed) {
+                if !matches!(self.state(), BackFillState::Failed) {
                     // A batch might get removed when the chain advances, so this is non fatal.
                     debug!(self.log, "Received a block for unknown batch"; "epoch" => batch_id);
                 }
@@ -476,7 +468,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
         }
 
         // Set the state
-        self.state = BackFillState::Failed;
+        self.set_state(BackFillState::Failed);
         // Remove all batches and active requests and participating peers.
         self.batches.clear();
         self.active_requests.clear();
@@ -489,9 +481,6 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
         self.last_batch_downloaded = false;
         self.current_processing_batch = None;
 
-        // Keep the global network state up to date.
-        self.update_global_state();
-
         // NOTE: Lets keep validated_batches for posterity
 
         // Emit the log here
@@ -510,7 +499,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
         batch_id: BatchId,
     ) -> Result<ProcessResult, BackFillError> {
         // Only process batches if this chain is Syncing, and only one at a time
-        if self.state != BackFillState::Syncing || self.current_processing_batch.is_some() {
+        if self.state() != BackFillState::Syncing || self.current_processing_batch.is_some() {
             return Ok(ProcessResult::Successful);
         }
 
@@ -568,9 +557,6 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
         batch_id: BatchId,
         result: &BatchProcessResult,
     ) -> Result<ProcessResult, BackFillError> {
-        // On each batch process, we update the global state.
-        self.update_global_state();
-
         // The first two cases are possible in regular sync, should not occur in backfill, but we
         // keep this logic for handling potential processing race conditions.
         // result
@@ -625,8 +611,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
                 if self.check_completed() {
                     // chain is completed
                     info!(self.log, "Backfill sync completed"; "blocks_processed" => self.validated_batches * T::EthSpec::slots_per_epoch());
-                    self.state = BackFillState::Completed;
-                    self.update_global_state();
+                    self.set_state(BackFillState::Completed);
                     Ok(ProcessResult::SyncCompleted)
                 } else {
                     // chain is not completed
@@ -708,7 +693,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
         network: &mut SyncNetworkContext<T::EthSpec>,
     ) -> Result<ProcessResult, BackFillError> {
         // Only process batches if backfill is syncing and only process one batch at a time
-        if self.state != BackFillState::Syncing || self.current_processing_batch.is_some() {
+        if self.state() != BackFillState::Syncing || self.current_processing_batch.is_some() {
             return Ok(ProcessResult::Successful);
         }
 
@@ -943,7 +928,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
         } else {
             // If we are here the chain has no more synced peers
             info!(self.log, "Backfill sync paused"; "reason" => "insufficient_synced_peers");
-            self.state = BackFillState::Paused;
+            self.set_state(BackFillState::Paused);
             Err(BackFillError::Paused)
         }
     }
@@ -1031,7 +1016,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
         &mut self,
         network: &mut SyncNetworkContext<T::EthSpec>,
     ) -> Result<(), BackFillError> {
-        if !matches!(self.state, BackFillState::Syncing) {
+        if !matches!(self.state(), BackFillState::Syncing) {
             return Ok(());
         }
 
@@ -1158,8 +1143,12 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
     }
 
     /// Updates the global network state indicating the current state of a backfill sync.
-    fn update_global_state(&self) {
-        *self.network_globals.backfill_state.write() = self.state.clone();
+    fn set_state(&self, state: BackFillState) {
+        *self.network_globals.backfill_state.write() = state;
+    }
+
+    fn state(&self) -> BackFillState {
+        self.network_globals.backfill_state.read().clone()
     }
 }
```