diff --git a/Cargo.lock b/Cargo.lock
index f8dba7739..37ca2d4e2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2993,6 +2993,13 @@ dependencies = [
  "linked-hash-map",
 ]
 
+[[package]]
+name = "lru_cache"
+version = "0.1.0"
+dependencies = [
+ "fnv",
+]
+
 [[package]]
 name = "lru_time_cache"
 version = "0.10.0"
@@ -3271,6 +3278,7 @@ dependencies = [
  "itertools 0.9.0",
  "lazy_static",
  "lighthouse_metrics",
+ "lru_cache",
  "matches",
  "num_cpus",
  "parking_lot 0.11.0",
diff --git a/Cargo.toml b/Cargo.toml
index 35fd77d44..59bf507fa 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -28,6 +28,7 @@ members = [
     "common/lighthouse_metrics",
     "common/lighthouse_version",
     "common/logging",
+    "common/lru_cache",
     "common/remote_beacon_node",
     "common/rest_types",
     "common/slot_clock",
diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml
index da1739672..2ef369e3b 100644
--- a/beacon_node/network/Cargo.toml
+++ b/beacon_node/network/Cargo.toml
@@ -40,3 +40,4 @@ lighthouse_metrics = { path = "../../common/lighthouse_metrics" }
 environment = { path = "../../lighthouse/environment" }
 itertools = "0.9.0"
 num_cpus = "1.13.0"
+lru_cache = { path = "../../common/lru_cache" }
diff --git a/beacon_node/network/src/beacon_processor/chain_segment.rs b/beacon_node/network/src/beacon_processor/chain_segment.rs
index 8473e9927..f402bbb31 100644
--- a/beacon_node/network/src/beacon_processor/chain_segment.rs
+++ b/beacon_node/network/src/beacon_processor/chain_segment.rs
@@ -7,15 +7,15 @@ use eth2_libp2p::PeerId;
 use slog::{debug, error, trace, warn};
 use std::sync::Arc;
 use tokio::sync::mpsc;
-use types::{Epoch, EthSpec, SignedBeaconBlock};
+use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock};
 
 /// Id associated to a block processing request, either a batch or a single block.
 #[derive(Clone, Debug, PartialEq)]
 pub enum ProcessId {
     /// Processing Id of a range syncing batch.
     RangeBatchId(ChainId, Epoch),
-    /// Processing Id of the parent lookup of a block
-    ParentLookup(PeerId),
+    /// Processing Id of the parent lookup of a block.
+    ParentLookup(PeerId, Hash256),
 }
 
 pub fn handle_chain_segment<T: BeaconChainTypes>(
@@ -71,7 +71,7 @@
             });
         }
         // this a parent lookup request from the sync manager
-        ProcessId::ParentLookup(peer_id) => {
+        ProcessId::ParentLookup(peer_id, chain_head) => {
             debug!(
                 log, "Processing parent lookup";
                 "last_peer_id" => format!("{}", peer_id),
@@ -83,7 +83,7 @@
                 (_, Err(e)) => {
                     warn!(log, "Parent lookup failed"; "last_peer_id" => format!("{}", peer_id), "error" => e);
                     sync_send
-                        .send(SyncMessage::ParentLookupFailed(peer_id))
+                        .send(SyncMessage::ParentLookupFailed { peer_id, chain_head })
                         .unwrap_or_else(|_| {
                             // on failure, inform to downvote the peer
                             debug!(
diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs
index af55f489b..3aa5577d7 100644
--- a/beacon_node/network/src/sync/manager.rs
+++ b/beacon_node/network/src/sync/manager.rs
@@ -44,6 +44,7 @@ use eth2_libp2p::rpc::{methods::MAX_REQUEST_BLOCKS, BlocksByRootRequest, Goodbye
 use eth2_libp2p::types::NetworkGlobals;
 use eth2_libp2p::{PeerAction, PeerId};
 use fnv::FnvHashMap;
+use lru_cache::LRUCache;
 use slog::{crit, debug, error, info, trace, warn, Logger};
 use smallvec::SmallVec;
 use ssz_types::VariableList;
@@ -105,8 +106,13 @@ pub enum SyncMessage<T: EthSpec> {
         result: BatchProcessResult,
     },
 
-    /// A parent lookup has failed for a block given by this `peer_id`.
-    ParentLookupFailed(PeerId),
+    /// A parent lookup has failed.
+    ParentLookupFailed {
+        /// The head of the chain of blocks that failed to process.
+        chain_head: Hash256,
+        /// The peer that instigated the chain lookup.
+        peer_id: PeerId,
+    },
 }
 
 /// The result of processing a multiple blocks (a chain segment).
@@ -161,6 +167,9 @@ pub struct SyncManager<T: BeaconChainTypes> {
     /// A collection of parent block lookups.
     parent_queue: SmallVec<[ParentRequests<T::EthSpec>; 3]>,
 
+    /// A cache of failed chain lookups to prevent duplicate searches.
+    failed_chains: LRUCache<Hash256>,
+
     /// A collection of block hashes being searched for and a flag indicating if a result has been
     /// received or not.
     ///
@@ -222,6 +231,7 @@ pub fn spawn<T: BeaconChainTypes>(
         network_globals,
         input_channel: sync_recv,
         parent_queue: SmallVec::new(),
+        failed_chains: LRUCache::new(500),
         single_block_lookups: FnvHashMap::default(),
         log: log.clone(),
         beacon_processor_send,
@@ -351,6 +361,22 @@ impl<T: BeaconChainTypes> SyncManager<T> {
                 return;
             }
         };
+
+        // Check if the parent of this block is in our failed cache. If it is, this
+        // chain should be dropped and the peer downscored.
+        if self.failed_chains.contains(&block.message.parent_root) {
+            debug!(self.log, "Parent chain ignored due to past failure"; "block" => format!("{:?}", block.message.parent_root), "slot" => block.message.slot);
+            if !parent_request.downloaded_blocks.is_empty() {
+                // Add the root block to failed chains
+                self.failed_chains
+                    .insert(parent_request.downloaded_blocks[0].canonical_root());
+            } else {
+                crit!(self.log, "Parent chain has no blocks");
+            }
+            self.network
+                .report_peer(peer_id, PeerAction::MidToleranceError);
+            return;
+        }
         // add the block to response
         parent_request.downloaded_blocks.push(block);
         // queue for processing
@@ -510,6 +536,15 @@
             }
         }
 
+        let block_root = block.canonical_root();
+        // If this block or its parent is part of a known failed chain, ignore it.
+        if self.failed_chains.contains(&block.message.parent_root)
+            || self.failed_chains.contains(&block_root)
+        {
+            debug!(self.log, "Block is from a past failed chain. Dropping"; "block_root" => format!("{:?}", block_root), "block_slot" => block.message.slot);
+            return;
+        }
+
         // Make sure this block is not already being searched for
         // NOTE: Potentially store a hashset of blocks for O(1) lookups
         for parent_req in self.parent_queue.iter() {
@@ -697,6 +732,8 @@
 
         // If the last block in the queue has an unknown parent, we continue the parent
         // lookup-search.
+        let chain_block_hash = parent_request.downloaded_blocks[0].canonical_root();
+
         let newest_block = parent_request
             .downloaded_blocks
             .pop()
@@ -715,8 +752,10 @@
                     self.request_parent(parent_request);
                 }
                 Ok(_) | Err(BlockError::BlockIsAlreadyKnown { .. }) => {
-                    let process_id =
-                        ProcessId::ParentLookup(parent_request.last_submitted_peer.clone());
+                    let process_id = ProcessId::ParentLookup(
+                        parent_request.last_submitted_peer.clone(),
+                        chain_block_hash,
+                    );
                     let blocks = parent_request.downloaded_blocks;
 
                     match self
@@ -742,6 +781,10 @@
                             "outcome" => format!("{:?}", outcome),
                             "last_peer" => parent_request.last_submitted_peer.to_string(),
                         );
+
+                        // Add this chain to the cache of failed chains
+                        self.failed_chains.insert(chain_block_hash);
+
                         // This currently can be a host of errors. We permit this due to the partial
                         // ambiguity.
                         // TODO: Refine the error types and score the peer appropriately.
@@ -764,8 +807,17 @@ impl<T: BeaconChainTypes> SyncManager<T> {
             || parent_request.downloaded_blocks.len() >= PARENT_DEPTH_TOLERANCE
         {
             let error = if parent_request.failed_attempts >= PARENT_FAIL_TOLERANCE {
+                // This is a peer-specific error and the chain could be continued with another
+                // peer. We don't consider this chain a failure, so we do not prevent retries
+                // with another peer.
                 "too many failed attempts"
             } else {
+                if !parent_request.downloaded_blocks.is_empty() {
+                    self.failed_chains
+                        .insert(parent_request.downloaded_blocks[0].canonical_root());
+                } else {
+                    crit!(self.log, "Parent lookup has no blocks");
+                }
                 "reached maximum lookup-depth"
             };
 
@@ -774,6 +826,11 @@
                 "ancestors_found" => parent_request.downloaded_blocks.len(),
                 "reason" => error
             );
+            // Downscore the peer.
+            self.network.report_peer(
+                parent_request.last_submitted_peer,
+                PeerAction::LowToleranceError,
+            );
             return; // drop the request
         }
 
@@ -854,12 +911,13 @@
                     result,
                 );
             }
-            SyncMessage::ParentLookupFailed(peer_id) => {
+            SyncMessage::ParentLookupFailed {
+                chain_head,
+                peer_id,
+            } => {
                 // A peer sent an object (block or attestation) that referenced a parent.
-                // On request for this parent the peer indicated it did not have this
-                // block.
-                // This is not fatal. Peer's could prune old blocks so we moderately
-                // tolerate this behaviour.
+                // The processing of this chain failed.
+                self.failed_chains.insert(chain_head);
                 self.network
                     .report_peer(peer_id, PeerAction::MidToleranceError);
             }
diff --git a/common/lru_cache/Cargo.toml b/common/lru_cache/Cargo.toml
new file mode 100644
index 000000000..df5d9b162
--- /dev/null
+++ b/common/lru_cache/Cargo.toml
@@ -0,0 +1,8 @@
+[package]
+name = "lru_cache"
+version = "0.1.0"
+authors = ["Sigma Prime <contact@sigmaprime.io>"]
+edition = "2018"
+
+[dependencies]
+fnv = "1.0.7"
diff --git a/common/lru_cache/src/lib.rs b/common/lru_cache/src/lib.rs
new file mode 100644
index 000000000..51df38bcf
--- /dev/null
+++ b/common/lru_cache/src/lib.rs
@@ -0,0 +1,7 @@
+//! A library to provide fast and efficient LRU caches without updating elements on access.
+
+mod space;
+mod time;
+
+pub use space::LRUCache;
+pub use time::LRUTimeCache;
diff --git a/common/lru_cache/src/space.rs b/common/lru_cache/src/space.rs
new file mode 100644
index 000000000..db588632a
--- /dev/null
+++ b/common/lru_cache/src/space.rs
@@ -0,0 +1,93 @@
+//! This implements a size-based LRU cache for fast checking of duplicates.
+use fnv::FnvHashSet;
+use std::collections::VecDeque;
+
+/// Cache that stores keys until the size is used up. Does not update elements for efficiency.
+pub struct LRUCache<Key>
+where
+    Key: Eq + std::hash::Hash + Clone,
+{
+    /// The duplicate cache.
+    map: FnvHashSet<Key>,
+    /// An ordered list of keys by insertion order.
+    list: VecDeque<Key>,
+    /// The max size of the cache.
+    size: usize,
+}
+
+impl<Key> LRUCache<Key>
+where
+    Key: Eq + std::hash::Hash + Clone,
+{
+    pub fn new(size: usize) -> Self {
+        LRUCache {
+            map: FnvHashSet::default(),
+            list: VecDeque::new(),
+            size,
+        }
+    }
+
+    /// Determines if the key is in the cache.
+    pub fn contains(&self, key: &Key) -> bool {
+        self.map.contains(key)
+    }
+
+    /// Inserts a new element and removes any elements over the size limit.
+    ///
+    /// If the key was not present this returns `true`. If the value was already present this
+    /// returns `false`.
+    pub fn insert(&mut self, key: Key) -> bool {
+        // check the cache before removing elements
+        let result = self.map.insert(key.clone());
+
+        // add the new key to the list, if it doesn't already exist.
+        if result {
+            self.list.push_back(key);
+        }
+        // remove any overflow keys
+        self.update();
+        result
+    }
+
+    /// Removes any elements over the size limit from the cache.
+    fn update(&mut self) {
+        // remove the oldest keys until the size limit is met
+        for _ in 0..self.map.len().saturating_sub(self.size) {
+            if let Some(key) = self.list.pop_front() {
+                self.map.remove(&key);
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    #[test]
+    fn cache_added_entries_exist() {
+        let mut cache = LRUCache::new(5);
+
+        cache.insert("t");
+        cache.insert("e");
+
+        // Should report that 't' and 'e' already exist
+        assert!(!cache.insert("t"));
+        assert!(!cache.insert("e"));
+    }
+
+    #[test]
+    fn cache_entries_get_removed() {
+        let mut cache = LRUCache::new(2);
+
+        cache.insert("t");
+        assert!(!cache.insert("t"));
+        cache.insert("e");
+        assert!(!cache.insert("e"));
+        // add another element to clear the first key
+        cache.insert("s");
+        assert!(!cache.insert("s"));
+        // should be removed from the cache
+        assert!(cache.insert("t"));
+    }
+}
diff --git a/common/lru_cache/src/time.rs b/common/lru_cache/src/time.rs
new file mode 100644
index 000000000..30f890a8c
--- /dev/null
+++ b/common/lru_cache/src/time.rs
@@ -0,0 +1,126 @@
+//! This implements a time-based LRU cache for fast checking of duplicates.
+use fnv::FnvHashSet;
+use std::collections::VecDeque;
+use std::time::{Duration, Instant};
+
+struct Element<Key> {
+    /// The key being inserted.
+    key: Key,
+    /// The instant the key was inserted.
+    inserted: Instant,
+}
+
+pub struct LRUTimeCache<Key> {
+    /// The duplicate cache.
+    map: FnvHashSet<Key>,
+    /// An ordered list of keys by insert time.
+    list: VecDeque<Element<Key>>,
+    /// The time elements remain in the cache.
+    ttl: Duration,
+}
+
+impl<Key> LRUTimeCache<Key>
+where
+    Key: Eq + std::hash::Hash + Clone,
+{
+    pub fn new(ttl: Duration) -> Self {
+        LRUTimeCache {
+            map: FnvHashSet::default(),
+            list: VecDeque::new(),
+            ttl,
+        }
+    }
+
+    /// Inserts a new element and removes any expired elements.
+    ///
+    /// If the key was not present this returns `true`. If the value was already present this
+    /// returns `false`.
+    pub fn insert_update(&mut self, key: Key) -> bool {
+        // check the cache before removing elements
+        let result = self.map.insert(key.clone());
+
+        let now = Instant::now();
+
+        // remove any expired results
+        while let Some(element) = self.list.pop_front() {
+            if element.inserted + self.ttl > now {
+                self.list.push_front(element);
+                break;
+            }
+            self.map.remove(&element.key);
+        }
+
+        // add the new key to the list, if it doesn't already exist.
+        if result {
+            self.list.push_back(Element { key, inserted: now });
+        }
+
+        result
+    }
+
+    /// Inserts a new element without expiring old elements.
+    ///
+    /// If the key was not present this returns `true`. If the value was already present this
+    /// returns `false`.
+    pub fn insert(&mut self, key: Key) -> bool {
+        // check the cache before removing elements
+        let result = self.map.insert(key.clone());
+
+        // add the new key to the list, if it doesn't already exist.
+        if result {
+            self.list.push_back(Element {
+                key,
+                inserted: Instant::now(),
+            });
+        }
+        result
+    }
+
+    /// Removes any expired elements from the cache.
+    pub fn update(&mut self) {
+        let now = Instant::now();
+        // remove any expired results
+        while let Some(element) = self.list.pop_front() {
+            if element.inserted + self.ttl > now {
+                self.list.push_front(element);
+                break;
+            }
+            self.map.remove(&element.key);
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    #[test]
+    fn cache_added_entries_exist() {
+        let mut cache = LRUTimeCache::new(Duration::from_secs(10));
+
+        cache.insert("t");
+        cache.insert("e");
+
+        // Should report that 't' and 'e' already exist
+        assert!(!cache.insert("t"));
+        assert!(!cache.insert("e"));
+    }
+
+    #[test]
+    fn cache_entries_expire() {
+        let mut cache = LRUTimeCache::new(Duration::from_millis(100));
+
+        cache.insert_update("t");
+        assert!(!cache.insert_update("t"));
+        cache.insert_update("e");
+        assert!(!cache.insert_update("t"));
+        assert!(!cache.insert_update("e"));
+        // sleep until cache expiry
+        std::thread::sleep(Duration::from_millis(101));
+        // add another element to clear previous cache
+        cache.insert_update("s");
+
+        // should be removed from the cache
+        assert!(cache.insert_update("t"));
+    }
+}
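
For reference, a minimal usage sketch of the new `LRUCache` as it is exercised by `failed_chains` above. This is illustrative only and not part of the diff; the `[u8; 32]` keys stand in for the `Hash256` block roots the sync manager actually stores.

```rust
use lru_cache::LRUCache;

fn main() {
    // Mirror the sync manager's configuration: remember at most 500 failed chain heads.
    let mut failed_chains: LRUCache<[u8; 32]> = LRUCache::new(500);

    let chain_head = [1u8; 32];

    // First sighting: `insert` returns true because the key was not present.
    assert!(failed_chains.insert(chain_head));

    // Later sightings report the key as already known, so the chain can be
    // ignored without re-downloading or re-processing it.
    assert!(failed_chains.contains(&chain_head));
    assert!(!failed_chains.insert(chain_head));
}
```

The cache behaves as a fixed-size de-duplication set: `insert` reports whether the key was new, and once the size limit is exceeded the oldest keys are evicted, which bounds the memory used to remember failed chains.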