Correctly update range status when outdated chains are removed (#2827)

We were batch-removing chains when purging and only afterwards updating the collection's status for each of them. This left the range status out of sync with the real status. It caused no harm to the global sync status, but I've changed it so that each chain is removed and the collection state is updated per removal, which satisfies a (correct) debug assertion I triggered while doing some testing.
Also added tests and improved code quality as per @paulhauner's suggestions.
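For context, the core of the change is to decide which chains to drop first, and then remove them one at a time while updating the collection state after each removal, rather than removing everything via `retain` and reconciling the status afterwards. A minimal sketch of the pattern, using simplified stand-ins (the struct, fields and `on_chain_removed` body below are illustrative, not the real Lighthouse types):

```rust
use std::collections::HashMap;

/// Simplified stand-in for the real ChainCollection.
struct Collection {
    chains: HashMap<u64, bool>, // chain id -> is_syncing
    syncing_chains: usize,      // toy "range status": number of syncing chains
}

impl Collection {
    fn purge_outdated(&mut self, is_outdated: impl Fn(u64) -> bool) {
        // Decide what to remove without mutating the map yet.
        let removed_chains: Vec<(u64, bool)> = self
            .chains
            .iter()
            .filter(|(id, _)| is_outdated(**id))
            .map(|(id, is_syncing)| (*id, *is_syncing))
            .collect();

        // Remove each chain, updating the collection state for each removal,
        // so the status never diverges from the chains actually present.
        for (id, was_syncing) in removed_chains {
            self.chains.remove(&id);
            self.on_chain_removed(was_syncing);
        }
    }

    fn on_chain_removed(&mut self, was_syncing: bool) {
        if was_syncing {
            self.syncing_chains -= 1;
        }
        // Stand-in for the debug assertion: the tracked status must match
        // what can be recomputed from the remaining chains.
        debug_assert_eq!(
            self.syncing_chains,
            self.chains.values().filter(|s| **s).count()
        );
    }
}
```

With the old `retain`-first ordering, the per-removal status updates ran against a collection that had already lost all outdated chains at once, which is exactly what the debug assertion caught.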
Divma 2021-11-26 01:13:49 +00:00
parent 9eedb6b888
commit 413b0b5b2b
5 changed files with 94 additions and 45 deletions

View File

@@ -1,5 +1,3 @@
use std::sync::Arc;
use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes};
use lighthouse_network::rpc::StatusMessage;
@@ -25,9 +23,3 @@ impl<T: BeaconChainTypes> ToStatusMessage for BeaconChain<T> {
})
}
}
impl<T: BeaconChainTypes> ToStatusMessage for Arc<BeaconChain<T>> {
fn status_message(&self) -> Result<StatusMessage, BeaconChainError> {
<BeaconChain<T> as ToStatusMessage>::status_message(self)
}
}

View File

@@ -62,10 +62,10 @@ impl<T: EthSpec> SyncNetworkContext<T> {
pub fn status_peers<C: ToStatusMessage>(
&mut self,
chain: C,
chain: &C,
peers: impl Iterator<Item = PeerId>,
) {
if let Ok(status_message) = &chain.status_message() {
if let Ok(status_message) = chain.status_message() {
for peer_id in peers {
debug!(
self.log,

View File

@@ -1,5 +1,3 @@
use std::sync::Arc;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use types::Hash256;
@@ -8,7 +6,7 @@ pub trait BlockStorage {
fn is_block_known(&self, block_root: &Hash256) -> bool;
}
impl<T: BeaconChainTypes> BlockStorage for Arc<BeaconChain<T>> {
impl<T: BeaconChainTypes> BlockStorage for BeaconChain<T> {
fn is_block_known(&self, block_root: &Hash256) -> bool {
self.fork_choice.read().contains_block(block_root)
}

View File

@@ -17,6 +17,7 @@ use slog::{crit, debug, error};
use smallvec::SmallVec;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::mpsc;
use types::EthSpec;
use types::{Epoch, Hash256, Slot};
@@ -41,7 +42,7 @@ pub enum RangeSyncState {
/// A collection of finalized and head chains currently being processed.
pub struct ChainCollection<T: BeaconChainTypes, C> {
/// The beacon chain for processing.
beacon_chain: C,
beacon_chain: Arc<C>,
/// The set of finalized chains being synced.
finalized_chains: FnvHashMap<ChainId, SyncingChain<T>>,
/// The set of head chains being synced.
@@ -53,7 +54,7 @@ pub struct ChainCollection<T: BeaconChainTypes, C> {
}
impl<T: BeaconChainTypes, C: BlockStorage> ChainCollection<T, C> {
pub fn new(beacon_chain: C, log: slog::Logger) -> Self {
pub fn new(beacon_chain: Arc<C>, log: slog::Logger) -> Self {
ChainCollection {
beacon_chain,
finalized_chains: FnvHashMap::default(),
@@ -406,6 +407,7 @@ impl<T: BeaconChainTypes, C: BlockStorage> ChainCollection<T, C> {
local_info: &SyncInfo,
awaiting_head_peers: &mut HashMap<PeerId, SyncInfo>,
) {
debug!(self.log, "Purging chains");
let local_finalized_slot = local_info
.finalized_epoch
.start_slot(T::EthSpec::slots_per_epoch());
@@ -414,7 +416,10 @@ impl<T: BeaconChainTypes, C: BlockStorage> ChainCollection<T, C> {
let log_ref = &self.log;
let is_outdated = |target_slot: &Slot, target_root: &Hash256| {
target_slot <= &local_finalized_slot || beacon_chain.is_block_known(target_root)
let is =
target_slot <= &local_finalized_slot || beacon_chain.is_block_known(target_root);
debug!(log_ref, "Chain is outdated {}", is);
is
};
// Retain only head peers that remain relevant
@@ -424,31 +429,35 @@ impl<T: BeaconChainTypes, C: BlockStorage> ChainCollection<T, C> {
// Remove chains that are out-dated
let mut removed_chains = Vec::new();
self.finalized_chains.retain(|id, chain| {
removed_chains.extend(self.finalized_chains.iter().filter_map(|(id, chain)| {
if is_outdated(&chain.target_head_slot, &chain.target_head_root)
|| chain.available_peers() == 0
{
debug!(log_ref, "Purging out of finalized chain"; &chain);
removed_chains.push((*id, chain.is_syncing(), RangeSyncType::Finalized));
false
Some((*id, chain.is_syncing(), RangeSyncType::Finalized))
} else {
true
None
}
});
self.head_chains.retain(|id, chain| {
}));
removed_chains.extend(self.head_chains.iter().filter_map(|(id, chain)| {
if is_outdated(&chain.target_head_slot, &chain.target_head_root)
|| chain.available_peers() == 0
{
debug!(log_ref, "Purging out of date head chain"; &chain);
removed_chains.push((*id, chain.is_syncing(), RangeSyncType::Head));
false
Some((*id, chain.is_syncing(), RangeSyncType::Head))
} else {
true
None
}
});
}));
// update the state of the collection
for (id, was_syncing, sync_type) in removed_chains {
// remove each chain, updating the state for each removal.
match sync_type {
RangeSyncType::Finalized => self.finalized_chains.remove(&id),
RangeSyncType::Head => self.head_chains.remove(&id),
};
self.on_chain_removed(&id, was_syncing, sync_type);
}
}

View File

@@ -59,9 +59,9 @@ use types::{Epoch, EthSpec, SignedBeaconBlock, Slot};
/// The primary object dealing with long range/batch syncing. This contains all the active and
/// non-active chains that need to be processed before the syncing is considered complete. This
/// holds the current state of the long range sync.
pub struct RangeSync<T: BeaconChainTypes, C = Arc<BeaconChain<T>>> {
pub struct RangeSync<T: BeaconChainTypes, C = BeaconChain<T>> {
/// The beacon chain for processing.
beacon_chain: C,
beacon_chain: Arc<C>,
/// Last known sync info of our useful connected peers. We use this information to create Head
/// chains after all finalized chains have ended.
awaiting_head_peers: HashMap<PeerId, SyncInfo>,
@@ -76,11 +76,11 @@ pub struct RangeSync<T: BeaconChainTypes, C = Arc<BeaconChain<T>>> {
impl<T: BeaconChainTypes, C> RangeSync<T, C>
where
C: BlockStorage + Clone + ToStatusMessage,
C: BlockStorage + ToStatusMessage,
T: BeaconChainTypes,
{
pub fn new(
beacon_chain: C,
beacon_chain: Arc<C>,
beacon_processor_send: mpsc::Sender<BeaconWorkEvent<T>>,
log: slog::Logger,
) -> Self {
@@ -125,7 +125,7 @@ where
// is OK since we sync only one finalized chain at a time.
// determine which kind of sync to perform and set up the chains
match RangeSyncType::new(&self.beacon_chain, &local_info, &remote_info) {
match RangeSyncType::new(self.beacon_chain.as_ref(), &local_info, &remote_info) {
RangeSyncType::Finalized => {
// Finalized chain search
debug!(self.log, "Finalization sync peer joined"; "peer_id" => %peer_id);
@@ -337,7 +337,7 @@ where
debug!(self.log, "Chain removed"; "sync_type" => ?sync_type, &chain, "reason" => ?remove_reason, "op" => op);
}
network.status_peers(self.beacon_chain.clone(), chain.peers());
network.status_peers(self.beacon_chain.as_ref(), chain.peers());
let local = match self.beacon_chain.status_message() {
Ok(status) => SyncInfo {
@@ -376,21 +376,21 @@ mod tests {
use slog::{o, Drain};
use slot_clock::SystemTimeSlotClock;
use std::sync::atomic::AtomicBool;
use std::collections::HashSet;
use std::sync::Arc;
use store::MemoryStore;
use types::{Hash256, MinimalEthSpec as E};
#[derive(Debug)]
struct FakeStorage {
is_block_known: AtomicBool,
known_blocks: RwLock<HashSet<Hash256>>,
status: RwLock<StatusMessage>,
}
impl Default for FakeStorage {
fn default() -> Self {
FakeStorage {
is_block_known: AtomicBool::new(false),
known_blocks: RwLock::new(HashSet::new()),
status: RwLock::new(StatusMessage {
fork_digest: [0; 4],
finalized_root: Hash256::zero(),
@@ -402,14 +402,24 @@ mod tests {
}
}
impl BlockStorage for Arc<FakeStorage> {
fn is_block_known(&self, _block_root: &store::Hash256) -> bool {
self.is_block_known
.load(std::sync::atomic::Ordering::Relaxed)
impl FakeStorage {
fn remember_block(&self, block_root: Hash256) {
self.known_blocks.write().insert(block_root);
}
#[allow(dead_code)]
fn forget_block(&self, block_root: &Hash256) {
self.known_blocks.write().remove(block_root);
}
}
impl ToStatusMessage for Arc<FakeStorage> {
impl BlockStorage for FakeStorage {
fn is_block_known(&self, block_root: &store::Hash256) -> bool {
self.known_blocks.read().contains(block_root)
}
}
impl ToStatusMessage for FakeStorage {
fn status_message(&self) -> Result<StatusMessage, beacon_chain::BeaconChainError> {
Ok(self.status.read().clone())
}
@@ -446,7 +456,7 @@ mod tests {
globals: Arc<NetworkGlobals<E>>,
}
impl RangeSync<TestBeaconChainType, Arc<FakeStorage>> {
impl RangeSync<TestBeaconChainType, FakeStorage> {
fn assert_state(&self, expected_state: RangeSyncType) {
assert_eq!(
self.state()
@@ -456,6 +466,14 @@ mod tests {
expected_state
)
}
#[allow(dead_code)]
fn assert_not_syncing(&self) {
assert!(
self.state().expect("State is ok").is_none(),
"Range should not be syncing."
);
}
}
impl TestRig {
@@ -525,7 +543,7 @@ mod tests {
let local_info = self.local_info();
let finalized_root = Hash256::random();
let finalized_epoch = local_info.finalized_epoch + 1;
let finalized_epoch = local_info.finalized_epoch + 2;
let head_slot = finalized_epoch.start_slot(E::slots_per_epoch());
let head_root = Hash256::random();
let remote_info = SyncInfo {
@@ -540,11 +558,11 @@ mod tests {
}
}
fn range(log_enabled: bool) -> (TestRig, RangeSync<TestBeaconChainType, Arc<FakeStorage>>) {
fn range(log_enabled: bool) -> (TestRig, RangeSync<TestBeaconChainType, FakeStorage>) {
let chain = Arc::new(FakeStorage::default());
let log = build_log(slog::Level::Trace, log_enabled);
let (beacon_processor_tx, beacon_processor_rx) = mpsc::channel(10);
let range_sync = RangeSync::<TestBeaconChainType, Arc<FakeStorage>>::new(
let range_sync = RangeSync::<TestBeaconChainType, FakeStorage>::new(
chain.clone(),
beacon_processor_tx,
log.new(o!("component" => "range")),
@@ -592,7 +610,7 @@ mod tests {
#[test]
fn head_chain_removed_while_finalized_syncing() {
// NOTE: this is a regression test.
let (mut rig, mut range) = range(true);
let (mut rig, mut range) = range(false);
// Get a peer with an advanced head
let (head_peer, local_info, remote_info) = rig.head_peer();
@@ -614,4 +632,36 @@ mod tests {
range.remove_peer(&mut rig.cx, &head_peer);
range.assert_state(RangeSyncType::Finalized);
}
#[test]
fn state_update_while_purging() {
// NOTE: this is a regression test.
let (mut rig, mut range) = range(true);
// Get a peer with an advanced head
let (head_peer, local_info, head_info) = rig.head_peer();
let head_peer_root = head_info.head_root;
range.add_peer(&mut rig.cx, local_info, head_peer, head_info);
range.assert_state(RangeSyncType::Head);
// Sync should have requested a batch, grab the request.
let _request = rig.grab_request(&head_peer);
// Now get a peer with an advanced finalized epoch.
let (finalized_peer, local_info, remote_info) = rig.finalized_peer();
let finalized_peer_root = remote_info.finalized_root;
range.add_peer(&mut rig.cx, local_info, finalized_peer, remote_info);
range.assert_state(RangeSyncType::Finalized);
// Sync should have requested a batch, grab the request
let _second_request = rig.grab_request(&finalized_peer);
// Now the chain knows both chains' target roots.
rig.chain.remember_block(head_peer_root);
rig.chain.remember_block(finalized_peer_root);
// Add an additional peer to the second chain to make range update its status
let (finalized_peer, local_info, remote_info) = rig.finalized_peer();
range.add_peer(&mut rig.cx, local_info, finalized_peer, remote_info);
}
}