More sync updates (#1791)

## Issue Addressed
#1614 and a couple of sync-stalling problems, the most important being a cyclic dependency between the sync manager and the peer manager
divma 2020-10-20 22:34:18 +00:00
parent 703c33bdc7
commit 2acf75785c
10 changed files with 397 additions and 469 deletions


@ -35,8 +35,8 @@ use score::{PeerAction, ScoreState};
use std::collections::HashMap;
/// The time in seconds between re-statusing peers.
const STATUS_INTERVAL: u64 = 300;
/// The time in seconds between PING events. We do not send a ping if the other peer as PING'd us within
/// this time frame (Seconds)
/// The time in seconds between PING events. We do not send a ping if the other peer has PING'd us
/// within this time frame (Seconds)
const PING_INTERVAL: u64 = 30;
/// The heartbeat performs regular updates such as updating reputations and performing discovery
@ -831,20 +831,16 @@ impl<TSpec: EthSpec> Stream for PeerManager<TSpec> {
}
}
// We don't want to update peers during syncing, since this may result in a new chain being
// synced which leads to inefficient re-downloads of blocks.
if !self.network_globals.is_syncing() {
loop {
match self.status_peers.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(peer_id))) => {
self.status_peers.insert(peer_id.clone());
self.events.push(PeerManagerEvent::Status(peer_id))
}
Poll::Ready(Some(Err(e))) => {
error!(self.log, "Failed to check for peers to ping"; "error" => e.to_string())
}
Poll::Ready(None) | Poll::Pending => break,
loop {
match self.status_peers.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(peer_id))) => {
self.status_peers.insert(peer_id.clone());
self.events.push(PeerManagerEvent::Status(peer_id))
}
Poll::Ready(Some(Err(e))) => {
error!(self.log, "Failed to check for peers to ping"; "error" => e.to_string())
}
Poll::Ready(None) | Poll::Pending => break,
}
}


@ -110,25 +110,8 @@ impl<TSpec: EthSpec> NetworkGlobals<TSpec> {
/// Updates the syncing state of the node.
///
/// If there is a new state, the old state and the new states are returned.
pub fn update_sync_state(&self) -> Option<(SyncState, SyncState)> {
let mut result = None;
// if we are in a range sync, nothing changes. Range sync will update this.
if !self.is_syncing() {
let new_state = self
.peers
.read()
.synced_peers()
.next()
.map(|_| SyncState::Synced)
.unwrap_or_else(|| SyncState::Stalled);
let mut peer_state = self.sync_state.write();
if new_state != *peer_state {
result = Some((peer_state.clone(), new_state.clone()));
}
*peer_state = new_state;
}
result
/// The old state is returned
pub fn set_sync_state(&self, new_state: SyncState) -> SyncState {
std::mem::replace(&mut *self.sync_state.write(), new_state)
}
}
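For context, a minimal runnable sketch of the new call pattern, with simplified stand-in types (a std `RwLock` plus `unwrap`, where the real code uses a non-poisoning lock): `set_sync_state` swaps the state in place via `std::mem::replace` and returns the previous value, so the single writer, the sync manager, can compare and log transitions afterwards.

```rust
use std::mem;
use std::sync::RwLock; // stand-in; the real code's lock has no poisoning, hence no unwrap

#[derive(Clone, Debug, PartialEq)]
enum SyncState {
    Synced,
    Stalled,
}

struct NetworkGlobals {
    sync_state: RwLock<SyncState>,
}

impl NetworkGlobals {
    /// Swap in the new state and hand back the previous one.
    fn set_sync_state(&self, new_state: SyncState) -> SyncState {
        mem::replace(&mut *self.sync_state.write().unwrap(), new_state)
    }
}

fn main() {
    let globals = NetworkGlobals {
        sync_state: RwLock::new(SyncState::Stalled),
    };
    // The caller (the sync manager) decides the new state, sets it, and
    // compares afterwards to log transitions.
    let old_state = globals.set_sync_state(SyncState::Synced);
    let new_state = globals.sync_state.read().unwrap();
    if *new_state != old_state {
        println!("Sync state updated: {:?} -> {:?}", old_state, *new_state);
    }
}
```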


@ -1,19 +1,15 @@
use serde::{Deserialize, Serialize};
use types::{Hash256, Slot};
use types::Slot;
/// The current state of the node.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum SyncState {
/// The node is performing a long-range (batch) sync over a finalized chain.
/// In this state, parent lookups are disabled.
SyncingFinalized {
start_slot: Slot,
head_slot: Slot,
head_root: Hash256,
},
SyncingFinalized { start_slot: Slot, target_slot: Slot },
/// The node is performing a long-range (batch) sync over one or many head chains.
/// In this state parent lookups are disabled.
SyncingHead { start_slot: Slot, head_slot: Slot },
SyncingHead { start_slot: Slot, target_slot: Slot },
/// The node is up to date with all known peers and is connected to at least one
/// fully synced peer. In this state, parent lookups are enabled.
Synced,
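A hedged sketch of consuming the renamed variants (`target_slot` in place of `head_slot`): the tolerance check mirrors the HTTP API change in the next file, with plain `u64` slots and a hypothetical `is_synced_enough` helper.

```rust
#[allow(dead_code)]
#[derive(Clone, Debug)]
enum SyncState {
    SyncingFinalized { start_slot: u64, target_slot: u64 },
    SyncingHead { start_slot: u64, target_slot: u64 },
    Synced,
    Stalled,
}

/// While syncing, treat the node as "synced enough" if the sync target is
/// within `tolerance` slots of the wall-clock slot.
fn is_synced_enough(state: &SyncState, current_slot: u64, tolerance: u64) -> bool {
    match state {
        SyncState::Synced => true,
        SyncState::SyncingFinalized { target_slot, .. }
        | SyncState::SyncingHead { target_slot, .. } => target_slot + tolerance >= current_slot,
        SyncState::Stalled => false,
    }
}

fn main() {
    let state = SyncState::SyncingFinalized { start_slot: 0, target_slot: 95 };
    assert!(is_synced_enough(&state, 100, 8));
}
```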


@ -290,7 +290,7 @@ pub fn serve<T: BeaconChainTypes>(
.and_then(
|network_globals: Arc<NetworkGlobals<T::EthSpec>>, chain: Arc<BeaconChain<T>>| async move {
match *network_globals.sync_state.read() {
SyncState::SyncingFinalized { head_slot, .. } => {
SyncState::SyncingFinalized { target_slot, .. } => {
let current_slot = chain
.slot_clock
.now_or_genesis()
@ -302,12 +302,12 @@ pub fn serve<T: BeaconChainTypes>(
let tolerance = SYNC_TOLERANCE_EPOCHS * T::EthSpec::slots_per_epoch();
if head_slot + tolerance >= current_slot {
if target_slot + tolerance >= current_slot {
Ok(())
} else {
Err(warp_utils::reject::not_synced(format!(
"head slot is {}, current slot is {}",
head_slot, current_slot
target_slot, current_slot
)))
}
}


@ -35,13 +35,13 @@
use super::network_context::SyncNetworkContext;
use super::peer_sync_info::{PeerSyncInfo, PeerSyncType};
use super::range_sync::{ChainId, RangeSync, EPOCHS_PER_BATCH};
use super::range_sync::{ChainId, RangeSync, RangeSyncType, EPOCHS_PER_BATCH};
use super::RequestId;
use crate::beacon_processor::{ProcessId, WorkEvent as BeaconWorkEvent};
use crate::service::NetworkMessage;
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError};
use eth2_libp2p::rpc::{methods::MAX_REQUEST_BLOCKS, BlocksByRootRequest, GoodbyeReason};
use eth2_libp2p::types::NetworkGlobals;
use eth2_libp2p::types::{NetworkGlobals, SyncState};
use eth2_libp2p::{PeerAction, PeerId};
use fnv::FnvHashMap;
use lru_cache::LRUCache;
@ -176,11 +176,11 @@ pub struct SyncManager<T: BeaconChainTypes> {
/// The flag allows us to determine if the peer returned data or sent us nothing.
single_block_lookups: FnvHashMap<RequestId, SingleBlockRequest>,
/// The logger for the import manager.
log: Logger,
/// A multi-threaded, non-blocking processor for applying messages to the beacon chain.
beacon_processor_send: mpsc::Sender<BeaconWorkEvent<T::EthSpec>>,
/// The logger for the import manager.
log: Logger,
}
/// Object representing a single block lookup request.
@ -222,7 +222,6 @@ pub fn spawn<T: BeaconChainTypes>(
let mut sync_manager = SyncManager {
range_sync: RangeSync::new(
beacon_chain.clone(),
network_globals.clone(),
beacon_processor_send.clone(),
log.clone(),
),
@ -233,8 +232,8 @@ pub fn spawn<T: BeaconChainTypes>(
parent_queue: SmallVec::new(),
failed_chains: LRUCache::new(500),
single_block_lookups: FnvHashMap::default(),
log: log.clone(),
beacon_processor_send,
log: log.clone(),
};
// spawn the sync manager thread
@ -276,8 +275,6 @@ impl<T: BeaconChainTypes> SyncManager<T> {
"local_head_slot" => local_peer_info.head_slot,
);
self.synced_peer(&peer_id, remote);
// notify the range sync that a peer has been added
self.range_sync.fully_synced_peer_found(&mut self.network);
}
PeerSyncType::Advanced => {
trace!(self.log, "Useful peer for sync found";
@ -302,8 +299,6 @@ impl<T: BeaconChainTypes> SyncManager<T> {
// the second case
{
self.synced_peer(&peer_id, remote);
// notify the range sync that a peer has been added
self.range_sync.fully_synced_peer_found(&mut self.network);
} else {
// Add the peer to our RangeSync
self.range_sync
@ -673,10 +668,41 @@ impl<T: BeaconChainTypes> SyncManager<T> {
/// Updates the global sync state and logs any changes.
fn update_sync_state(&mut self) {
if let Some((old_state, new_state)) = self.network_globals.update_sync_state() {
let new_state: SyncState = match self.range_sync.state() {
Err(e) => {
debug!(self.log, "Error getting range sync state"; "error" => %e);
return;
}
Ok(state) => match state {
None => {
// no range sync, decide if we are stalled or synced
self.network_globals
.peers
.read()
.synced_peers()
.next()
.map(|_| SyncState::Synced)
.unwrap_or_else(|| SyncState::Stalled)
}
Some((RangeSyncType::Finalized, start_slot, target_slot)) => {
SyncState::SyncingFinalized {
start_slot,
target_slot,
}
}
Some((RangeSyncType::Head, start_slot, target_slot)) => SyncState::SyncingHead {
start_slot,
target_slot,
},
},
};
let old_state = self.network_globals.set_sync_state(new_state);
let new_state = self.network_globals.sync_state.read();
if !new_state.eq(&old_state) {
info!(self.log, "Sync state updated"; "old_state" => %old_state, "new_state" => %new_state);
// If we have become synced - Subscribe to all the core subnet topics
if new_state == eth2_libp2p::types::SyncState::Synced {
if new_state.is_synced() {
self.network.subscribe_core_topics();
}
}


@ -98,6 +98,14 @@ impl<T: EthSpec> BatchInfo<T> {
peers
}
/// Verifies whether an incoming block belongs to this batch.
pub fn is_expecting_block(&self, peer_id: &PeerId, request_id: &RequestId) -> bool {
if let BatchState::Downloading(expected_peer, _, expected_id) = &self.state {
return peer_id == expected_peer && expected_id == request_id;
}
false
}
pub fn current_peer(&self) -> Option<&PeerId> {
match &self.state {
BatchState::AwaitingDownload | BatchState::Failed => None,
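For reviewers, a self-contained sketch of the guard this enables (a hypothetical, pared-down `BatchState` carrying only the peer and request id): callers can early-return rather than matching `BatchState::Downloading` inline, which is how the `chain.rs` changes below use it.

```rust
type RequestId = usize;

#[derive(PartialEq, Eq, Clone, Debug)]
struct PeerId(u64);

#[allow(dead_code)]
enum BatchState {
    AwaitingDownload,
    Downloading(PeerId, RequestId),
    Failed,
}

struct BatchInfo {
    state: BatchState,
}

impl BatchInfo {
    /// True iff the batch is downloading from this exact peer under this request id.
    fn is_expecting_block(&self, peer_id: &PeerId, request_id: &RequestId) -> bool {
        if let BatchState::Downloading(expected_peer, expected_id) = &self.state {
            return peer_id == expected_peer && expected_id == request_id;
        }
        false
    }
}

fn main() {
    let batch = BatchInfo {
        state: BatchState::Downloading(PeerId(7), 42),
    };
    // A stray response from another peer or an old request id is ignored.
    assert!(batch.is_expecting_block(&PeerId(7), &42));
    assert!(!batch.is_expecting_block(&PeerId(8), &42));
}
```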


@ -2,14 +2,13 @@ use super::batch::{BatchInfo, BatchState};
use crate::beacon_processor::ProcessId;
use crate::beacon_processor::WorkEvent as BeaconWorkEvent;
use crate::sync::{network_context::SyncNetworkContext, BatchProcessResult, RequestId};
use beacon_chain::{BeaconChain, BeaconChainTypes};
use beacon_chain::BeaconChainTypes;
use eth2_libp2p::{PeerAction, PeerId};
use fnv::FnvHashMap;
use rand::seq::SliceRandom;
use slog::{crit, debug, o, warn};
use std::collections::{btree_map::Entry, BTreeMap, HashSet};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot};
@ -87,9 +86,6 @@ pub struct SyncingChain<T: BeaconChainTypes> {
/// A multi-threaded, non-blocking processor for applying messages to the beacon chain.
beacon_processor_send: Sender<BeaconWorkEvent<T::EthSpec>>,
/// A reference to the underlying beacon chain.
chain: Arc<BeaconChain<T>>,
/// The chain's log.
log: slog::Logger,
}
@ -116,7 +112,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
target_head_root: Hash256,
peer_id: PeerId,
beacon_processor_send: Sender<BeaconWorkEvent<T::EthSpec>>,
chain: Arc<BeaconChain<T>>,
log: &slog::Logger,
) -> Self {
let mut peers = FnvHashMap::default();
@ -138,7 +133,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
state: ChainSyncingState::Stopped,
current_processing_batch: None,
beacon_processor_send,
chain,
log: log.new(o!("chain" => id)),
}
}
@ -163,17 +157,17 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
if let Some(batch_ids) = self.peers.remove(peer_id) {
// fail the batches
for id in batch_ids {
if let BatchState::Failed = self
.batches
.get_mut(&id)
.expect("registered batch exists")
.download_failed(&self.log)
{
return ProcessingResult::RemoveChain;
}
if let ProcessingResult::RemoveChain = self.retry_batch_download(network, id) {
// drop the chain early
return ProcessingResult::RemoveChain;
if let Some(batch) = self.batches.get_mut(&id) {
if let BatchState::Failed = batch.download_failed(&self.log) {
return ProcessingResult::RemoveChain;
}
if let ProcessingResult::RemoveChain = self.retry_batch_download(network, id) {
// drop the chain early
return ProcessingResult::RemoveChain;
}
} else {
debug!(self.log, "Batch not found while removing peer";
"peer" => %peer_id, "batch" => "id")
}
}
}
@ -215,12 +209,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// sending an error /timeout) if the peer is removed from the chain for other
// reasons. Check that this block belongs to the expected peer, and that the
// request_id matches
if let BatchState::Downloading(expected_peer, _, expected_request_id) =
batch.state()
{
if expected_peer != peer_id || expected_request_id != &request_id {
return ProcessingResult::KeepChain;
}
if !batch.is_expecting_block(peer_id, &request_id) {
return ProcessingResult::KeepChain;
}
batch
}
@ -275,7 +265,13 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
return ProcessingResult::KeepChain;
}
let batch = self.batches.get_mut(&batch_id).expect("Batch exists");
let batch = match self.batches.get_mut(&batch_id) {
Some(batch) => batch,
None => {
debug!(self.log, "Processing unknown batch"; "batch" => %batch_id);
return ProcessingResult::RemoveChain;
}
};
// NOTE: We send empty batches to the processor in order to trigger the block processor
// result callback. This is done because an empty batch could end a chain and the logic
@ -340,10 +336,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// - Poisoned -> this is an intermediate state that should never be reached
// - AwaitingDownload -> A recoverable failed batch should have been
// re-requested.
unreachable!(
"Optimistic batch indicates inconsistent chain state: {:?}",
state
)
crit!(self.log, "Optimistic batch indicates inconsistent chain state"; "state" => ?state);
return ProcessingResult::RemoveChain;
}
BatchState::AwaitingValidation(_) => {
// This is possible due to race conditions, and though it would be considered
@ -352,7 +346,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// candidate. If the batch was empty the chain rejects it; if it was non
// empty the chain is advanced to this point (so that the old optimistic
// batch is now the processing target)
crit!(self.log, "Optimistic batch should never be Awaiting Validation"; "batch" => epoch);
debug!(self.log, "Optimistic batch should never be Awaiting Validation"; "batch" => epoch);
None
}
}
@ -436,10 +430,13 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
match result {
BatchProcessResult::Success(was_non_empty) => {
let batch = self
.batches
.get_mut(&batch_id)
.expect("Chain was expecting a known batch");
let batch = match self.batches.get_mut(&batch_id) {
Some(batch) => batch,
None => {
debug!(self.log, "Current processing batch not found"; "batch" => batch_id);
return ProcessingResult::RemoveChain;
}
};
let _ = batch.processing_completed(true, &self.log);
// If the processed batch was not empty, we can validate previous unvalidated
// blocks.
@ -479,13 +476,19 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
}
}
BatchProcessResult::Failed(imported_blocks) => {
let batch = self
.batches
.get_mut(&batch_id)
.expect("Chain was expecting a known batch");
let peer = batch
.current_peer()
.expect("batch is processing blocks from a peer");
let (batch, peer) = match self.batches.get_mut(&batch_id) {
Some(batch) => match batch.current_peer().cloned() {
Some(peer) => (batch, peer),
None => {
debug!(self.log, "Current processing has no peer"; "batch" => batch_id);
return ProcessingResult::RemoveChain;
}
},
None => {
debug!(self.log, "Current processing batch not found"; "batch" => batch_id);
return ProcessingResult::RemoveChain;
}
};
debug!(self.log, "Batch processing failed"; "imported_blocks" => imported_blocks,
"batch_epoch" => batch_id, "peer" => %peer, "client" => %network.client_type(&peer));
if let BatchState::Failed = batch.processing_completed(false, &self.log) {
@ -610,12 +613,13 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
active_batches.remove(&id);
}
}
BatchState::Failed | BatchState::Poisoned | BatchState::AwaitingDownload => {
unreachable!("batch indicates inconsistent chain state while advancing chain")
}
BatchState::Failed | BatchState::Poisoned | BatchState::AwaitingDownload => crit!(
self.log,
"batch indicates inconsistent chain state while advancing chain"
),
BatchState::AwaitingProcessing(..) => {}
BatchState::Processing(_) => {
assert_eq!(
debug_assert_eq!(
id,
self.current_processing_batch.expect(
"A batch in a processing state means the chain is processing it"
@ -770,11 +774,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
}
}
/// Sends a STATUS message to all peers in the peer pool.
pub fn status_peers(&self, network: &mut SyncNetworkContext<T::EthSpec>) {
network.status_peers(self.chain.clone(), self.peers.keys().cloned());
}
/// An RPC error has occurred.
///
/// If the batch exists it is re-requested.
@ -789,16 +788,13 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// A batch could be retried without the peer failing the request (disconnecting/
// sending an error /timeout) if the peer is removed from the chain for other
// reasons. Check that this block belongs to the expected peer
if let BatchState::Downloading(expected_peer, _, expected_request_id) = batch.state() {
if expected_peer != peer_id || expected_request_id != &request_id {
return ProcessingResult::KeepChain;
}
if !batch.is_expecting_block(peer_id, &request_id) {
return ProcessingResult::KeepChain;
}
debug!(self.log, "Batch failed. RPC Error"; "batch_epoch" => batch_id);
self.peers
.get_mut(peer_id)
.expect("Peer belongs to the chain")
.remove(&batch_id);
if let Some(active_requests) = self.peers.get_mut(peer_id) {
active_requests.remove(&batch_id);
}
if let BatchState::Failed = batch.download_failed(&self.log) {
return ProcessingResult::RemoveChain;
}
@ -865,11 +861,14 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
debug!(self.log, "Requesting batch"; "epoch" => batch_id, &batch);
}
// register the batch for this peer
self.peers
return self
.peers
.get_mut(&peer)
.expect("peer belongs to the peer pool")
.insert(batch_id);
return ProcessingResult::KeepChain;
.map(|requests| {
requests.insert(batch_id);
ProcessingResult::KeepChain
})
.unwrap_or(ProcessingResult::RemoveChain);
}
Err(e) => {
// NOTE: under normal conditions this shouldn't happen but we handle it anyway
@ -879,8 +878,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
batch.start_downloading_from_peer(peer.clone(), 1, &self.log); // fake request_id is not relevant
self.peers
.get_mut(&peer)
.expect("peer belongs to the peer pool")
.remove(&batch_id);
.map(|request| request.remove(&batch_id));
if let BatchState::Failed = batch.download_failed(&self.log) {
return ProcessingResult::RemoveChain;
} else {


@ -3,16 +3,18 @@
//! Each chain type is stored in its own map. A variety of helper functions are given along with
//! this struct to simplify the logic of the other layers of sync.
use super::chain::{ChainId, ChainSyncingState, ProcessingResult, SyncingChain};
use super::chain::{ChainId, ProcessingResult, SyncingChain};
use super::sync_type::RangeSyncType;
use crate::beacon_processor::WorkEvent as BeaconWorkEvent;
use crate::sync::network_context::SyncNetworkContext;
use crate::sync::PeerSyncInfo;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::{types::SyncState, NetworkGlobals, PeerId};
use eth2_libp2p::PeerId;
use fnv::FnvHashMap;
use slog::{crit, debug, error, info, trace};
use slog::{debug, error};
use smallvec::SmallVec;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::mpsc;
use types::EthSpec;
@ -25,66 +27,17 @@ const PARALLEL_HEAD_CHAINS: usize = 2;
#[derive(Clone)]
pub enum RangeSyncState {
/// A finalized chain is being synced.
Finalized {
/// The start of the finalized chain.
start_slot: Slot,
/// The target head slot of the finalized chain.
head_slot: Slot,
/// The target head root of the finalized chain.
head_root: Hash256,
},
Finalized(u64),
/// There are no finalized chains and we are syncing one or more head chains.
Head {
/// The last finalized checkpoint for all head chains.
start_slot: Slot,
/// The largest known slot to sync to.
head_slot: Slot,
},
Head(SmallVec<[u64; PARALLEL_HEAD_CHAINS]>),
/// There are no head or finalized chains and no long range sync is in progress.
Idle,
}
impl PartialEq for RangeSyncState {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(RangeSyncState::Finalized { .. }, RangeSyncState::Finalized { .. }) => true,
(RangeSyncState::Head { .. }, RangeSyncState::Head { .. }) => true,
(RangeSyncState::Idle, RangeSyncState::Idle) => true,
_ => false,
}
}
}
impl Into<SyncState> for RangeSyncState {
fn into(self) -> SyncState {
match self {
RangeSyncState::Finalized {
start_slot,
head_slot,
head_root,
} => SyncState::SyncingFinalized {
start_slot,
head_slot,
head_root,
},
RangeSyncState::Head {
start_slot,
head_slot,
} => SyncState::SyncingHead {
start_slot,
head_slot,
},
RangeSyncState::Idle => SyncState::Stalled, // this should never really be used
}
}
}
/// A collection of finalized and head chains currently being processed.
pub struct ChainCollection<T: BeaconChainTypes> {
/// The beacon chain for processing.
beacon_chain: Arc<BeaconChain<T>>,
/// A reference to the global network parameters.
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
/// The set of finalized chains being synced.
finalized_chains: FnvHashMap<ChainId, SyncingChain<T>>,
/// The set of head chains being synced.
@ -96,14 +49,9 @@ pub struct ChainCollection<T: BeaconChainTypes> {
}
impl<T: BeaconChainTypes> ChainCollection<T> {
pub fn new(
beacon_chain: Arc<BeaconChain<T>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
log: slog::Logger,
) -> Self {
pub fn new(beacon_chain: Arc<BeaconChain<T>>, log: slog::Logger) -> Self {
ChainCollection {
beacon_chain,
network_globals,
finalized_chains: FnvHashMap::default(),
head_chains: FnvHashMap::default(),
state: RangeSyncState::Idle,
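A hedged sketch of the fallback behaviour the new id-based state enables, implemented by `on_chain_removed` further down (plain `Vec` standing in for `SmallVec`; `after_finalized_removed` is a hypothetical free function for illustration): removing the syncing finalized chain demotes the state to `Head` over the still-syncing head chains, or to `Idle` if there are none.

```rust
#[derive(Debug, PartialEq)]
enum RangeSyncState {
    Finalized(u64),
    Head(Vec<u64>),
    Idle,
}

/// The state the collection falls back to when the finalized chain
/// `removed_id` is removed while `syncing_heads` head chains keep syncing.
fn after_finalized_removed(
    state: RangeSyncState,
    removed_id: u64,
    syncing_heads: Vec<u64>,
) -> RangeSyncState {
    match state {
        RangeSyncState::Finalized(syncing_id) if syncing_id == removed_id => {
            if syncing_heads.is_empty() {
                RangeSyncState::Idle
            } else {
                RangeSyncState::Head(syncing_heads)
            }
        }
        // Removing a non-syncing chain leaves the state untouched.
        other => other,
    }
}

fn main() {
    let state = after_finalized_removed(RangeSyncState::Finalized(1), 1, vec![4, 7]);
    assert_eq!(state, RangeSyncState::Head(vec![4, 7]));
    assert_eq!(
        after_finalized_removed(RangeSyncState::Finalized(1), 1, vec![]),
        RangeSyncState::Idle
    );
}
```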
@ -111,82 +59,55 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
}
}
pub fn state(&self) -> &RangeSyncState {
&self.state
}
/// Updates the global sync state and logs any changes.
pub fn update_sync_state(&mut self, network: &mut SyncNetworkContext<T::EthSpec>) {
// if there is no range sync occurring, the state is either synced or not based on
// connected peers.
if self.state == RangeSyncState::Idle {
// there is no range sync, let the state of peers determine the global node sync state
let new_state = self
.network_globals
.peers
.read()
.synced_peers()
.next()
.map(|_| SyncState::Synced)
.unwrap_or_else(|| SyncState::Stalled);
let mut peer_state = self.network_globals.sync_state.write();
if new_state != *peer_state {
info!(self.log, "Sync state updated"; "old_state" => %peer_state, "new_state" => %new_state);
if new_state == SyncState::Synced {
network.subscribe_core_topics();
/// Updates the Syncing state of the collection after a chain is removed.
fn on_chain_removed(&mut self, id: &ChainId, was_syncing: bool) {
match self.state {
RangeSyncState::Finalized(ref syncing_id) => {
if syncing_id == id {
// the finalized chain that was syncing was removed
debug_assert!(was_syncing);
let syncing_head_ids: SmallVec<[u64; PARALLEL_HEAD_CHAINS]> = self
.head_chains
.iter()
.filter(|(_id, chain)| chain.is_syncing())
.map(|(id, _)| *id)
.collect();
self.state = if syncing_head_ids.is_empty() {
RangeSyncState::Idle
} else {
RangeSyncState::Head(syncing_head_ids)
};
} else {
debug_assert!(!was_syncing);
}
*peer_state = new_state;
}
} else {
// The state is based on a range sync state, update it
let mut node_sync_state = self.network_globals.sync_state.write();
let new_state: SyncState = self.state.clone().into();
if *node_sync_state != new_state {
// we are updating the state, inform the user
info!(self.log, "Sync state updated"; "old_state" => %node_sync_state, "new_state" => %new_state);
RangeSyncState::Head(ref mut syncing_head_ids) => {
if let Some(index) = syncing_head_ids
.iter()
.enumerate()
.find(|(_, &chain_id)| &chain_id == id)
.map(|(i, _)| i)
{
// a syncing head chain was removed
debug_assert!(was_syncing);
syncing_head_ids.swap_remove(index);
if syncing_head_ids.is_empty() {
self.state = RangeSyncState::Idle;
}
} else {
debug_assert!(!was_syncing);
}
}
*node_sync_state = new_state;
}
}
/// A fully synced peer has joined.
///
/// We could be awaiting a head sync. If we are in the head syncing state, without any head
/// chains, then update the state to idle.
pub fn fully_synced_peer_found(&mut self, network: &mut SyncNetworkContext<T::EthSpec>) {
if let RangeSyncState::Head { .. } = self.state {
if self.head_chains.is_empty() {
// Update the global network state to either synced or stalled.
self.state = RangeSyncState::Idle;
self.update_sync_state(network);
RangeSyncState::Idle => {
// the removed chain should not be syncing
debug_assert!(!was_syncing)
}
}
}
/// After a finalized chain completes this function is called. It ensures the state is set to
/// `SyncState::Head` indicating we are awaiting new peers to connect before we can consider
/// the state as idle.
pub fn set_head_sync(&mut self) {
if let RangeSyncState::Idle = self.state {
let current_slot = self
.beacon_chain
.head_info()
.map(|info| info.slot)
.unwrap_or_else(|_| Slot::from(0u64));
// NOTE: This will modify the /node/syncing API to show current slot for all fields
// while we update peers to look for new potentially HEAD chains.
let temp_head_state = RangeSyncState::Head {
start_slot: current_slot,
head_slot: current_slot,
};
self.state = temp_head_state;
}
}
/// Calls `func` on every chain of the collection. If the result is
/// `ProcessingResult::RemoveChain`, the chain is removed and returned.
/// NOTE: `func` must not change the syncing state of a chain.
pub fn call_all<F>(&mut self, mut func: F) -> Vec<(SyncingChain<T>, RangeSyncType)>
where
F: FnMut(&mut SyncingChain<T>) -> ProcessingResult,
@ -211,7 +132,9 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
RangeSyncType::Finalized => self.finalized_chains.remove(&id),
RangeSyncType::Head => self.head_chains.remove(&id),
};
results.push((chain.expect("Chain exits"), sync_type));
let chain = chain.expect("Chain exists");
self.on_chain_removed(&id, chain.is_syncing());
results.push((chain, sync_type));
}
results
}
@ -220,6 +143,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
///
/// If the function returns `ProcessingResult::RemoveChain`, the chain is removed and returned.
/// If the chain is found, its syncing type is returned, or an error otherwise.
/// NOTE: `func` should not change the sync state of a chain.
pub fn call_by_id<F>(
&mut self,
id: ChainId,
@ -231,14 +155,18 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
if let Entry::Occupied(mut entry) = self.finalized_chains.entry(id) {
// Search in our finalized chains first
if let ProcessingResult::RemoveChain = func(entry.get_mut()) {
Ok((Some(entry.remove()), RangeSyncType::Finalized))
let chain = entry.remove();
self.on_chain_removed(&id, chain.is_syncing());
Ok((Some(chain), RangeSyncType::Finalized))
} else {
Ok((None, RangeSyncType::Finalized))
}
} else if let Entry::Occupied(mut entry) = self.head_chains.entry(id) {
// Search in our head chains next
if let ProcessingResult::RemoveChain = func(entry.get_mut()) {
Ok((Some(entry.remove()), RangeSyncType::Head))
let chain = entry.remove();
self.on_chain_removed(&id, chain.is_syncing());
Ok((Some(chain), RangeSyncType::Head))
} else {
Ok((None, RangeSyncType::Head))
}
@ -253,7 +181,12 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
/// This removes any out-dated chains, swaps to any higher priority finalized chains and
/// updates the state of the collection. This starts head chains syncing if any are required to
/// do so.
pub fn update(&mut self, network: &mut SyncNetworkContext<T::EthSpec>) {
pub fn update(
&mut self,
network: &mut SyncNetworkContext<T::EthSpec>,
awaiting_head_peers: &mut HashMap<PeerId, PeerSyncInfo>,
beacon_processor_send: &mpsc::Sender<BeaconWorkEvent<T::EthSpec>>,
) {
let (local_finalized_epoch, local_head_epoch) =
match PeerSyncInfo::from_chain(&self.beacon_chain) {
None => {
@ -270,14 +203,56 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
};
// Remove any outdated finalized/head chains
self.purge_outdated_chains(network);
self.purge_outdated_chains(awaiting_head_peers);
// Choose the best finalized chain if one needs to be selected.
self.update_finalized_chains(network, local_finalized_epoch, local_head_epoch);
if self.finalized_syncing_chain().is_none() {
if !matches!(self.state, RangeSyncState::Finalized(_)) {
// Handle head syncing chains if there are no finalized chains left.
self.update_head_chains(network, local_finalized_epoch, local_head_epoch);
self.update_head_chains(
network,
local_finalized_epoch,
local_head_epoch,
awaiting_head_peers,
beacon_processor_send,
);
}
}
pub fn state(&self) -> Result<Option<(RangeSyncType, Slot /* from */, Slot /* to */)>, String> {
match self.state {
RangeSyncState::Finalized(ref syncing_id) => {
let chain = self
.finalized_chains
.get(syncing_id)
.ok_or(format!("Finalized syncing chain not found: {}", syncing_id))?;
Ok(Some((
RangeSyncType::Finalized,
chain.start_epoch.start_slot(T::EthSpec::slots_per_epoch()),
chain.target_head_slot,
)))
}
RangeSyncState::Head(ref syncing_head_ids) => {
let mut range: Option<(Slot, Slot)> = None;
for id in syncing_head_ids {
let chain = self
.head_chains
.get(id)
.ok_or(format!("Head syncing chain not found: {}", id))?;
let start = chain.start_epoch.start_slot(T::EthSpec::slots_per_epoch());
range = Some(match range {
None => (start, chain.target_head_slot),
Some((min_start, max_slot)) => {
(min_start.min(start), max_slot.max(chain.target_head_slot))
}
});
}
let (start_slot, target_slot) =
range.ok_or_else(|| "Syncing head with empty head ids".to_string())?;
Ok(Some((RangeSyncType::Head, start_slot, target_slot)))
}
RangeSyncState::Idle => Ok(None),
}
}
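A small runnable sketch of the head-range fold above (plain `u64` slots and a hypothetical `head_range` helper): the syncing API reports the minimum start slot and the maximum target slot across the syncing head chains, and an empty id list surfaces as `None`, the inconsistent case the `ok_or_else` above turns into an error.

```rust
/// Fold the (start_slot, target_slot) pairs of the syncing head chains into
/// the overall (min start, max target) range reported to the syncing API.
fn head_range(chains: &[(u64, u64)]) -> Option<(u64, u64)> {
    let mut range: Option<(u64, u64)> = None;
    for &(start, target) in chains {
        range = Some(match range {
            None => (start, target),
            Some((min_start, max_target)) => (min_start.min(start), max_target.max(target)),
        });
    }
    range
}

fn main() {
    // Two head chains starting at slots 64 and 32, targeting slots 160 and 192.
    assert_eq!(head_range(&[(64, 160), (32, 192)]), Some((32, 192)));
    // No syncing head chains: an inconsistent Head state.
    assert_eq!(head_range(&[]), None);
}
```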
@ -290,26 +265,32 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
local_head_epoch: Epoch,
) {
// Find the chain with most peers and check if it is already syncing
if let Some((new_id, peers)) = self
if let Some((mut new_id, peers)) = self
.finalized_chains
.iter()
.max_by_key(|(_, chain)| chain.available_peers())
.map(|(id, chain)| (*id, chain.available_peers()))
{
let old_id = self.finalized_syncing_chain().map(
|(currently_syncing_id, currently_syncing_chain)| {
if *currently_syncing_id != new_id
&& peers > currently_syncing_chain.available_peers()
{
currently_syncing_chain.stop_syncing();
// we stop this chain and start syncing the one with more peers
Some(*currently_syncing_id)
} else {
// the best chain is already the syncing chain, advance it if possible
None
let mut old_id = None;
if let RangeSyncState::Finalized(syncing_id) = self.state {
if syncing_id == new_id {
// best chain is already syncing
old_id = Some(None);
} else {
// chains are different, check that they don't have the same number of peers
if let Some(syncing_chain) = self.finalized_chains.get_mut(&syncing_id) {
if syncing_chain.available_peers() < peers {
syncing_chain.stop_syncing();
old_id = Some(Some(syncing_id));
} else {
// chains have the same number of peers, pick the currently syncing
// chain to avoid unnecessary switches and try to advance it
new_id = syncing_id;
old_id = Some(None);
}
}
},
);
}
}
let chain = self
.finalized_chains
@ -318,18 +299,15 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
match old_id {
Some(Some(old_id)) => debug!(self.log, "Switching finalized chains";
"old_id" => old_id, &chain),
"old_id" => old_id, &chain),
None => debug!(self.log, "Syncing new chain"; &chain),
Some(None) => trace!(self.log, "Advancing currently syncing chain"),
// this is the same chain. We try to advance it.
Some(None) => {
// this is the same chain. We try to advance it.
}
}
// update the state to a new finalized state
let state = RangeSyncState::Finalized {
start_slot: chain.start_epoch.start_slot(T::EthSpec::slots_per_epoch()),
head_slot: chain.target_head_slot,
head_root: chain.target_head_root,
};
self.state = state;
self.state = RangeSyncState::Finalized(new_id);
if let ProcessingResult::RemoveChain =
chain.start_syncing(network, local_epoch, local_head_epoch)
@ -337,6 +315,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
// this happens only if sending a batch over the `network` fails a lot
error!(self.log, "Chain removed while switching chains");
self.finalized_chains.remove(&new_id);
self.on_chain_removed(&new_id, true);
}
}
}
@ -347,23 +326,47 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
network: &mut SyncNetworkContext<T::EthSpec>,
local_epoch: Epoch,
local_head_epoch: Epoch,
awaiting_head_peers: &mut HashMap<PeerId, PeerSyncInfo>,
beacon_processor_send: &mpsc::Sender<BeaconWorkEvent<T::EthSpec>>,
) {
// There are no finalized chains, update the state.
// Include the awaiting head peers
for (peer_id, peer_sync_info) in awaiting_head_peers.drain() {
self.add_peer_or_create_chain(
local_epoch,
peer_sync_info.head_root,
peer_sync_info.head_slot,
peer_id,
RangeSyncType::Head,
beacon_processor_send,
network,
);
}
if self.head_chains.is_empty() {
// There are no head chains, update the state.
self.state = RangeSyncState::Idle;
return;
}
let mut currently_syncing = self
.head_chains
.values()
.filter(|chain| chain.is_syncing())
.count();
let mut not_syncing = self.head_chains.len() - currently_syncing;
// NOTE: if switching from Head Syncing to Finalized Syncing, the head chains are allowed
// to continue, so we check for such chains first, and allow them to continue.
let mut syncing_chains = SmallVec::<[u64; PARALLEL_HEAD_CHAINS]>::new();
for (id, chain) in self.head_chains.iter_mut() {
if chain.is_syncing() {
if syncing_chains.len() < PARALLEL_HEAD_CHAINS {
syncing_chains.push(*id);
} else {
chain.stop_syncing();
debug!(self.log, "Stopping extra head chain"; "chain" => id);
}
}
}
let mut not_syncing = self.head_chains.len() - syncing_chains.len();
// Find all head chains that are not currently syncing ordered by peer count.
while currently_syncing <= PARALLEL_HEAD_CHAINS && not_syncing > 0 {
while syncing_chains.len() < PARALLEL_HEAD_CHAINS && not_syncing > 0 {
// Find the chain with the most peers and start syncing
if let Some((_id, chain)) = self
if let Some((id, chain)) = self
.head_chains
.iter_mut()
.filter(|(_id, chain)| !chain.is_syncing())
@ -374,53 +377,21 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
if let ProcessingResult::RemoveChain =
chain.start_syncing(network, local_epoch, local_head_epoch)
{
error!(self.log, "Chain removed while switching head chains")
let id = *id;
self.head_chains.remove(&id);
error!(self.log, "Chain removed while switching head chains"; "id" => id);
} else {
syncing_chains.push(*id);
}
}
// update variables
currently_syncing = self
.head_chains
.iter()
.filter(|(_id, chain)| chain.is_syncing())
.count();
not_syncing = self.head_chains.len() - currently_syncing;
not_syncing = self.head_chains.len() - syncing_chains.len();
}
// Start
// for the syncing API, we find the minimal start_slot and the maximum
// target_slot of all head chains to report back.
let (min_epoch, max_slot) = self
.head_chains
.values()
.filter(|chain| chain.is_syncing())
.fold(
(Epoch::from(0u64), Slot::from(0u64)),
|(min, max), chain| {
(
std::cmp::min(min, chain.start_epoch),
std::cmp::max(max, chain.target_head_slot),
)
},
);
let head_state = RangeSyncState::Head {
start_slot: min_epoch.start_slot(T::EthSpec::slots_per_epoch()),
head_slot: max_slot,
self.state = if syncing_chains.is_empty() {
RangeSyncState::Idle
} else {
RangeSyncState::Head(syncing_chains)
};
self.state = head_state;
}
/// This is called once a head chain has completed syncing. It removes all non-syncing head
/// chains and re-status their peers.
pub fn clear_head_chains(&mut self, network: &mut SyncNetworkContext<T::EthSpec>) {
let log_ref = &self.log;
self.head_chains.retain(|_id, chain| {
if !chain.is_syncing() {
debug!(log_ref, "Removing old head chain"; &chain);
chain.status_peers(network);
false
} else {
true
}
});
}
/// Returns `true` if any finalized chains exist, `false` otherwise.
@ -430,14 +401,11 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
/// Removes any outdated finalized or head chains.
/// This removes chains with no peers, or chains whose start block slot is less than our current
/// finalized block slot.
pub fn purge_outdated_chains(&mut self, network: &mut SyncNetworkContext<T::EthSpec>) {
// Remove any chains that have no peers
self.finalized_chains
.retain(|_id, chain| chain.available_peers() > 0);
self.head_chains
.retain(|_id, chain| chain.available_peers() > 0);
/// finalized block slot. Peers that would create outdated chains are removed too.
pub fn purge_outdated_chains(
&mut self,
awaiting_head_peers: &mut HashMap<PeerId, PeerSyncInfo>,
) {
let local_info = match PeerSyncInfo::from_chain(&self.beacon_chain) {
Some(local) => local,
None => {
@ -455,39 +423,50 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
let beacon_chain = &self.beacon_chain;
let log_ref = &self.log;
// Remove chains that are out-dated and re-status their peers
self.finalized_chains.retain(|_id, chain| {
if chain.target_head_slot <= local_finalized_slot
|| beacon_chain
.fork_choice
.read()
.contains_block(&chain.target_head_root)
let is_outdated = |target_slot: &Slot, target_root: &Hash256| {
target_slot <= &local_finalized_slot
|| beacon_chain.fork_choice.read().contains_block(target_root)
};
// Retain only head peers that remain relevant
awaiting_head_peers.retain(|_peer_id, peer_sync_info| {
!is_outdated(&peer_sync_info.head_slot, &peer_sync_info.head_root)
});
// Remove chains that are out-dated
let mut removed_chains = Vec::new();
self.finalized_chains.retain(|id, chain| {
if is_outdated(&chain.target_head_slot, &chain.target_head_root)
|| chain.available_peers() == 0
{
debug!(log_ref, "Purging out of finalized chain"; &chain);
chain.status_peers(network);
removed_chains.push((*id, chain.is_syncing()));
false
} else {
true
}
});
self.head_chains.retain(|_id, chain| {
if chain.target_head_slot <= local_finalized_slot
|| beacon_chain
.fork_choice
.read()
.contains_block(&chain.target_head_root)
self.head_chains.retain(|id, chain| {
if is_outdated(&chain.target_head_slot, &chain.target_head_root)
|| chain.available_peers() == 0
{
debug!(log_ref, "Purging out of date head chain"; &chain);
chain.status_peers(network);
removed_chains.push((*id, chain.is_syncing()));
false
} else {
true
}
});
// update the state of the collection
for (id, was_syncing) in removed_chains {
self.on_chain_removed(&id, was_syncing);
}
}
/// Adds a peer to a chain with the given target, or creates a new syncing chain if it doesn't
/// exits.
/// exists.
#[allow(clippy::too_many_arguments)]
pub fn add_peer_or_create_chain(
&mut self,
@ -501,27 +480,20 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
) {
let id = SyncingChain::<T>::id(&target_head_root, &target_head_slot);
let collection = if let RangeSyncType::Finalized = sync_type {
if let Some(chain) = self.head_chains.get(&id) {
// sanity verification for chain duplication / purging issues
crit!(self.log, "Adding known head chain as finalized chain"; chain);
}
&mut self.finalized_chains
} else {
if let Some(chain) = self.finalized_chains.get(&id) {
// sanity verification for chain duplication / purging issues
crit!(self.log, "Adding known finalized chain as head chain"; chain);
}
&mut self.head_chains
};
match collection.entry(id) {
Entry::Occupied(mut entry) => {
let chain = entry.get_mut();
debug!(self.log, "Adding peer to known chain"; "peer_id" => %peer, "sync_type" => ?sync_type, &chain);
assert_eq!(chain.target_head_root, target_head_root);
assert_eq!(chain.target_head_slot, target_head_slot);
debug_assert_eq!(chain.target_head_root, target_head_root);
debug_assert_eq!(chain.target_head_slot, target_head_slot);
if let ProcessingResult::RemoveChain = chain.add_peer(network, peer) {
debug!(self.log, "Chain removed after adding peer"; "chain" => id);
entry.remove();
let chain = entry.remove();
self.on_chain_removed(&id, chain.is_syncing());
}
}
Entry::Vacant(entry) => {
@ -532,25 +504,12 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
target_head_root,
peer,
beacon_processor_send.clone(),
self.beacon_chain.clone(),
&self.log,
);
assert_eq!(new_chain.get_id(), id);
debug_assert_eq!(new_chain.get_id(), id);
debug!(self.log, "New chain added to sync"; "peer_id" => peer_rpr, "sync_type" => ?sync_type, &new_chain);
entry.insert(new_chain);
}
}
}
/// Returns the index of finalized chain that is currently syncing. Returns `None` if no
/// finalized chain is currently syncing.
fn finalized_syncing_chain(&mut self) -> Option<(&ChainId, &mut SyncingChain<T>)> {
self.finalized_chains.iter_mut().find_map(|(id, chain)| {
if chain.state == ChainSyncingState::Syncing {
Some((id, chain))
} else {
None
}
})
}
}


@ -10,3 +10,4 @@ mod sync_type;
pub use batch::BatchInfo;
pub use chain::{BatchId, ChainId, EPOCHS_PER_BATCH};
pub use range::RangeSync;
pub use sync_type::RangeSyncType;


@ -40,7 +40,7 @@
//! and further batches are requested as current blocks are being processed.
use super::chain::ChainId;
use super::chain_collection::{ChainCollection, RangeSyncState};
use super::chain_collection::ChainCollection;
use super::sync_type::RangeSyncType;
use crate::beacon_processor::WorkEvent as BeaconWorkEvent;
use crate::sync::network_context::SyncNetworkContext;
@ -48,12 +48,12 @@ use crate::sync::BatchProcessResult;
use crate::sync::PeerSyncInfo;
use crate::sync::RequestId;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::{NetworkGlobals, PeerId};
use slog::{debug, error, trace, warn};
use std::collections::HashSet;
use eth2_libp2p::PeerId;
use slog::{debug, error, trace};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::mpsc;
use types::{Epoch, EthSpec, SignedBeaconBlock};
use types::{Epoch, EthSpec, SignedBeaconBlock, Slot};
/// The primary object dealing with long range/batch syncing. This contains all the active and
/// non-active chains that need to be processed before the syncing is considered complete. This
@ -61,13 +61,12 @@ use types::{Epoch, EthSpec, SignedBeaconBlock};
pub struct RangeSync<T: BeaconChainTypes> {
/// The beacon chain for processing.
beacon_chain: Arc<BeaconChain<T>>,
/// Last known sync info of our useful connected peers. We use this information to create Head
/// chains after all finalized chains have ended.
awaiting_head_peers: HashMap<PeerId, PeerSyncInfo>,
/// A collection of chains that need to be downloaded. This stores any head or finalized chains
/// that need to be downloaded.
chains: ChainCollection<T>,
/// Peers that join whilst a finalized chain is being downloaded, sit in this set. Once the
/// finalized chain(s) complete, these peer's get STATUS'ed to update their head slot before
/// the head chains are formed and downloaded.
awaiting_head_peers: HashSet<PeerId>,
/// A multi-threaded, non-blocking processor for applying messages to the beacon chain.
beacon_processor_send: mpsc::Sender<BeaconWorkEvent<T::EthSpec>>,
/// The syncing logger.
@ -77,29 +76,20 @@ pub struct RangeSync<T: BeaconChainTypes> {
impl<T: BeaconChainTypes> RangeSync<T> {
pub fn new(
beacon_chain: Arc<BeaconChain<T>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
beacon_processor_send: mpsc::Sender<BeaconWorkEvent<T::EthSpec>>,
log: slog::Logger,
) -> Self {
RangeSync {
beacon_chain: beacon_chain.clone(),
chains: ChainCollection::new(beacon_chain, network_globals, log.clone()),
awaiting_head_peers: HashSet::new(),
chains: ChainCollection::new(beacon_chain, log.clone()),
awaiting_head_peers: HashMap::new(),
beacon_processor_send,
log,
}
}
/// The `chains` collection stores the current state of syncing. Once a finalized chain
/// completes, it's state is pre-emptively set to `SyncState::Head`. This ensures that
/// during the transition period of finalized to head, the sync manager doesn't start
/// requesting blocks from gossipsub.
///
/// On re-status, a peer that has no head to download indicates that this state can be set to
/// idle as there are in fact no head chains to download. This function notifies the chain
/// collection that the state can safely be set to idle.
pub fn fully_synced_peer_found(&mut self, network: &mut SyncNetworkContext<T::EthSpec>) {
self.chains.fully_synced_peer_found(network)
pub fn state(&self) -> Result<Option<(RangeSyncType, Slot /* from */, Slot /* to */)>, String> {
self.chains.state()
}
/// A useful peer has been added. The SyncManager has identified this peer as needing either
@ -133,16 +123,11 @@ impl<T: BeaconChainTypes> RangeSync<T> {
// NOTE: A peer that has been re-status'd may now exist in multiple finalized chains.
// remove any out-of-date chains
self.chains.purge_outdated_chains(network);
// determine which kind of sync to perform and set up the chains
match RangeSyncType::new(&self.beacon_chain, &local_info, &remote_info) {
RangeSyncType::Finalized => {
// Finalized chain search
debug!(self.log, "Finalization sync peer joined"; "peer_id" => %peer_id);
// remove the peer from the awaiting_head_peers list if it exists
self.awaiting_head_peers.remove(&peer_id);
// Note: We keep current head chains. These can continue syncing whilst we complete
@ -158,24 +143,27 @@ impl<T: BeaconChainTypes> RangeSync<T> {
network,
);
self.chains.update(network);
// update the global sync state
self.chains.update_sync_state(network);
self.chains.update(
network,
&mut self.awaiting_head_peers,
&self.beacon_processor_send,
);
}
RangeSyncType::Head => {
// This peer requires a head chain sync
if self.chains.is_finalizing_sync() {
// If there are finalized chains to sync, finish these first, before syncing head
// chains. This allows us to re-sync all known peers
trace!(self.log, "Waiting for finalized sync to complete"; "peer_id" => %peer_id);
// store the peer to re-status after all finalized chains complete
self.awaiting_head_peers.insert(peer_id);
// chains.
trace!(self.log, "Waiting for finalized sync to complete";
"peer_id" => %peer_id, "awaiting_head_peers" => &self.awaiting_head_peers.len());
self.awaiting_head_peers.insert(peer_id, remote_info);
return;
}
// if the peer existed in any other head chain, remove it.
self.remove_peer(network, &peer_id);
self.awaiting_head_peers.remove(&peer_id);
// The new peer has the same finalized (earlier filters should prevent a peer with an
// earlier finalized chain from reaching here).
@ -191,8 +179,11 @@ impl<T: BeaconChainTypes> RangeSync<T> {
&self.beacon_processor_send,
network,
);
self.chains.update(network);
self.chains.update_sync_state(network);
self.chains.update(
network,
&mut self.awaiting_head_peers,
&self.beacon_processor_send,
);
}
}
}
@ -217,13 +208,14 @@ impl<T: BeaconChainTypes> RangeSync<T> {
chain.on_block_response(network, batch_id, &peer_id, request_id, beacon_block)
}) {
Ok((removed_chain, sync_type)) => {
if let Some(removed_chain) = removed_chain {
if let Some(_removed_chain) = removed_chain {
debug!(self.log, "Chain removed after block response"; "sync_type" => ?sync_type, "chain_id" => chain_id);
removed_chain.status_peers(network);
// update the state of the collection
self.chains.update(network);
// update the global state and inform the user
self.chains.update_sync_state(network);
self.chains.update(
network,
&mut self.awaiting_head_peers,
&self.beacon_processor_send,
);
}
}
Err(_) => {
@ -249,42 +241,13 @@ impl<T: BeaconChainTypes> RangeSync<T> {
Ok((None, _sync_type)) => {
// Chain was found and not removed
}
Ok((Some(removed_chain), sync_type)) => {
Ok((Some(_removed_chain), sync_type)) => {
debug!(self.log, "Chain removed after processing result"; "chain" => chain_id, "sync_type" => ?sync_type);
// Chain ended, re-status its peers
removed_chain.status_peers(network);
match sync_type {
RangeSyncType::Finalized => {
// update the state of the collection
self.chains.update(network);
// set the state to a head sync if there are no finalized chains, to inform
// the manager that we are awaiting a head chain.
self.chains.set_head_sync();
// Update the global variables
self.chains.update_sync_state(network);
// if there are no more finalized chains, re-status all known peers
// awaiting a head sync
match self.chains.state() {
RangeSyncState::Idle | RangeSyncState::Head { .. } => {
network.status_peers(
self.beacon_chain.clone(),
self.awaiting_head_peers.drain(),
);
}
RangeSyncState::Finalized { .. } => {} // Have more finalized chains to complete
}
}
RangeSyncType::Head => {
// Remove non-syncing head chains and re-status the peers. This removes a
// build-up of potentially duplicate head chains. Any legitimate head
// chains will be re-established
self.chains.clear_head_chains(network);
// update the state of the collection
self.chains.update(network);
// update the global state and log any change
self.chains.update_sync_state(network);
}
}
self.chains.update(
network,
&mut self.awaiting_head_peers,
&self.beacon_processor_send,
);
}
Err(_) => {
@ -305,17 +268,12 @@ impl<T: BeaconChainTypes> RangeSync<T> {
// remove the peer from any peer pool, failing its batches
self.remove_peer(network, peer_id);
// update the state of the collection
self.chains.update(network);
// update the global state and inform the user
self.chains.update_sync_state(network);
}
/// When a peer gets removed, both the head and finalized chains need to be searched to check
/// which pool the peer is in. The chain may also have one or more batches awaiting
/// this peer. If so, we mark the batch as failed. The batch may then hit its maximum
/// retries. In this case, we need to remove the chain and re-status all the peers.
/// retries. In this case, we need to remove the chain.
fn remove_peer(&mut self, network: &mut SyncNetworkContext<T::EthSpec>, peer_id: &PeerId) {
for (removed_chain, sync_type) in self
.chains
@ -323,10 +281,12 @@ impl<T: BeaconChainTypes> RangeSync<T> {
{
debug!(self.log, "Chain removed after removing peer"; "sync_type" => ?sync_type, "chain" => removed_chain.get_id());
// update the state of the collection
self.chains.update(network);
// update the global state and inform the user
self.chains.update_sync_state(network);
}
self.chains.update(
network,
&mut self.awaiting_head_peers,
&self.beacon_processor_send,
);
}
/// An RPC error has occurred.
@ -348,11 +308,12 @@ impl<T: BeaconChainTypes> RangeSync<T> {
Ok((removed_chain, sync_type)) => {
if let Some(removed_chain) = removed_chain {
debug!(self.log, "Chain removed on rpc error"; "sync_type" => ?sync_type, "chain" => removed_chain.get_id());
removed_chain.status_peers(network);
// update the state of the collection
self.chains.update(network);
// update the global state and inform the user
self.chains.update_sync_state(network);
self.chains.update(
network,
&mut self.awaiting_head_peers,
&self.beacon_processor_send,
);
}
}
Err(_) => {
@ -360,7 +321,7 @@ impl<T: BeaconChainTypes> RangeSync<T> {
}
}
} else {
warn!(self.log, "Response/Error for non registered request"; "request_id" => request_id)
debug!(self.log, "Response/Error for non registered request"; "request_id" => request_id)
}
}
}