keep failed finalized chains to avoid retries (#3142)
## Issue Addressed

On very rare occasions we've seen most, if not all, of our peers on a chain with which we don't agree. Purging these peers can take a very long time (the chain's full number of retries), and meanwhile sync is caught in a loop trying the same chain again and again. This change fast-tracks purging those peers by registering the failed chain, which prevents retrying it for some time (30 seconds). Longer times could be dangerous, since a chain can also fail if, for example, a batch fails to download. Even in that case it is still acceptable to fast-track purging the peers, since they are not providing the required info anyway.

Co-authored-by: Divma <26765164+divagant-martian@users.noreply.github.com>
This commit is contained in:
parent aa72088f8f
commit 7366266bd1
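The change boils down to a small time-expiring set keyed by the finalized root of a failed chain: when a new peer's status points at a root that failed recently, the peer is disconnected instead of the chain being spawned again. Below is a minimal sketch of that idea; the `FailedChains` type and the `[u8; 32]` alias for `Hash256` are stand-ins for illustration only, while the actual diff uses `lru_cache::LRUTimeCache<Hash256>` and `GoodbyeReason::IrrelevantNetwork`.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

// Stand-in for `types::Hash256`, used only to keep this sketch self-contained.
type Hash256 = [u8; 32];

/// Minimal time-expiring set of failed finalized-chain roots. The real change
/// uses `lru_cache::LRUTimeCache<Hash256>`; only the TTL behaviour matters here.
struct FailedChains {
    ttl: Duration,
    entries: HashMap<Hash256, Instant>,
}

impl FailedChains {
    fn new(ttl: Duration) -> Self {
        Self {
            ttl,
            entries: HashMap::new(),
        }
    }

    /// Record a chain root as failed, (re)starting its expiry timer.
    fn insert(&mut self, root: Hash256) {
        self.entries.insert(root, Instant::now());
    }

    /// True if the root failed within the last `ttl`; expired entries are purged first.
    fn contains(&mut self, root: &Hash256) -> bool {
        let ttl = self.ttl;
        self.entries.retain(|_, inserted| inserted.elapsed() < ttl);
        self.entries.contains_key(root)
    }
}

fn main() {
    // 30 seconds mirrors FAILED_CHAINS_EXPIRY_SECONDS in the range-sync change below.
    let mut failed_chains = FailedChains::new(Duration::from_secs(30));
    let bad_root: Hash256 = [0xab; 32];

    // A finalized chain just failed: remember its target root.
    failed_chains.insert(bad_root);

    // A new peer advertises the same finalized root. Instead of spawning the
    // chain again, range sync would say goodbye to the peer and return early.
    if failed_chains.contains(&bad_root) {
        println!("peer is on a recently failed chain; disconnect instead of retrying");
    }
}
```

Keeping the expiry short (30 seconds for range sync, 60 seconds for the block-lookup cache) limits the damage if a chain failed for a transient reason such as a single batch that merely failed to download: once the entry expires, peers on that chain are considered again.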
@@ -4,7 +4,7 @@ use std::time::Duration;
 use beacon_chain::{BeaconChainTypes, BlockError};
 use fnv::FnvHashMap;
 use lighthouse_network::{PeerAction, PeerId};
-use lru_cache::LRUCache;
+use lru_cache::LRUTimeCache;
 use slog::{crit, debug, error, trace, warn, Logger};
 use smallvec::SmallVec;
 use store::{Hash256, SignedBeaconBlock};
@@ -29,7 +29,7 @@ mod single_block_lookup;
 #[cfg(test)]
 mod tests;
 
-const FAILED_CHAINS_CACHE_SIZE: usize = 500;
+const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60;
 const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 3;
 
 pub(crate) struct BlockLookups<T: BeaconChainTypes> {
@@ -37,7 +37,7 @@ pub(crate) struct BlockLookups<T: BeaconChainTypes> {
     parent_queue: SmallVec<[ParentLookup<T::EthSpec>; 3]>,
 
     /// A cache of failed chain lookups to prevent duplicate searches.
-    failed_chains: LRUCache<Hash256>,
+    failed_chains: LRUTimeCache<Hash256>,
 
     /// A collection of block hashes being searched for and a flag indicating if a result has been
     /// received or not.
@@ -56,7 +56,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
     pub fn new(beacon_processor_send: mpsc::Sender<WorkEvent<T>>, log: Logger) -> Self {
         Self {
            parent_queue: Default::default(),
-            failed_chains: LRUCache::new(FAILED_CHAINS_CACHE_SIZE),
+            failed_chains: LRUTimeCache::new(Duration::from_secs(
+                FAILED_CHAINS_CACHE_EXPIRY_SECONDS,
+            )),
             single_block_lookups: Default::default(),
             beacon_processor_send,
             log,
@@ -218,7 +220,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
             return;
         };
 
-        match parent_lookup.verify_block(block, &self.failed_chains) {
+        match parent_lookup.verify_block(block, &mut self.failed_chains) {
             Ok(Some(block)) => {
                 // Block is correct, send to the beacon processor.
                 let chain_hash = parent_lookup.chain_hash();
@@ -117,7 +117,7 @@ impl<T: EthSpec> ParentLookup<T> {
     pub fn verify_block(
         &mut self,
         block: Option<Box<SignedBeaconBlock<T>>>,
-        failed_chains: &lru_cache::LRUCache<Hash256>,
+        failed_chains: &mut lru_cache::LRUTimeCache<Hash256>,
     ) -> Result<Option<Box<SignedBeaconBlock<T>>>, VerifyError> {
         let block = self.current_parent_request.verify_block(block)?;
 
@@ -49,13 +49,18 @@ use crate::sync::manager::Id;
 use crate::sync::network_context::SyncNetworkContext;
 use crate::sync::BatchProcessResult;
 use beacon_chain::{BeaconChain, BeaconChainTypes};
+use lighthouse_network::rpc::GoodbyeReason;
 use lighthouse_network::PeerId;
 use lighthouse_network::SyncInfo;
-use slog::{crit, debug, error, trace};
+use lru_cache::LRUTimeCache;
+use slog::{crit, debug, error, trace, warn};
 use std::collections::HashMap;
 use std::sync::Arc;
 use tokio::sync::mpsc;
-use types::{Epoch, EthSpec, SignedBeaconBlock, Slot};
+use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot};
 
+/// For how long we store failed finalized chains to prevent retries.
+const FAILED_CHAINS_EXPIRY_SECONDS: u64 = 30;
+
 /// The primary object dealing with long range/batch syncing. This contains all the active and
 /// non-active chains that need to be processed before the syncing is considered complete. This
@@ -69,6 +74,8 @@ pub struct RangeSync<T: BeaconChainTypes, C = BeaconChain<T>> {
     /// A collection of chains that need to be downloaded. This stores any head or finalized chains
     /// that need to be downloaded.
     chains: ChainCollection<T, C>,
+    /// Chains that have failed and are stored to prevent being retried.
+    failed_chains: LRUTimeCache<Hash256>,
     /// A multi-threaded, non-blocking processor for applying messages to the beacon chain.
     beacon_processor_send: mpsc::Sender<BeaconWorkEvent<T>>,
     /// The syncing logger.
@@ -88,6 +95,9 @@ where
         RangeSync {
             beacon_chain: beacon_chain.clone(),
             chains: ChainCollection::new(beacon_chain, log.clone()),
+            failed_chains: LRUTimeCache::new(std::time::Duration::from_secs(
+                FAILED_CHAINS_EXPIRY_SECONDS,
+            )),
             awaiting_head_peers: HashMap::new(),
             beacon_processor_send,
             log,
@@ -128,6 +138,14 @@ where
         // determine which kind of sync to perform and set up the chains
         match RangeSyncType::new(self.beacon_chain.as_ref(), &local_info, &remote_info) {
             RangeSyncType::Finalized => {
+                // Make sure we have not recently tried this chain
+                if self.failed_chains.contains(&remote_info.finalized_root) {
+                    debug!(self.log, "Disconnecting peer that belongs to previously failed chain";
+                        "failed_root" => %remote_info.finalized_root, "peer_id" => %peer_id);
+                    network.goodbye_peer(peer_id, GoodbyeReason::IrrelevantNetwork);
+                    return;
+                }
+
                 // Finalized chain search
                 debug!(self.log, "Finalization sync peer joined"; "peer_id" => %peer_id);
                 self.awaiting_head_peers.remove(&peer_id);
@@ -338,6 +356,13 @@ where
             debug!(self.log, "Chain removed"; "sync_type" => ?sync_type, &chain, "reason" => ?remove_reason, "op" => op);
         }
 
+        if let RemoveChain::ChainFailed(_) = remove_reason {
+            if RangeSyncType::Finalized == sync_type {
+                warn!(self.log, "Chain failed! Syncing to its head won't be retried for at least the next {} seconds", FAILED_CHAINS_EXPIRY_SECONDS; &chain);
+                self.failed_chains.insert(chain.target_head_root);
+            }
+        }
+
         network.status_peers(self.beacon_chain.as_ref(), chain.peers());
 
         let local = match self.beacon_chain.status_message() {
@@ -1,7 +1,5 @@
-//! A library to provide fast and efficient LRU Cache's without updating.
+//! A library to provide fast and efficient LRU Cache's.
 
-mod space;
 mod time;
 
-pub use space::LRUCache;
 pub use time::LRUTimeCache;
@@ -1,93 +0,0 @@
-///! This implements a time-based LRU cache for fast checking of duplicates
-use fnv::FnvHashSet;
-use std::collections::VecDeque;
-
-/// Cache that stores keys until the size is used up. Does not update elements for efficiency.
-pub struct LRUCache<Key>
-where
-    Key: Eq + std::hash::Hash + Clone,
-{
-    /// The duplicate cache.
-    map: FnvHashSet<Key>,
-    /// An ordered list of keys by order.
-    list: VecDeque<Key>,
-    // The max size of the cache,
-    size: usize,
-}
-
-impl<Key> LRUCache<Key>
-where
-    Key: Eq + std::hash::Hash + Clone,
-{
-    pub fn new(size: usize) -> Self {
-        LRUCache {
-            map: FnvHashSet::default(),
-            list: VecDeque::new(),
-            size,
-        }
-    }
-
-    /// Determines if the key is in the cache.
-    pub fn contains(&self, key: &Key) -> bool {
-        self.map.contains(key)
-    }
-
-    // Inserts new elements and removes any expired elements.
-    //
-    // If the key was not present this returns `true`. If the value was already present this
-    // returns `false`.
-    pub fn insert(&mut self, key: Key) -> bool {
-        // check the cache before removing elements
-        let result = self.map.insert(key.clone());
-
-        // add the new key to the list, if it doesn't already exist.
-        if result {
-            self.list.push_back(key);
-        }
-        // remove any overflow keys
-        self.update();
-        result
-    }
-
-    /// Removes any expired elements from the cache.
-    fn update(&mut self) {
-        // remove any expired results
-        for _ in 0..self.map.len().saturating_sub(self.size) {
-            if let Some(key) = self.list.pop_front() {
-                self.map.remove(&key);
-            }
-        }
-    }
-}
-
-#[cfg(test)]
-mod test {
-    use super::*;
-
-    #[test]
-    fn cache_added_entries_exist() {
-        let mut cache = LRUCache::new(5);
-
-        cache.insert("t");
-        cache.insert("e");
-
-        // Should report that 't' and 't' already exists
-        assert!(!cache.insert("t"));
-        assert!(!cache.insert("e"));
-    }
-
-    #[test]
-    fn cache_entries_get_removed() {
-        let mut cache = LRUCache::new(2);
-
-        cache.insert("t");
-        assert!(!cache.insert("t"));
-        cache.insert("e");
-        assert!(!cache.insert("e"));
-        // add another element to clear the first key
-        cache.insert("s");
-        assert!(!cache.insert("s"));
-        // should be removed from the cache
-        assert!(cache.insert("t"));
-    }
-}
@@ -31,53 +31,45 @@ where
         }
     }
 
-    // Inserts new elements and removes any expired elements.
+    // Inserts a new key. It first purges expired elements to do so.
     //
     // If the key was not present this returns `true`. If the value was already present this
-    // returns `false`.
-    pub fn insert_update(&mut self, key: Key) -> bool {
-        // check the cache before removing elements
-        let result = self.map.insert(key.clone());
-
-        let now = Instant::now();
-
-        // remove any expired results
-        while let Some(element) = self.list.pop_front() {
-            if element.inserted + self.ttl > now {
-                self.list.push_front(element);
-                break;
-            }
-            self.map.remove(&element.key);
-        }
-
-        // add the new key to the list, if it doesn't already exist.
-        if result {
-            self.list.push_back(Element { key, inserted: now });
-        }
-
-        result
-    }
-
-    // Inserts new element does not expire old elements.
-    //
-    // If the key was not present this returns `true`. If the value was already present this
-    // returns `false`.
+    // returns `false` and updates the insertion time of the key.
     pub fn insert(&mut self, key: Key) -> bool {
+        self.update();
         // check the cache before removing elements
-        let result = self.map.insert(key.clone());
+        let is_new = self.map.insert(key.clone());
 
         // add the new key to the list, if it doesn't already exist.
-        if result {
+        if is_new {
             self.list.push_back(Element {
                 key,
                 inserted: Instant::now(),
             });
+        } else {
+            let position = self
+                .list
+                .iter()
+                .position(|e| e.key == key)
+                .expect("Key is not new");
+            let mut element = self
+                .list
+                .remove(position)
+                .expect("Position is not occupied");
+            element.inserted = Instant::now();
+            self.list.push_back(element);
         }
-        result
+        #[cfg(test)]
+        self.check_invariant();
+        is_new
     }
 
     /// Removes any expired elements from the cache.
     pub fn update(&mut self) {
+        if self.list.is_empty() {
+            return;
+        }
+
         let now = Instant::now();
         // remove any expired results
         while let Some(element) = self.list.pop_front() {
@@ -87,6 +79,46 @@ where
             }
             self.map.remove(&element.key);
         }
+
+        #[cfg(test)]
+        self.check_invariant()
+    }
+
+    /// Returns if the key is present after removing expired elements.
+    pub fn contains(&mut self, key: &Key) -> bool {
+        self.update();
+        self.map.contains(key)
+    }
+
+    #[cfg(test)]
+    #[track_caller]
+    fn check_invariant(&self) {
+        // The list should be sorted. First element should have the oldest insertion
+        let mut prev_insertion_time = None;
+        for e in &self.list {
+            match prev_insertion_time {
+                Some(prev) => {
+                    if prev <= e.inserted {
+                        prev_insertion_time = Some(e.inserted);
+                    } else {
+                        panic!("List is not sorted by insertion time")
+                    }
+                }
+                None => prev_insertion_time = Some(e.inserted),
+            }
+            // The key should be in the map
+            assert!(self.map.contains(&e.key), "List and map should be in sync");
+        }
+
+        for k in &self.map {
+            let _ = self
+                .list
+                .iter()
+                .position(|e| &e.key == k)
+                .expect("Map and list should be in sync");
+        }
+
+        // One last check to make sure there are no duplicates in the list
+        assert_eq!(self.list.len(), self.map.len());
     }
 }
 
@@ -107,20 +139,22 @@ mod test {
     }
 
     #[test]
-    fn cache_entries_expire() {
+    fn test_reinsertion_updates_timeout() {
         let mut cache = LRUTimeCache::new(Duration::from_millis(100));
 
-        cache.insert_update("t");
-        assert!(!cache.insert_update("t"));
-        cache.insert_update("e");
-        assert!(!cache.insert_update("t"));
-        assert!(!cache.insert_update("e"));
-        // sleep until cache expiry
-        std::thread::sleep(Duration::from_millis(101));
-        // add another element to clear previous cache
-        cache.insert_update("s");
+        cache.insert("a");
+        cache.insert("b");
 
-        // should be removed from the cache
-        assert!(cache.insert_update("t"));
+        std::thread::sleep(Duration::from_millis(20));
+        cache.insert("a");
+        // a is newer now
+
+        std::thread::sleep(Duration::from_millis(85));
+        assert!(cache.contains(&"a"),);
+        // b was inserted first but was not as recent it should have been removed
+        assert!(!cache.contains(&"b"));
+
+        std::thread::sleep(Duration::from_millis(16));
+        assert!(!cache.contains(&"a"));
     }
 }
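For reference, here is a short usage sketch of the `LRUTimeCache` semantics introduced above, assuming the repository's internal `lru_cache` crate is available as a dependency: `insert` reports whether the key is new and refreshes the timer on reinsertion, while `contains` purges expired entries before answering.

```rust
// Illustrative only: assumes the in-repo `lru_cache` crate is a dependency and
// exposes `LRUTimeCache` exactly as shown in the diff above.
use lru_cache::LRUTimeCache;
use std::time::Duration;

fn main() {
    let mut cache = LRUTimeCache::new(Duration::from_secs(30));

    // First insertion of a key returns true...
    assert!(cache.insert("failed_chain_root"));
    // ...reinserting the same key returns false and refreshes its expiry timer.
    assert!(!cache.insert("failed_chain_root"));

    // Within the TTL, `contains` (which purges expired entries first) still
    // reports the key as present.
    assert!(cache.contains(&"failed_chain_root"));
}
```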