Sync and multi-client updates (#1044)

* Update finalized/head sync logic

* Correct sync logging

* Handle status during sync gracefully
This commit is contained in:
Age Manning 2020-04-23 19:01:29 +10:00 committed by GitHub
parent 6784a8b42a
commit 79cc9473c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 146 additions and 90 deletions

View File

@ -57,7 +57,7 @@ impl PeerSyncStatus {
let new_state = PeerSyncStatus::Synced { info };
match self {
PeerSyncStatus::Synced { .. } => {
PeerSyncStatus::Synced { .. } | PeerSyncStatus::Unknown => {
*self = new_state;
false // state was not updated
}
@ -70,11 +70,11 @@ impl PeerSyncStatus {
/// Updates the sync state given a peer that is further ahead in the chain than us.
/// Returns true if the state has changed.
pub fn update_ahead(&mut self, info: SyncInfo) -> bool {
pub fn update_advanced(&mut self, info: SyncInfo) -> bool {
let new_state = PeerSyncStatus::Advanced { info };
match self {
PeerSyncStatus::Advanced { .. } => {
PeerSyncStatus::Advanced { .. } | PeerSyncStatus::Unknown => {
*self = new_state;
false // state was not updated
}
@ -91,7 +91,7 @@ impl PeerSyncStatus {
let new_state = PeerSyncStatus::Behind { info };
match self {
PeerSyncStatus::Behind { .. } => {
PeerSyncStatus::Behind { .. } | PeerSyncStatus::Unknown => {
*self = new_state;
false // state was not updated
}

View File

@ -145,7 +145,7 @@ impl<T: BeaconChainTypes> Processor<T> {
/// Process a `Status` response from a peer.
pub fn on_status_response(&mut self, peer_id: PeerId, status: StatusMessage) {
trace!(
debug!(
self.log,
"Received Status Response";
"peer" => format!("{:?}", peer_id),

View File

@ -268,8 +268,8 @@ impl<T: BeaconChainTypes> SyncManager<T> {
"peer" => format!("{:?}", peer_id),
"peer_head_slot" => remote.head_slot,
"local_head_slot" => local_peer_info.head_slot,
"remote_finalized_epoch" => local_peer_info.finalized_epoch,
"local_finalized_epoch" => remote.finalized_epoch,
"peer_finalized_epoch" => remote.finalized_epoch,
"local_finalized_epoch" => local_peer_info.finalized_epoch,
);
// Add the peer to our RangeSync
self.range_sync
@ -528,11 +528,14 @@ impl<T: BeaconChainTypes> SyncManager<T> {
self.update_sync_state();
}
// TODO: Group these functions into one.
/// Updates the syncing state of a peer to be synced.
fn synced_peer(&mut self, peer_id: &PeerId, sync_info: PeerSyncInfo) {
if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
let head_slot = sync_info.head_slot;
let finalized_epoch = sync_info.finalized_epoch;
if peer_info.sync_status.update_synced(sync_info.into()) {
debug!(self.log, "Peer transitioned to synced status"; "peer_id" => format!("{}", peer_id));
debug!(self.log, "Peer transitioned sync state"; "new_state" => "synced", "peer_id" => format!("{}", peer_id), "head_slot" => head_slot, "finalized_epoch" => finalized_epoch);
}
} else {
crit!(self.log, "Status'd peer is unknown"; "peer_id" => format!("{}", peer_id));
@ -543,9 +546,10 @@ impl<T: BeaconChainTypes> SyncManager<T> {
/// Updates the syncing state of a peer to be advanced.
fn advanced_peer(&mut self, peer_id: &PeerId, sync_info: PeerSyncInfo) {
if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
let advanced_slot = sync_info.head_slot;
if peer_info.sync_status.update_ahead(sync_info.into()) {
debug!(self.log, "Peer transitioned to from synced state to ahead"; "peer_id" => format!("{}", peer_id), "head_slot" => advanced_slot);
let head_slot = sync_info.head_slot;
let finalized_epoch = sync_info.finalized_epoch;
if peer_info.sync_status.update_advanced(sync_info.into()) {
debug!(self.log, "Peer transitioned sync state"; "new_state" => "advanced", "peer_id" => format!("{}", peer_id), "head_slot" => head_slot, "finalized_epoch" => finalized_epoch);
}
} else {
crit!(self.log, "Status'd peer is unknown"; "peer_id" => format!("{}", peer_id));
@ -556,9 +560,10 @@ impl<T: BeaconChainTypes> SyncManager<T> {
/// Updates the syncing state of a peer to be behind.
fn behind_peer(&mut self, peer_id: &PeerId, sync_info: PeerSyncInfo) {
if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
let behind_slot = sync_info.head_slot;
let head_slot = sync_info.head_slot;
let finalized_epoch = sync_info.finalized_epoch;
if peer_info.sync_status.update_behind(sync_info.into()) {
debug!(self.log, "Peer transitioned to from synced state to behind"; "peer_id" => format!("{}", peer_id), "head_slot" => behind_slot);
debug!(self.log, "Peer transitioned sync state"; "new_state" => "behind", "peer_id" => format!("{}", peer_id), "head_slot" => head_slot, "finalized_epoch" => finalized_epoch);
}
} else {
crit!(self.log, "Status'd peer is unknown"; "peer_id" => format!("{}", peer_id));

View File

@ -7,7 +7,6 @@ use std::ops::Sub;
use std::sync::Arc;
use types::{Epoch, Hash256, Slot};
///
/// Keeps track of syncing information for known connected peers.
#[derive(Clone, Copy, Debug)]
pub struct PeerSyncInfo {
@ -66,7 +65,7 @@ impl PeerSyncInfo {
PeerSyncType::FullySynced
}
// if not, check if the peer is ahead of our chain
else if self.is_ahead_peer(remote_peer_sync_info) {
else if self.is_advanced_peer(remote_peer_sync_info) {
PeerSyncType::Advanced
} else {
// the peer must be behind and not useful
@ -102,7 +101,7 @@ impl PeerSyncInfo {
/// 1) The peer could have a head slot that is greater
/// than SLOT_IMPORT_TOLERANCE of our current head.
/// 2) The peer has a greater finalized slot/epoch than our own.
fn is_ahead_peer(&self, remote: &PeerSyncInfo) -> bool {
fn is_advanced_peer(&self, remote: &PeerSyncInfo) -> bool {
if remote.head_slot.sub(self.head_slot).as_usize() > SLOT_IMPORT_TOLERANCE
|| self.finalized_epoch < remote.finalized_epoch
{

View File

@ -634,10 +634,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
) -> ProcessingResult {
batch.retries += 1;
// TODO: Handle partially downloaded batches. Update this when building a new batch
// processor thread.
if batch.retries > MAX_BATCH_RETRIES {
if batch.retries > MAX_BATCH_RETRIES || self.peer_pool.is_empty() {
// chain is unrecoverable, remove it
ProcessingResult::RemoveChain
} else {

View File

@ -5,6 +5,7 @@ mod batch;
mod chain;
mod chain_collection;
mod range;
mod sync_type;
pub use batch::Batch;
pub use batch::BatchId;

View File

@ -41,6 +41,7 @@
use super::chain::{ChainId, ProcessingResult};
use super::chain_collection::{ChainCollection, RangeSyncState};
use super::sync_type::RangeSyncType;
use super::BatchId;
use crate::sync::block_processor::BatchProcessResult;
use crate::sync::manager::SyncMessage;
@ -112,7 +113,7 @@ impl<T: BeaconChainTypes> RangeSync<T> {
&mut self,
network: &mut SyncNetworkContext<T::EthSpec>,
peer_id: PeerId,
remote: PeerSyncInfo,
remote_info: PeerSyncInfo,
) {
// evaluate which chain to sync from
@ -131,93 +132,106 @@ impl<T: BeaconChainTypes> RangeSync<T> {
};
// convenience variables
let remote_finalized_slot = remote
let remote_finalized_slot = remote_info
.finalized_epoch
.start_slot(T::EthSpec::slots_per_epoch());
let local_finalized_slot = local_info
.finalized_epoch
.start_slot(T::EthSpec::slots_per_epoch());
// remove peer from any chains
self.remove_peer(network, &peer_id);
// NOTE: A peer that has been re-status'd may now exist in multiple finalized chains.
// remove any out-of-date chains
self.chains.purge_outdated_chains(network);
if remote_finalized_slot > local_info.head_slot
&& !self
.beacon_chain
.fork_choice
.contains_block(&remote.finalized_root)
{
debug!(self.log, "Finalization sync peer joined"; "peer_id" => format!("{:?}", peer_id));
// Finalized chain search
// determine which kind of sync to perform and set up the chains
match RangeSyncType::new(&self.beacon_chain, &local_info, &remote_info) {
RangeSyncType::Finalized => {
// Finalized chain search
debug!(self.log, "Finalization sync peer joined"; "peer_id" => format!("{:?}", peer_id));
// Note: We keep current head chains. These can continue syncing whilst we complete
// this new finalized chain.
// remove the peer from the awaiting_head_peers list if it exists
self.awaiting_head_peers.remove(&peer_id);
// If a finalized chain already exists that matches, add this peer to the chain's peer
// pool.
if let Some(chain) = self
.chains
.get_finalized_mut(remote.finalized_root, remote_finalized_slot)
{
debug!(self.log, "Finalized chain exists, adding peer"; "peer_id" => format!("{:?}", peer_id), "target_root" => format!("{}", chain.target_head_root), "end_slot" => chain.target_head_slot, "start_slot"=> chain.start_slot);
// Note: We keep current head chains. These can continue syncing whilst we complete
// this new finalized chain.
// add the peer to the chain's peer pool
chain.add_peer(network, peer_id);
// If a finalized chain already exists that matches, add this peer to the chain's peer
// pool.
if let Some(chain) = self
.chains
.get_finalized_mut(remote_info.finalized_root, remote_finalized_slot)
{
debug!(self.log, "Finalized chain exists, adding peer"; "peer_id" => format!("{:?}", peer_id), "target_root" => format!("{}", chain.target_head_root), "end_slot" => chain.target_head_slot, "start_slot"=> chain.start_slot);
// check if the new peer's addition will favour a new syncing chain.
self.chains.update_finalized(network);
self.chains.update_sync_state();
} else {
// there is no finalized chain that matches this peer's last finalized target
// create a new finalized chain
debug!(self.log, "New finalized chain added to sync"; "peer_id" => format!("{:?}", peer_id), "start_slot" => local_finalized_slot.as_u64(), "end_slot" => remote_finalized_slot.as_u64(), "finalized_root" => format!("{}", remote.finalized_root));
// add the peer to the chain's peer pool
chain.add_peer(network, peer_id);
self.chains.new_finalized_chain(
local_finalized_slot,
remote.finalized_root,
remote_finalized_slot,
peer_id,
self.sync_send.clone(),
);
// check if the new peer's addition will favour a new syncing chain.
self.chains.update_finalized(network);
// update the global sync state if necessary
self.chains.update_sync_state();
} else {
// there is no finalized chain that matches this peer's last finalized target
// create a new finalized chain
debug!(self.log, "New finalized chain added to sync"; "peer_id" => format!("{:?}", peer_id), "start_slot" => local_finalized_slot.as_u64(), "end_slot" => remote_finalized_slot.as_u64(), "finalized_root" => format!("{}", remote_info.finalized_root));
self.chains.new_finalized_chain(
local_finalized_slot,
remote_info.finalized_root,
remote_finalized_slot,
peer_id,
self.sync_send.clone(),
);
self.chains.update_finalized(network);
// update the global sync state
self.chains.update_sync_state();
}
}
RangeSyncType::Head => {
// This peer requires a head chain sync
if self.chains.is_finalizing_sync() {
// If there are finalized chains to sync, finish these first, before syncing head
// chains. This allows us to re-sync all known peers
trace!(self.log, "Waiting for finalized sync to complete"; "peer_id" => format!("{:?}", peer_id));
// store the peer to re-status after all finalized chains complete
self.awaiting_head_peers.insert(peer_id);
return;
}
// if the peer existed in any other head chain, remove it.
self.remove_peer(network, &peer_id);
// The new peer has the same finalized (earlier filters should prevent a peer with an
// earlier finalized chain from reaching here).
debug!(self.log, "New peer added for recent head sync"; "peer_id" => format!("{:?}", peer_id));
// search if there is a matching head chain, then add the peer to the chain
if let Some(chain) = self
.chains
.get_head_mut(remote_info.head_root, remote_info.head_slot)
{
debug!(self.log, "Adding peer to the existing head chain peer pool"; "head_root" => format!("{}",remote_info.head_root), "head_slot" => remote_info.head_slot, "peer_id" => format!("{:?}", peer_id));
// add the peer to the head's pool
chain.add_peer(network, peer_id);
} else {
// There are no other head chains that match this peer's status, so create a new one with this peer.
let start_slot = std::cmp::min(local_info.head_slot, remote_finalized_slot);
debug!(self.log, "Creating a new syncing head chain"; "head_root" => format!("{}",remote_info.head_root), "start_slot" => start_slot, "head_slot" => remote_info.head_slot, "peer_id" => format!("{:?}", peer_id));
self.chains.new_head_chain(
network,
start_slot,
remote_info.head_root,
remote_info.head_slot,
peer_id,
self.sync_send.clone(),
);
}
self.chains.update_finalized(network);
self.chains.update_sync_state();
}
} else {
if self.chains.is_finalizing_sync() {
// If there are finalized chains to sync, finish these first, before syncing head
// chains. This allows us to re-sync all known peers
trace!(self.log, "Waiting for finalized sync to complete"; "peer_id" => format!("{:?}", peer_id));
return;
}
// The new peer has the same finalized (earlier filters should prevent a peer with an
// earlier finalized chain from reaching here).
debug!(self.log, "New peer added for recent head sync"; "peer_id" => format!("{:?}", peer_id));
// search if there is a matching head chain, then add the peer to the chain
if let Some(chain) = self.chains.get_head_mut(remote.head_root, remote.head_slot) {
debug!(self.log, "Adding peer to the existing head chain peer pool"; "head_root" => format!("{}",remote.head_root), "head_slot" => remote.head_slot, "peer_id" => format!("{:?}", peer_id));
// add the peer to the head's pool
chain.add_peer(network, peer_id);
} else {
// There are no other head chains that match this peer's status, create a new one, and
let start_slot = std::cmp::min(local_info.head_slot, remote_finalized_slot);
debug!(self.log, "Creating a new syncing head chain"; "head_root" => format!("{}",remote.head_root), "start_slot" => start_slot, "head_slot" => remote.head_slot, "peer_id" => format!("{:?}", peer_id));
self.chains.new_head_chain(
network,
start_slot,
remote.head_root,
remote.head_slot,
peer_id,
self.sync_send.clone(),
);
}
self.chains.update_finalized(network);
self.chains.update_sync_state();
}
}

View File

@ -0,0 +1,40 @@
//! Contains logic about identifying which Sync to perform given PeerSyncInfo of ourselves and
//! of a remote.
use crate::sync::PeerSyncInfo;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use std::sync::Arc;
/// The type of Range sync that should be done relative to our current state.
///
/// This is a plain, field-less classification, so it derives the common traits: `Debug` for
/// logging, `Clone`/`Copy` so it can be passed by value, and `PartialEq`/`Eq` so callers can
/// compare the result directly instead of having to `match` on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RangeSyncType {
    /// A finalized chain sync should be started with this peer.
    Finalized,
    /// A head chain sync should be started with this peer.
    Head,
}
impl RangeSyncType {
    /// Classifies the range sync to run against a peer by comparing our own `PeerSyncInfo`
    /// with the one the peer reported.
    ///
    /// Returns `RangeSyncType::Finalized` only when both of the following hold:
    /// - the remote's finalized epoch is strictly greater than our finalized epoch, and
    /// - the remote's finalized root is not a block contained in our fork choice.
    ///
    /// In every other case `RangeSyncType::Head` is returned.
    pub fn new<T: BeaconChainTypes>(
        chain: &Arc<BeaconChain<T>>,
        local_info: &PeerSyncInfo,
        remote_info: &PeerSyncInfo,
    ) -> RangeSyncType {
        // Compare epochs first: fork choice is only consulted when the remote has finalized
        // further than we have.
        let remote_finalized_further = remote_info.finalized_epoch > local_info.finalized_epoch;
        if !remote_finalized_further {
            return Self::Head;
        }

        // A finalized root that fork choice already contains does not need a finalized sync.
        let finalized_root_known = chain
            .fork_choice
            .contains_block(&remote_info.finalized_root);

        if finalized_root_known {
            Self::Head
        } else {
            Self::Finalized
        }
    }
}