diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 0a6df6df0..ece923ef5 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -4,7 +4,7 @@ use std::time::Duration; use beacon_chain::{BeaconChainTypes, BlockError}; use fnv::FnvHashMap; use lighthouse_network::{PeerAction, PeerId}; -use lru_cache::LRUCache; +use lru_cache::LRUTimeCache; use slog::{crit, debug, error, trace, warn, Logger}; use smallvec::SmallVec; use store::{Hash256, SignedBeaconBlock}; @@ -29,7 +29,7 @@ mod single_block_lookup; #[cfg(test)] mod tests; -const FAILED_CHAINS_CACHE_SIZE: usize = 500; +const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60; const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 3; pub(crate) struct BlockLookups { @@ -37,7 +37,7 @@ pub(crate) struct BlockLookups { parent_queue: SmallVec<[ParentLookup; 3]>, /// A cache of failed chain lookups to prevent duplicate searches. - failed_chains: LRUCache, + failed_chains: LRUTimeCache, /// A collection of block hashes being searched for and a flag indicating if a result has been /// received or not. @@ -56,7 +56,9 @@ impl BlockLookups { pub fn new(beacon_processor_send: mpsc::Sender>, log: Logger) -> Self { Self { parent_queue: Default::default(), - failed_chains: LRUCache::new(FAILED_CHAINS_CACHE_SIZE), + failed_chains: LRUTimeCache::new(Duration::from_secs( + FAILED_CHAINS_CACHE_EXPIRY_SECONDS, + )), single_block_lookups: Default::default(), beacon_processor_send, log, @@ -218,7 +220,7 @@ impl BlockLookups { return; }; - match parent_lookup.verify_block(block, &self.failed_chains) { + match parent_lookup.verify_block(block, &mut self.failed_chains) { Ok(Some(block)) => { // Block is correct, send to the beacon processor. let chain_hash = parent_lookup.chain_hash(); diff --git a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs index 777c3e930..a9a3c34bc 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs @@ -117,7 +117,7 @@ impl ParentLookup { pub fn verify_block( &mut self, block: Option>>, - failed_chains: &lru_cache::LRUCache, + failed_chains: &mut lru_cache::LRUTimeCache, ) -> Result>>, VerifyError> { let block = self.current_parent_request.verify_block(block)?; diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 185fc204a..9953df81d 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -49,13 +49,18 @@ use crate::sync::manager::Id; use crate::sync::network_context::SyncNetworkContext; use crate::sync::BatchProcessResult; use beacon_chain::{BeaconChain, BeaconChainTypes}; +use lighthouse_network::rpc::GoodbyeReason; use lighthouse_network::PeerId; use lighthouse_network::SyncInfo; -use slog::{crit, debug, error, trace}; +use lru_cache::LRUTimeCache; +use slog::{crit, debug, error, trace, warn}; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::mpsc; -use types::{Epoch, EthSpec, SignedBeaconBlock, Slot}; +use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot}; + +/// For how long we store failed finalized chains to prevent retries. +const FAILED_CHAINS_EXPIRY_SECONDS: u64 = 30; /// The primary object dealing with long range/batch syncing. This contains all the active and /// non-active chains that need to be processed before the syncing is considered complete. This @@ -69,6 +74,8 @@ pub struct RangeSync> { /// A collection of chains that need to be downloaded. This stores any head or finalized chains /// that need to be downloaded. chains: ChainCollection, + /// Chains that have failed and are stored to prevent being retried. + failed_chains: LRUTimeCache, /// A multi-threaded, non-blocking processor for applying messages to the beacon chain. beacon_processor_send: mpsc::Sender>, /// The syncing logger. @@ -88,6 +95,9 @@ where RangeSync { beacon_chain: beacon_chain.clone(), chains: ChainCollection::new(beacon_chain, log.clone()), + failed_chains: LRUTimeCache::new(std::time::Duration::from_secs( + FAILED_CHAINS_EXPIRY_SECONDS, + )), awaiting_head_peers: HashMap::new(), beacon_processor_send, log, @@ -128,6 +138,14 @@ where // determine which kind of sync to perform and set up the chains match RangeSyncType::new(self.beacon_chain.as_ref(), &local_info, &remote_info) { RangeSyncType::Finalized => { + // Make sure we have not recently tried this chain + if self.failed_chains.contains(&remote_info.finalized_root) { + debug!(self.log, "Disconnecting peer that belongs to previously failed chain"; + "failed_root" => %remote_info.finalized_root, "peer_id" => %peer_id); + network.goodbye_peer(peer_id, GoodbyeReason::IrrelevantNetwork); + return; + } + // Finalized chain search debug!(self.log, "Finalization sync peer joined"; "peer_id" => %peer_id); self.awaiting_head_peers.remove(&peer_id); @@ -338,6 +356,13 @@ where debug!(self.log, "Chain removed"; "sync_type" => ?sync_type, &chain, "reason" => ?remove_reason, "op" => op); } + if let RemoveChain::ChainFailed(_) = remove_reason { + if RangeSyncType::Finalized == sync_type { + warn!(self.log, "Chain failed! Syncing to its head won't be retried for at least the next {} seconds", FAILED_CHAINS_EXPIRY_SECONDS; &chain); + self.failed_chains.insert(chain.target_head_root); + } + } + network.status_peers(self.beacon_chain.as_ref(), chain.peers()); let local = match self.beacon_chain.status_message() { diff --git a/common/lru_cache/src/lib.rs b/common/lru_cache/src/lib.rs index 51df38bcf..6eecb58c1 100644 --- a/common/lru_cache/src/lib.rs +++ b/common/lru_cache/src/lib.rs @@ -1,7 +1,5 @@ -//! A library to provide fast and efficient LRU Cache's without updating. +//! A library to provide fast and efficient LRU Cache's. -mod space; mod time; -pub use space::LRUCache; pub use time::LRUTimeCache; diff --git a/common/lru_cache/src/space.rs b/common/lru_cache/src/space.rs deleted file mode 100644 index db588632a..000000000 --- a/common/lru_cache/src/space.rs +++ /dev/null @@ -1,93 +0,0 @@ -///! This implements a time-based LRU cache for fast checking of duplicates -use fnv::FnvHashSet; -use std::collections::VecDeque; - -/// Cache that stores keys until the size is used up. Does not update elements for efficiency. -pub struct LRUCache -where - Key: Eq + std::hash::Hash + Clone, -{ - /// The duplicate cache. - map: FnvHashSet, - /// An ordered list of keys by order. - list: VecDeque, - // The max size of the cache, - size: usize, -} - -impl LRUCache -where - Key: Eq + std::hash::Hash + Clone, -{ - pub fn new(size: usize) -> Self { - LRUCache { - map: FnvHashSet::default(), - list: VecDeque::new(), - size, - } - } - - /// Determines if the key is in the cache. - pub fn contains(&self, key: &Key) -> bool { - self.map.contains(key) - } - - // Inserts new elements and removes any expired elements. - // - // If the key was not present this returns `true`. If the value was already present this - // returns `false`. - pub fn insert(&mut self, key: Key) -> bool { - // check the cache before removing elements - let result = self.map.insert(key.clone()); - - // add the new key to the list, if it doesn't already exist. - if result { - self.list.push_back(key); - } - // remove any overflow keys - self.update(); - result - } - - /// Removes any expired elements from the cache. - fn update(&mut self) { - // remove any expired results - for _ in 0..self.map.len().saturating_sub(self.size) { - if let Some(key) = self.list.pop_front() { - self.map.remove(&key); - } - } - } -} - -#[cfg(test)] -mod test { - use super::*; - - #[test] - fn cache_added_entries_exist() { - let mut cache = LRUCache::new(5); - - cache.insert("t"); - cache.insert("e"); - - // Should report that 't' and 't' already exists - assert!(!cache.insert("t")); - assert!(!cache.insert("e")); - } - - #[test] - fn cache_entries_get_removed() { - let mut cache = LRUCache::new(2); - - cache.insert("t"); - assert!(!cache.insert("t")); - cache.insert("e"); - assert!(!cache.insert("e")); - // add another element to clear the first key - cache.insert("s"); - assert!(!cache.insert("s")); - // should be removed from the cache - assert!(cache.insert("t")); - } -} diff --git a/common/lru_cache/src/time.rs b/common/lru_cache/src/time.rs index 30f890a8c..5c0e4c1ca 100644 --- a/common/lru_cache/src/time.rs +++ b/common/lru_cache/src/time.rs @@ -31,53 +31,45 @@ where } } - // Inserts new elements and removes any expired elements. + // Inserts a new key. It first purges expired elements to do so. // // If the key was not present this returns `true`. If the value was already present this - // returns `false`. - pub fn insert_update(&mut self, key: Key) -> bool { - // check the cache before removing elements - let result = self.map.insert(key.clone()); - - let now = Instant::now(); - - // remove any expired results - while let Some(element) = self.list.pop_front() { - if element.inserted + self.ttl > now { - self.list.push_front(element); - break; - } - self.map.remove(&element.key); - } - - // add the new key to the list, if it doesn't already exist. - if result { - self.list.push_back(Element { key, inserted: now }); - } - - result - } - - // Inserts new element does not expire old elements. - // - // If the key was not present this returns `true`. If the value was already present this - // returns `false`. + // returns `false` and updates the insertion time of the key. pub fn insert(&mut self, key: Key) -> bool { + self.update(); // check the cache before removing elements - let result = self.map.insert(key.clone()); + let is_new = self.map.insert(key.clone()); // add the new key to the list, if it doesn't already exist. - if result { + if is_new { self.list.push_back(Element { key, inserted: Instant::now(), }); + } else { + let position = self + .list + .iter() + .position(|e| e.key == key) + .expect("Key is not new"); + let mut element = self + .list + .remove(position) + .expect("Position is not occupied"); + element.inserted = Instant::now(); + self.list.push_back(element); } - result + #[cfg(test)] + self.check_invariant(); + is_new } /// Removes any expired elements from the cache. pub fn update(&mut self) { + if self.list.is_empty() { + return; + } + let now = Instant::now(); // remove any expired results while let Some(element) = self.list.pop_front() { @@ -87,6 +79,46 @@ where } self.map.remove(&element.key); } + #[cfg(test)] + self.check_invariant() + } + + /// Returns if the key is present after removing expired elements. + pub fn contains(&mut self, key: &Key) -> bool { + self.update(); + self.map.contains(key) + } + + #[cfg(test)] + #[track_caller] + fn check_invariant(&self) { + // The list should be sorted. First element should have the oldest insertion + let mut prev_insertion_time = None; + for e in &self.list { + match prev_insertion_time { + Some(prev) => { + if prev <= e.inserted { + prev_insertion_time = Some(e.inserted); + } else { + panic!("List is not sorted by insertion time") + } + } + None => prev_insertion_time = Some(e.inserted), + } + // The key should be in the map + assert!(self.map.contains(&e.key), "List and map should be in sync"); + } + + for k in &self.map { + let _ = self + .list + .iter() + .position(|e| &e.key == k) + .expect("Map and list should be in sync"); + } + + // One last check to make sure there are no duplicates in the list + assert_eq!(self.list.len(), self.map.len()); } } @@ -107,20 +139,22 @@ mod test { } #[test] - fn cache_entries_expire() { + fn test_reinsertion_updates_timeout() { let mut cache = LRUTimeCache::new(Duration::from_millis(100)); - cache.insert_update("t"); - assert!(!cache.insert_update("t")); - cache.insert_update("e"); - assert!(!cache.insert_update("t")); - assert!(!cache.insert_update("e")); - // sleep until cache expiry - std::thread::sleep(Duration::from_millis(101)); - // add another element to clear previous cache - cache.insert_update("s"); + cache.insert("a"); + cache.insert("b"); - // should be removed from the cache - assert!(cache.insert_update("t")); + std::thread::sleep(Duration::from_millis(20)); + cache.insert("a"); + // a is newer now + + std::thread::sleep(Duration::from_millis(85)); + assert!(cache.contains(&"a"),); + // b was inserted first but was not as recent it should have been removed + assert!(!cache.contains(&"b")); + + std::thread::sleep(Duration::from_millis(16)); + assert!(!cache.contains(&"a")); } }