Block lookup data inconsistencies (#3677)
## Issue Addressed
Closes #3649
## Proposed Changes
Add a regression test for the data inconsistency; commit 31e88c5533 catches the problem
[here](https://github.com/sigp/lighthouse/actions/runs/3379894044/jobs/5612044797#step:6:2043).
When a chain is sent for processing, it is now moved out of the download collection and into a separate `processing_parent_lookups` map, which keeps the lookup state consistent and makes the regression test pass.
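The idea is plain bookkeeping: a chain is tracked either as "downloading" or as "sent for processing", and a new search is ignored if the chain appears in either collection. Below is a minimal sketch of that pattern with illustrative names (`Lookups`, `chain_id`, `dispatch` are not the Lighthouse types):

```rust
use std::collections::HashMap;

/// Sketch only: chains move from `downloading` to `processing` once they are
/// dispatched, so a second search for the same chain finds it in one of the
/// two collections and is rejected instead of starting a duplicate lookup.
struct Lookups {
    downloading: Vec<(u64, Vec<String>)>,  // (chain_id, block roots being fetched)
    processing: HashMap<u64, Vec<String>>, // chain_id -> roots sent for processing
}

impl Lookups {
    fn is_known(&self, chain_id: u64) -> bool {
        self.downloading.iter().any(|(id, _)| *id == chain_id)
            || self.processing.contains_key(&chain_id)
    }

    /// Called when a fully downloaded chain is handed to the processor.
    fn dispatch(&mut self, chain_id: u64) {
        if let Some(pos) = self.downloading.iter().position(|(id, _)| *id == chain_id) {
            let (id, roots) = self.downloading.remove(pos);
            // No longer "downloading", but still tracked so duplicate searches
            // for any of its roots can be ignored while processing is pending.
            self.processing.insert(id, roots);
        }
    }

    /// Called when the processor reports a result for the chain.
    fn on_processed(&mut self, chain_id: u64) {
        self.processing.remove(&chain_id);
    }
}

fn main() {
    let mut lookups = Lookups {
        downloading: vec![(1, vec!["0xabc".into(), "0xdef".into()])],
        processing: HashMap::new(),
    };
    lookups.dispatch(1);
    assert!(lookups.is_known(1)); // still tracked while processing
    lookups.on_processed(1);
    assert!(!lookups.is_known(1));
}
```

In the actual change the two collections are `parent_lookups` and `processing_parent_lookups`, keyed by the chain hash, as the diff below shows.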
## Additional Info
N/A
parent 253767ebc1
commit 84c7d8cc70

block_lookups/mod.rs
@@ -1,4 +1,5 @@
 use std::collections::hash_map::Entry;
+use std::collections::HashMap;
 use std::time::Duration;
 
 use beacon_chain::{BeaconChainTypes, BlockError};
@@ -13,6 +14,7 @@ use store::{Hash256, SignedBeaconBlock};
 use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent};
 use crate::metrics;
 
+use self::parent_lookup::PARENT_FAIL_TOLERANCE;
 use self::{
     parent_lookup::{ParentLookup, VerifyError},
     single_block_lookup::SingleBlockRequest,
@@ -36,8 +38,11 @@ const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60;
 const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 3;
 
 pub(crate) struct BlockLookups<T: BeaconChainTypes> {
-    /// A collection of parent block lookups.
-    parent_queue: SmallVec<[ParentLookup<T>; 3]>,
+    /// Parent chain lookups being downloaded.
+    parent_lookups: SmallVec<[ParentLookup<T>; 3]>,
 
+    processing_parent_lookups:
+        HashMap<Hash256, (Vec<Hash256>, SingleBlockRequest<PARENT_FAIL_TOLERANCE>)>,
+
     /// A cache of failed chain lookups to prevent duplicate searches.
     failed_chains: LRUTimeCache<Hash256>,
@@ -55,7 +60,8 @@ pub(crate) struct BlockLookups<T: BeaconChainTypes> {
 impl<T: BeaconChainTypes> BlockLookups<T> {
     pub fn new(log: Logger) -> Self {
         Self {
-            parent_queue: Default::default(),
+            parent_lookups: Default::default(),
+            processing_parent_lookups: Default::default(),
             failed_chains: LRUTimeCache::new(Duration::from_secs(
                 FAILED_CHAINS_CACHE_EXPIRY_SECONDS,
             )),
@@ -78,6 +84,23 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
             return;
         }
 
+        if self.parent_lookups.iter_mut().any(|parent_req| {
+            parent_req.add_peer(&hash, &peer_id) || parent_req.contains_block(&hash)
+        }) {
+            // If the block was already downloaded, or is being downloaded in this moment, do not
+            // request it.
+            return;
+        }
+
+        if self
+            .processing_parent_lookups
+            .values()
+            .any(|(hashes, _last_parent_request)| hashes.contains(&hash))
+        {
+            // we are already processing this block, ignore it.
+            return;
+        }
+
         debug!(
             self.log,
             "Searching for block";
@@ -118,8 +141,8 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
 
         // Make sure this block is not already downloaded, and that neither it or its parent is
         // being searched for.
-        if self.parent_queue.iter_mut().any(|parent_req| {
-            parent_req.contains_block(&block)
+        if self.parent_lookups.iter_mut().any(|parent_req| {
+            parent_req.contains_block(&block_root)
                 || parent_req.add_peer(&block_root, &peer_id)
                 || parent_req.add_peer(&parent_root, &peer_id)
         }) {
@@ -127,6 +150,15 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
             return;
         }
 
+        if self
+            .processing_parent_lookups
+            .values()
+            .any(|(hashes, _peers)| hashes.contains(&block_root) || hashes.contains(&parent_root))
+        {
+            // we are already processing this block, ignore it.
+            return;
+        }
+
         let parent_lookup = ParentLookup::new(block_root, block, peer_id);
         self.request_parent(parent_lookup, cx);
     }
@@ -207,11 +239,11 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
         cx: &mut SyncNetworkContext<T>,
     ) {
         let mut parent_lookup = if let Some(pos) = self
-            .parent_queue
+            .parent_lookups
             .iter()
             .position(|request| request.pending_response(id))
         {
-            self.parent_queue.remove(pos)
+            self.parent_lookups.remove(pos)
         } else {
             if block.is_some() {
                 debug!(self.log, "Response for a parent lookup request that was not found"; "peer_id" => %peer_id);
@@ -233,13 +265,13 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
                     )
                     .is_ok()
                 {
-                    self.parent_queue.push(parent_lookup)
+                    self.parent_lookups.push(parent_lookup)
                 }
             }
             Ok(None) => {
                 // Request finished successfully, nothing else to do. It will be removed after the
                 // processing result arrives.
-                self.parent_queue.push(parent_lookup);
+                self.parent_lookups.push(parent_lookup);
             }
             Err(e) => match e {
                 VerifyError::RootMismatch
@@ -276,7 +308,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
 
         metrics::set_gauge(
             &metrics::SYNC_PARENT_BLOCK_LOOKUPS,
-            self.parent_queue.len() as i64,
+            self.parent_lookups.len() as i64,
         );
     }
 
@@ -324,11 +356,11 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
 
         /* Check disconnection for parent lookups */
         while let Some(pos) = self
-            .parent_queue
+            .parent_lookups
             .iter_mut()
             .position(|req| req.check_peer_disconnected(peer_id).is_err())
         {
-            let parent_lookup = self.parent_queue.remove(pos);
+            let parent_lookup = self.parent_lookups.remove(pos);
             trace!(self.log, "Parent lookup's peer disconnected"; &parent_lookup);
             self.request_parent(parent_lookup, cx);
         }
@@ -342,11 +374,11 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
         cx: &mut SyncNetworkContext<T>,
     ) {
         if let Some(pos) = self
-            .parent_queue
+            .parent_lookups
             .iter()
             .position(|request| request.pending_response(id))
         {
-            let mut parent_lookup = self.parent_queue.remove(pos);
+            let mut parent_lookup = self.parent_lookups.remove(pos);
             parent_lookup.download_failed();
             trace!(self.log, "Parent lookup request failed"; &parent_lookup);
             self.request_parent(parent_lookup, cx);
@@ -355,7 +387,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
         };
         metrics::set_gauge(
             &metrics::SYNC_PARENT_BLOCK_LOOKUPS,
-            self.parent_queue.len() as i64,
+            self.parent_lookups.len() as i64,
         );
     }
 
@@ -470,7 +502,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
         cx: &mut SyncNetworkContext<T>,
     ) {
         let (mut parent_lookup, peer_id) = if let Some((pos, peer)) = self
-            .parent_queue
+            .parent_lookups
             .iter()
             .enumerate()
             .find_map(|(pos, request)| {
@@ -478,7 +510,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
                     .get_processing_peer(chain_hash)
                     .map(|peer| (pos, peer))
             }) {
-            (self.parent_queue.remove(pos), peer)
+            (self.parent_lookups.remove(pos), peer)
         } else {
             return debug!(self.log, "Process response for a parent lookup request that was not found"; "chain_hash" => %chain_hash);
         };
@@ -520,13 +552,13 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
                 );
             }
         };
-        let chain_hash = parent_lookup.chain_hash();
-        let blocks = parent_lookup.chain_blocks();
+        let (chain_hash, blocks, hashes, request) = parent_lookup.parts_for_processing();
         let process_id = ChainSegmentProcessId::ParentLookup(chain_hash);
 
         match beacon_processor_send.try_send(WorkEvent::chain_segment(process_id, blocks)) {
             Ok(_) => {
-                self.parent_queue.push(parent_lookup);
+                self.processing_parent_lookups
+                    .insert(chain_hash, (hashes, request));
             }
             Err(e) => {
                 error!(
@@ -580,7 +612,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
 
         metrics::set_gauge(
             &metrics::SYNC_PARENT_BLOCK_LOOKUPS,
-            self.parent_queue.len() as i64,
+            self.parent_lookups.len() as i64,
         );
     }
 
@@ -590,14 +622,11 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
         result: BatchProcessResult,
         cx: &mut SyncNetworkContext<T>,
     ) {
-        let parent_lookup = if let Some(pos) = self
-            .parent_queue
-            .iter()
-            .position(|request| request.chain_hash() == chain_hash)
-        {
-            self.parent_queue.remove(pos)
-        } else {
-            return debug!(self.log, "Chain process response for a parent lookup request that was not found"; "chain_hash" => %chain_hash);
+        let request = match self.processing_parent_lookups.remove(&chain_hash) {
+            Some((_hashes, request)) => request,
+            None => {
+                return debug!(self.log, "Chain process response for a parent lookup request that was not found"; "chain_hash" => %chain_hash, "result" => ?result)
+            }
         };
 
         debug!(self.log, "Parent chain processed"; "chain_hash" => %chain_hash, "result" => ?result);
@@ -609,8 +638,8 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
                 imported_blocks: _,
                 penalty,
             } => {
-                self.failed_chains.insert(parent_lookup.chain_hash());
-                for &peer_id in parent_lookup.used_peers() {
+                self.failed_chains.insert(chain_hash);
+                for peer_id in request.used_peers {
                     cx.report_peer(peer_id, penalty, "parent_chain_failure")
                 }
             }
@@ -621,7 +650,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
 
         metrics::set_gauge(
             &metrics::SYNC_PARENT_BLOCK_LOOKUPS,
-            self.parent_queue.len() as i64,
+            self.parent_lookups.len() as i64,
         );
     }
 
@@ -697,14 +726,14 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
             }
             Ok(_) => {
                 debug!(self.log, "Requesting parent"; &parent_lookup);
-                self.parent_queue.push(parent_lookup)
+                self.parent_lookups.push(parent_lookup)
             }
         }
 
         // We remove and add back again requests so we want this updated regardless of outcome.
         metrics::set_gauge(
             &metrics::SYNC_PARENT_BLOCK_LOOKUPS,
-            self.parent_queue.len() as i64,
+            self.parent_lookups.len() as i64,
         );
     }
 
@@ -715,6 +744,6 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
 
     /// Drops all the parent chain requests and returns how many requests were dropped.
     pub fn drop_parent_chain_requests(&mut self) -> usize {
-        self.parent_queue.drain(..).len()
+        self.parent_lookups.drain(..).len()
     }
 }

block_lookups/parent_lookup.rs
@@ -24,7 +24,7 @@ pub(crate) struct ParentLookup<T: BeaconChainTypes> {
     /// The root of the block triggering this parent request.
     chain_hash: Hash256,
     /// The blocks that have currently been downloaded.
-    downloaded_blocks: Vec<Arc<SignedBeaconBlock<T::EthSpec>>>,
+    downloaded_blocks: Vec<RootBlockTuple<T::EthSpec>>,
     /// Request of the last parent.
     current_parent_request: SingleBlockRequest<PARENT_FAIL_TOLERANCE>,
     /// Id of the last parent request.
@@ -53,10 +53,10 @@ pub enum RequestError {
 }
 
 impl<T: BeaconChainTypes> ParentLookup<T> {
-    pub fn contains_block(&self, block: &SignedBeaconBlock<T::EthSpec>) -> bool {
+    pub fn contains_block(&self, block_root: &Hash256) -> bool {
         self.downloaded_blocks
             .iter()
-            .any(|d_block| d_block.as_ref() == block)
+            .any(|(root, _d_block)| root == block_root)
     }
 
     pub fn new(
@@ -68,7 +68,7 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
 
         Self {
             chain_hash: block_root,
-            downloaded_blocks: vec![block],
+            downloaded_blocks: vec![(block_root, block)],
             current_parent_request,
             current_parent_request_id: None,
         }
@@ -100,7 +100,8 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
 
     pub fn add_block(&mut self, block: Arc<SignedBeaconBlock<T::EthSpec>>) {
         let next_parent = block.parent_root();
-        self.downloaded_blocks.push(block);
+        let current_root = self.current_parent_request.hash;
+        self.downloaded_blocks.push((current_root, block));
         self.current_parent_request.hash = next_parent;
         self.current_parent_request.state = single_block_lookup::State::AwaitingDownload;
         self.current_parent_request_id = None;
@@ -110,6 +111,32 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
         self.current_parent_request_id == Some(req_id)
     }
 
+    /// Consumes the parent request and destructures it into it's parts.
+    #[allow(clippy::type_complexity)]
+    pub fn parts_for_processing(
+        self,
+    ) -> (
+        Hash256,
+        Vec<Arc<SignedBeaconBlock<T::EthSpec>>>,
+        Vec<Hash256>,
+        SingleBlockRequest<PARENT_FAIL_TOLERANCE>,
+    ) {
+        let ParentLookup {
+            chain_hash,
+            downloaded_blocks,
+            current_parent_request,
+            current_parent_request_id: _,
+        } = self;
+        let block_count = downloaded_blocks.len();
+        let mut blocks = Vec::with_capacity(block_count);
+        let mut hashes = Vec::with_capacity(block_count);
+        for (hash, block) in downloaded_blocks {
+            blocks.push(block);
+            hashes.push(hash);
+        }
+        (chain_hash, blocks, hashes, current_parent_request)
+    }
+
     /// Get the parent lookup's chain hash.
     pub fn chain_hash(&self) -> Hash256 {
         self.chain_hash
@@ -125,10 +152,6 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
         self.current_parent_request_id = None;
     }
 
-    pub fn chain_blocks(&mut self) -> Vec<Arc<SignedBeaconBlock<T::EthSpec>>> {
-        std::mem::take(&mut self.downloaded_blocks)
-    }
-
     /// Verifies that the received block is what we requested. If so, parent lookup now waits for
     /// the processing result of the block.
     pub fn verify_block(

block_lookups/tests.rs
@@ -259,7 +259,7 @@ fn test_single_block_lookup_becomes_parent_request() {
     assert_eq!(bl.single_block_lookups.len(), 0);
     rig.expect_parent_request();
     rig.expect_empty_network();
-    assert_eq!(bl.parent_queue.len(), 1);
+    assert_eq!(bl.parent_lookups.len(), 1);
 }
 
 #[test]
@@ -287,7 +287,7 @@ fn test_parent_lookup_happy_path() {
         was_non_empty: true,
     };
     bl.parent_chain_processed(chain_hash, process_result, &mut cx);
-    assert_eq!(bl.parent_queue.len(), 0);
+    assert_eq!(bl.parent_lookups.len(), 0);
 }
 
 #[test]
@@ -324,7 +324,7 @@ fn test_parent_lookup_wrong_response() {
         was_non_empty: true,
     };
     bl.parent_chain_processed(chain_hash, process_result, &mut cx);
-    assert_eq!(bl.parent_queue.len(), 0);
+    assert_eq!(bl.parent_lookups.len(), 0);
 }
 
 #[test]
@@ -356,7 +356,7 @@ fn test_parent_lookup_empty_response() {
         was_non_empty: true,
     };
     bl.parent_chain_processed(chain_hash, process_result, &mut cx);
-    assert_eq!(bl.parent_queue.len(), 0);
+    assert_eq!(bl.parent_lookups.len(), 0);
 }
 
 #[test]
@@ -387,7 +387,7 @@ fn test_parent_lookup_rpc_failure() {
         was_non_empty: true,
     };
     bl.parent_chain_processed(chain_hash, process_result, &mut cx);
-    assert_eq!(bl.parent_queue.len(), 0);
+    assert_eq!(bl.parent_lookups.len(), 0);
 }
 
 #[test]
@@ -419,11 +419,11 @@ fn test_parent_lookup_too_many_attempts() {
             }
         }
         if i < parent_lookup::PARENT_FAIL_TOLERANCE {
-            assert_eq!(bl.parent_queue[0].failed_attempts(), dbg!(i));
+            assert_eq!(bl.parent_lookups[0].failed_attempts(), dbg!(i));
         }
     }
 
-    assert_eq!(bl.parent_queue.len(), 0);
+    assert_eq!(bl.parent_lookups.len(), 0);
 }
 
 #[test]
@@ -450,11 +450,11 @@ fn test_parent_lookup_too_many_download_attempts_no_blacklist() {
             rig.expect_penalty();
         }
         if i < parent_lookup::PARENT_FAIL_TOLERANCE {
-            assert_eq!(bl.parent_queue[0].failed_attempts(), dbg!(i));
+            assert_eq!(bl.parent_lookups[0].failed_attempts(), dbg!(i));
         }
     }
 
-    assert_eq!(bl.parent_queue.len(), 0);
+    assert_eq!(bl.parent_lookups.len(), 0);
     assert!(!bl.failed_chains.contains(&block_hash));
     assert!(!bl.failed_chains.contains(&parent.canonical_root()));
 }
@@ -491,7 +491,7 @@ fn test_parent_lookup_too_many_processing_attempts_must_blacklist() {
     }
 
     assert!(bl.failed_chains.contains(&block_hash));
-    assert_eq!(bl.parent_queue.len(), 0);
+    assert_eq!(bl.parent_lookups.len(), 0);
 }
 
 #[test]
@@ -545,7 +545,7 @@ fn test_parent_lookup_disconnection() {
         &mut cx,
     );
     bl.peer_disconnected(&peer_id, &mut cx);
-    assert!(bl.parent_queue.is_empty());
+    assert!(bl.parent_lookups.is_empty());
 }
 
 #[test]
@@ -598,5 +598,78 @@ fn test_parent_lookup_ignored_response() {
     // Return an Ignored result. The request should be dropped
     bl.parent_block_processed(chain_hash, BlockProcessResult::Ignored, &mut cx);
     rig.expect_empty_network();
-    assert_eq!(bl.parent_queue.len(), 0);
+    assert_eq!(bl.parent_lookups.len(), 0);
+}
+
+/// This is a regression test.
+#[test]
+fn test_same_chain_race_condition() {
+    let (mut bl, mut cx, mut rig) = TestRig::test_setup(Some(Level::Debug));
+
+    #[track_caller]
+    fn parent_lookups_consistency(bl: &BlockLookups<T>) {
+        let hashes: Vec<_> = bl
+            .parent_lookups
+            .iter()
+            .map(|req| req.chain_hash())
+            .collect();
+        let expected = hashes.len();
+        assert_eq!(
+            expected,
+            hashes
+                .into_iter()
+                .collect::<std::collections::HashSet<_>>()
+                .len(),
+            "duplicated chain hashes in parent queue"
+        )
+    }
+    // if we use one or two blocks it will match on the hash or the parent hash, so make a longer
+    // chain.
+    let depth = 4;
+    let mut blocks = Vec::<Arc<SignedBeaconBlock<E>>>::with_capacity(depth);
+    while blocks.len() < depth {
+        let parent = blocks
+            .last()
+            .map(|b| b.canonical_root())
+            .unwrap_or_else(Hash256::random);
+        let block = Arc::new(rig.block_with_parent(parent));
+        blocks.push(block);
+    }
+
+    let peer_id = PeerId::random();
+    let trigger_block = blocks.pop().unwrap();
+    let chain_hash = trigger_block.canonical_root();
+    bl.search_parent(chain_hash, trigger_block.clone(), peer_id, &mut cx);
+
+    for (i, block) in blocks.into_iter().rev().enumerate() {
+        let id = rig.expect_parent_request();
+        // the block
+        bl.parent_lookup_response(id, peer_id, Some(block.clone()), D, &mut cx);
+        // the stream termination
+        bl.parent_lookup_response(id, peer_id, None, D, &mut cx);
+        // the processing request
+        rig.expect_block_process();
+        // the processing result
+        if i + 2 == depth {
+            // one block was removed
+            bl.parent_block_processed(chain_hash, BlockError::BlockIsAlreadyKnown.into(), &mut cx)
+        } else {
+            bl.parent_block_processed(chain_hash, BlockError::ParentUnknown(block).into(), &mut cx)
+        }
+        parent_lookups_consistency(&bl)
+    }
+
+    // Processing succeeds, now the rest of the chain should be sent for processing.
+    rig.expect_parent_chain_process();
+
+    // Try to get this block again while the chain is being processed. We should not request it again.
+    let peer_id = PeerId::random();
+    bl.search_parent(chain_hash, trigger_block, peer_id, &mut cx);
+    parent_lookups_consistency(&bl);
+
+    let process_result = BatchProcessResult::Success {
+        was_non_empty: true,
+    };
+    bl.parent_chain_processed(chain_hash, process_result, &mut cx);
+    assert_eq!(bl.parent_lookups.len(), 0);
 }