From 31386277c3bd7966fcf97789ef2b4834e5452af9 Mon Sep 17 00:00:00 2001 From: Divma Date: Fri, 19 Nov 2021 02:38:25 +0000 Subject: [PATCH] Sync wrong dbg assertion (#2821) ## Issue Addressed Running a beacon node I triggered a sync debug panic. And so finally the time to create tests for sync arrived. Fortunately, te bug was not in the sync algorithm itself but a wrong assertion ## Proposed Changes - Split Range's impl from the BeaconChain via a trait. This is needed for testing. The TestingRig/Harness is way bigger than needed and does not provide the modification functionalities that are needed to test sync. I find this simpler, tho some could disagree. - Add a regression test for sync that fails before the changes. - Fix the wrong assertion. --- beacon_node/network/src/status.rs | 8 + .../network/src/sync/network_context.rs | 5 +- .../src/sync/range_sync/block_storage.rs | 15 + .../src/sync/range_sync/chain_collection.rs | 20 +- .../network/src/sync/range_sync/mod.rs | 1 + .../network/src/sync/range_sync/range.rs | 269 +++++++++++++++++- .../network/src/sync/range_sync/sync_type.rs | 15 +- 7 files changed, 306 insertions(+), 27 deletions(-) create mode 100644 beacon_node/network/src/sync/range_sync/block_storage.rs diff --git a/beacon_node/network/src/status.rs b/beacon_node/network/src/status.rs index ade490e00..fa52fddc3 100644 --- a/beacon_node/network/src/status.rs +++ b/beacon_node/network/src/status.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes}; use lighthouse_network::rpc::StatusMessage; @@ -23,3 +25,9 @@ impl ToStatusMessage for BeaconChain { }) } } + +impl ToStatusMessage for Arc> { + fn status_message(&self) -> Result { + as ToStatusMessage>::status_message(self) + } +} diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index cedc2f036..e8b67ba92 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -6,7 +6,6 @@ use super::range_sync::{BatchId, ChainId}; use super::RequestId as SyncRequestId; use crate::service::NetworkMessage; use crate::status::ToStatusMessage; -use beacon_chain::{BeaconChain, BeaconChainTypes}; use fnv::FnvHashMap; use lighthouse_network::rpc::{ BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, RequestId, @@ -61,9 +60,9 @@ impl SyncNetworkContext { .unwrap_or_default() } - pub fn status_peers( + pub fn status_peers( &mut self, - chain: Arc>, + chain: C, peers: impl Iterator, ) { if let Ok(status_message) = &chain.status_message() { diff --git a/beacon_node/network/src/sync/range_sync/block_storage.rs b/beacon_node/network/src/sync/range_sync/block_storage.rs new file mode 100644 index 000000000..5590ac623 --- /dev/null +++ b/beacon_node/network/src/sync/range_sync/block_storage.rs @@ -0,0 +1,15 @@ +use std::sync::Arc; + +use beacon_chain::{BeaconChain, BeaconChainTypes}; +use types::Hash256; + +/// Trait that helps maintain RangeSync's implementation split from the BeaconChain +pub trait BlockStorage { + fn is_block_known(&self, block_root: &Hash256) -> bool; +} + +impl BlockStorage for Arc> { + fn is_block_known(&self, block_root: &Hash256) -> bool { + self.fork_choice.read().contains_block(block_root) + } +} diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index d75e18e98..4dc9c1d01 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -3,12 +3,13 @@ //! Each chain type is stored in it's own map. A variety of helper functions are given along with //! this struct to simplify the logic of the other layers of sync. +use super::block_storage::BlockStorage; use super::chain::{ChainId, ProcessingResult, RemoveChain, SyncingChain}; use super::sync_type::RangeSyncType; use crate::beacon_processor::WorkEvent as BeaconWorkEvent; use crate::metrics; use crate::sync::network_context::SyncNetworkContext; -use beacon_chain::{BeaconChain, BeaconChainTypes}; +use beacon_chain::BeaconChainTypes; use fnv::FnvHashMap; use lighthouse_network::PeerId; use lighthouse_network::SyncInfo; @@ -16,7 +17,6 @@ use slog::{crit, debug, error}; use smallvec::SmallVec; use std::collections::hash_map::Entry; use std::collections::HashMap; -use std::sync::Arc; use tokio::sync::mpsc; use types::EthSpec; use types::{Epoch, Hash256, Slot}; @@ -39,9 +39,9 @@ pub enum RangeSyncState { } /// A collection of finalized and head chains currently being processed. -pub struct ChainCollection { +pub struct ChainCollection { /// The beacon chain for processing. - beacon_chain: Arc>, + beacon_chain: C, /// The set of finalized chains being synced. finalized_chains: FnvHashMap>, /// The set of head chains being synced. @@ -52,8 +52,8 @@ pub struct ChainCollection { log: slog::Logger, } -impl ChainCollection { - pub fn new(beacon_chain: Arc>, log: slog::Logger) -> Self { +impl ChainCollection { + pub fn new(beacon_chain: C, log: slog::Logger) -> Self { ChainCollection { beacon_chain, finalized_chains: FnvHashMap::default(), @@ -72,7 +72,7 @@ impl ChainCollection { RangeSyncState::Finalized(ref syncing_id) => { if syncing_id == id { // the finalized chain that was syncing was removed - debug_assert!(was_syncing); + debug_assert!(was_syncing && sync_type == RangeSyncType::Finalized); let syncing_head_ids: SmallVec<[u64; PARALLEL_HEAD_CHAINS]> = self .head_chains .iter() @@ -85,7 +85,8 @@ impl ChainCollection { RangeSyncState::Head(syncing_head_ids) }; } else { - debug_assert!(!was_syncing); + // we removed a head chain, or an stoped finalized chain + debug_assert!(!was_syncing || sync_type != RangeSyncType::Finalized); } } RangeSyncState::Head(ref mut syncing_head_ids) => { @@ -413,8 +414,7 @@ impl ChainCollection { let log_ref = &self.log; let is_outdated = |target_slot: &Slot, target_root: &Hash256| { - target_slot <= &local_finalized_slot - || beacon_chain.fork_choice.read().contains_block(target_root) + target_slot <= &local_finalized_slot || beacon_chain.is_block_known(target_root) }; // Retain only head peers that remain relevant diff --git a/beacon_node/network/src/sync/range_sync/mod.rs b/beacon_node/network/src/sync/range_sync/mod.rs index a8d18b8c8..b4a27c23c 100644 --- a/beacon_node/network/src/sync/range_sync/mod.rs +++ b/beacon_node/network/src/sync/range_sync/mod.rs @@ -2,6 +2,7 @@ //! peers. mod batch; +mod block_storage; mod chain; mod chain_collection; mod range; diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 0663ad266..2786ef410 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -39,6 +39,7 @@ //! Each chain is downloaded in batches of blocks. The batched blocks are processed sequentially //! and further batches are requested as current blocks are being processed. +use super::block_storage::BlockStorage; use super::chain::{BatchId, ChainId, RemoveChain, SyncingChain}; use super::chain_collection::ChainCollection; use super::sync_type::RangeSyncType; @@ -58,24 +59,28 @@ use types::{Epoch, EthSpec, SignedBeaconBlock, Slot}; /// The primary object dealing with long range/batch syncing. This contains all the active and /// non-active chains that need to be processed before the syncing is considered complete. This /// holds the current state of the long range sync. -pub struct RangeSync { +pub struct RangeSync>> { /// The beacon chain for processing. - beacon_chain: Arc>, + beacon_chain: C, /// Last known sync info of our useful connected peers. We use this information to create Head /// chains after all finalized chains have ended. awaiting_head_peers: HashMap, /// A collection of chains that need to be downloaded. This stores any head or finalized chains /// that need to be downloaded. - chains: ChainCollection, + chains: ChainCollection, /// A multi-threaded, non-blocking processor for applying messages to the beacon chain. beacon_processor_send: mpsc::Sender>, /// The syncing logger. log: slog::Logger, } -impl RangeSync { +impl RangeSync +where + C: BlockStorage + Clone + ToStatusMessage, + T: BeaconChainTypes, +{ pub fn new( - beacon_chain: Arc>, + beacon_chain: C, beacon_processor_send: mpsc::Sender>, log: slog::Logger, ) -> Self { @@ -356,3 +361,257 @@ impl RangeSync { ); } } + +#[cfg(test)] +mod tests { + use crate::NetworkMessage; + + use super::*; + use beacon_chain::builder::Witness; + use beacon_chain::eth1_chain::CachingEth1Backend; + use beacon_chain::parking_lot::RwLock; + use lighthouse_network::rpc::BlocksByRangeRequest; + use lighthouse_network::{libp2p, Request}; + use lighthouse_network::{rpc::StatusMessage, NetworkGlobals}; + use slog::{o, Drain}; + + use slot_clock::SystemTimeSlotClock; + use std::sync::atomic::AtomicBool; + use std::sync::Arc; + use store::MemoryStore; + use types::{Hash256, MinimalEthSpec as E}; + + #[derive(Debug)] + struct FakeStorage { + is_block_known: AtomicBool, + status: RwLock, + } + + impl Default for FakeStorage { + fn default() -> Self { + FakeStorage { + is_block_known: AtomicBool::new(false), + status: RwLock::new(StatusMessage { + fork_digest: [0; 4], + finalized_root: Hash256::zero(), + finalized_epoch: 0usize.into(), + head_root: Hash256::zero(), + head_slot: 0usize.into(), + }), + } + } + } + + impl BlockStorage for Arc { + fn is_block_known(&self, _block_root: &store::Hash256) -> bool { + self.is_block_known + .load(std::sync::atomic::Ordering::Relaxed) + } + } + + impl ToStatusMessage for Arc { + fn status_message(&self) -> Result { + Ok(self.status.read().clone()) + } + } + + type TestBeaconChainType = + Witness, E, MemoryStore, MemoryStore>; + + fn build_log(level: slog::Level, enabled: bool) -> slog::Logger { + let decorator = slog_term::TermDecorator::new().build(); + let drain = slog_term::FullFormat::new(decorator).build().fuse(); + let drain = slog_async::Async::new(drain).build().fuse(); + + if enabled { + slog::Logger::root(drain.filter_level(level).fuse(), o!()) + } else { + slog::Logger::root(drain.filter(|_| false).fuse(), o!()) + } + } + + #[allow(unused)] + struct TestRig { + log: slog::Logger, + /// To check what does sync send to the beacon processor. + beacon_processor_rx: mpsc::Receiver>, + /// To set up different scenarios where sync is told about known/unkown blocks. + chain: Arc, + /// Needed by range to handle communication with the network. + cx: SyncNetworkContext, + /// To check what the network receives from Range. + network_rx: mpsc::UnboundedReceiver>, + /// To modify what the network declares about various global variables, in particular about + /// the sync state of a peer. + globals: Arc>, + } + + impl RangeSync> { + fn assert_state(&self, expected_state: RangeSyncType) { + assert_eq!( + self.state() + .expect("State is ok") + .expect("Range is syncing") + .0, + expected_state + ) + } + } + + impl TestRig { + fn local_info(&self) -> SyncInfo { + let StatusMessage { + fork_digest: _, + finalized_root, + finalized_epoch, + head_root, + head_slot, + } = self.chain.status.read().clone(); + SyncInfo { + head_slot, + head_root, + finalized_epoch, + finalized_root, + } + } + + /// Reads an BlocksByRange request to a given peer from the network receiver channel. + fn grab_request( + &mut self, + expected_peer: &PeerId, + ) -> (lighthouse_network::rpc::RequestId, BlocksByRangeRequest) { + if let Some(NetworkMessage::SendRequest { + peer_id, + request: Request::BlocksByRange(request), + request_id, + }) = self.network_rx.blocking_recv() + { + assert_eq!(&peer_id, expected_peer); + (request_id, request) + } else { + panic!("Should have sent a batch request to the peer") + } + } + + /// Produce a head peer + fn head_peer( + &self, + ) -> ( + PeerId, + SyncInfo, /* Local info */ + SyncInfo, /* Remote info */ + ) { + let local_info = self.local_info(); + + // Get a peer with an advanced head + let head_root = Hash256::random(); + let head_slot = local_info.head_slot + 1; + let remote_info = SyncInfo { + head_root, + head_slot, + ..local_info + }; + let peer_id = PeerId::random(); + (peer_id, local_info, remote_info) + } + + fn finalized_peer( + &self, + ) -> ( + PeerId, + SyncInfo, /* Local info */ + SyncInfo, /* Remote info */ + ) { + let local_info = self.local_info(); + + let finalized_root = Hash256::random(); + let finalized_epoch = local_info.finalized_epoch + 1; + let head_slot = finalized_epoch.start_slot(E::slots_per_epoch()); + let head_root = Hash256::random(); + let remote_info = SyncInfo { + finalized_epoch, + finalized_root, + head_slot, + head_root, + }; + + let peer_id = PeerId::random(); + (peer_id, local_info, remote_info) + } + } + + fn range(log_enabled: bool) -> (TestRig, RangeSync>) { + let chain = Arc::new(FakeStorage::default()); + let log = build_log(slog::Level::Trace, log_enabled); + let (beacon_processor_tx, beacon_processor_rx) = mpsc::channel(10); + let range_sync = RangeSync::>::new( + chain.clone(), + beacon_processor_tx, + log.new(o!("component" => "range")), + ); + let (network_tx, network_rx) = mpsc::unbounded_channel(); + let globals = { + use lighthouse_network::discovery::enr_ext::CombinedKeyExt; + use lighthouse_network::discv5::enr::CombinedKey; + use lighthouse_network::discv5::enr::EnrBuilder; + use lighthouse_network::rpc::methods::{MetaData, MetaDataV2}; + + let keypair = libp2p::identity::Keypair::generate_secp256k1(); + let enr_key: CombinedKey = CombinedKey::from_libp2p(&keypair).unwrap(); + let enr = EnrBuilder::new("v4").build(&enr_key).unwrap(); + let globals = NetworkGlobals::new( + enr, + 9000, + 9000, + MetaData::V2(MetaDataV2 { + seq_number: 0, + attnets: Default::default(), + syncnets: Default::default(), + }), + vec![], + &log, + ); + Arc::new(globals) + }; + let cx = SyncNetworkContext::new( + network_tx, + globals.clone(), + log.new(o!("component" => "network_context")), + ); + let test_rig = TestRig { + log, + beacon_processor_rx, + chain, + cx, + network_rx, + globals, + }; + (test_rig, range_sync) + } + + #[test] + fn head_chain_removed_while_finalized_syncing() { + // NOTE: this is a regression test. + let (mut rig, mut range) = range(true); + + // Get a peer with an advanced head + let (head_peer, local_info, remote_info) = rig.head_peer(); + range.add_peer(&mut rig.cx, local_info, head_peer, remote_info); + range.assert_state(RangeSyncType::Head); + + // Sync should have requested a batch, grab the request. + let _request = rig.grab_request(&head_peer); + + // Now get a peer with an advanced finalized epoch. + let (finalized_peer, local_info, remote_info) = rig.finalized_peer(); + range.add_peer(&mut rig.cx, local_info, finalized_peer, remote_info); + range.assert_state(RangeSyncType::Finalized); + + // Sync should have requested a batch, grab the request + let _second_request = rig.grab_request(&finalized_peer); + + // Fail the head chain by disconnecting the peer. + range.remove_peer(&mut rig.cx, &head_peer); + range.assert_state(RangeSyncType::Finalized); + } +} diff --git a/beacon_node/network/src/sync/range_sync/sync_type.rs b/beacon_node/network/src/sync/range_sync/sync_type.rs index c3a7c9345..d6ffd4a5d 100644 --- a/beacon_node/network/src/sync/range_sync/sync_type.rs +++ b/beacon_node/network/src/sync/range_sync/sync_type.rs @@ -1,12 +1,12 @@ //! Contains logic about identifying which Sync to perform given PeerSyncInfo of ourselves and //! of a remote. -use beacon_chain::{BeaconChain, BeaconChainTypes}; use lighthouse_network::SyncInfo; -use std::sync::Arc; + +use super::block_storage::BlockStorage; /// The type of Range sync that should be done relative to our current state. -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum RangeSyncType { /// A finalized chain sync should be started with this peer. Finalized, @@ -17,8 +17,8 @@ pub enum RangeSyncType { impl RangeSyncType { /// Determines the type of sync given our local `PeerSyncInfo` and the remote's /// `PeerSyncInfo`. - pub fn new( - chain: &Arc>, + pub fn new( + chain: &C, local_info: &SyncInfo, remote_info: &SyncInfo, ) -> RangeSyncType { @@ -29,10 +29,7 @@ impl RangeSyncType { // not seen the finalized hash before. if remote_info.finalized_epoch > local_info.finalized_epoch - && !chain - .fork_choice - .read() - .contains_block(&remote_info.finalized_root) + && !chain.is_block_known(&remote_info.finalized_root) { RangeSyncType::Finalized } else {