Improve block-lookup functionality (#3287)

Improves some of the functionality around single and parent block lookup. 

Gives extra information about whether failures for lookups are related to processing or downloading.

This is entirely untested.


Co-authored-by: Diva M <divma@protonmail.com>
This commit is contained in:
Age Manning 2022-07-17 23:26:58 +00:00
parent 4f58c555a9
commit 2ed51c364d
4 changed files with 186 additions and 44 deletions

View File

@ -69,6 +69,8 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
/* Lookup requests */
/// Searches for a single block hash. If the block's parent is unknown, a chain of blocks is
/// constructed.
pub fn search_block(
&mut self,
hash: Hash256,
@ -105,6 +107,8 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
}
/// If a block is attempted to be processed but we do not know its parent, this function is
/// called in order to find the block's parent.
pub fn search_parent(
&mut self,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
@ -201,6 +205,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
);
}
/// Process a response received from a parent lookup request.
pub fn parent_lookup_response(
&mut self,
id: Id,
@ -258,7 +263,6 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
self.request_parent(parent_lookup, cx);
}
VerifyError::PreviousFailure { parent_root } => {
self.failed_chains.insert(parent_lookup.chain_hash());
debug!(
self.log,
"Parent chain ignored due to past failure";
@ -336,6 +340,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
}
/// An RPC error has occurred during a parent lookup. This function handles this case.
pub fn parent_lookup_failed(
&mut self,
id: Id,
@ -362,7 +367,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
pub fn single_block_lookup_failed(&mut self, id: Id, cx: &mut SyncNetworkContext<T::EthSpec>) {
if let Some(mut request) = self.single_block_lookups.remove(&id) {
request.register_failure();
request.register_failure_downloading();
trace!(self.log, "Single block lookup failed"; "block" => %request.hash);
if let Ok((peer_id, block_request)) = request.request_block() {
if let Ok(request_id) = cx.single_block_lookup_request(peer_id, block_request) {
@ -453,7 +458,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
"single_block_failure",
);
// Try it again if possible.
req.register_failure();
req.register_failure_processing();
if let Ok((peer_id, request)) = req.request_block() {
if let Ok(request_id) = cx.single_block_lookup_request(peer_id, request)
{
@ -569,12 +574,13 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
"last_peer" => %peer_id,
);
// Add this chain to cache of failed chains
self.failed_chains.insert(chain_hash);
// This currently can be a host of errors. We permit this due to the partial
// ambiguity.
cx.report_peer(peer_id, PeerAction::MidToleranceError, "parent_request_err");
// Try again if possible
parent_lookup.processing_failed();
self.request_parent(parent_lookup, cx);
}
BlockProcessResult::Ignored => {
// Beacon processor signalled to ignore the block processing result.
@ -683,14 +689,26 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
parent_lookup::RequestError::SendFailed(_) => {
// Probably shutting down, nothing to do here. Drop the request
}
parent_lookup::RequestError::ChainTooLong
| parent_lookup::RequestError::TooManyAttempts => {
parent_lookup::RequestError::ChainTooLong => {
self.failed_chains.insert(parent_lookup.chain_hash());
// This indicates faulty peers.
for &peer_id in parent_lookup.used_peers() {
cx.report_peer(peer_id, PeerAction::LowToleranceError, e.as_static())
}
}
parent_lookup::RequestError::TooManyAttempts { cannot_process } => {
// We only consider the chain failed if we were unable to process it.
// We could have failed because one peer continually sent us
bad blocks. We still allow other peers to send us this chain. Note
// that peers that do this, still get penalised.
if cannot_process {
self.failed_chains.insert(parent_lookup.chain_hash());
}
// This indicates faulty peers.
for &peer_id in parent_lookup.used_peers() {
cx.report_peer(peer_id, PeerAction::LowToleranceError, e.as_static())
}
}
parent_lookup::RequestError::NoPeers => {
// This happens if the peer disconnects while the block is being
// processed. Drop the request without extra penalty

View File

@ -10,7 +10,7 @@ use crate::sync::{
use super::single_block_lookup::{self, SingleBlockRequest};
/// How many attempts we try to find a parent of a block before we give up trying .
/// How many attempts we try to find a parent of a block before we give up trying.
pub(crate) const PARENT_FAIL_TOLERANCE: u8 = 5;
/// The maximum depth we will search for a parent block. In principle we should have sync'd any
canonical chain to its head once the peer connects. A chain should not appear where its depth
@ -41,7 +41,12 @@ pub enum VerifyError {
pub enum RequestError {
SendFailed(&'static str),
ChainTooLong,
TooManyAttempts,
/// We witnessed too many failures trying to complete this parent lookup.
TooManyAttempts {
/// We received more failures trying to process the blocks than downloading them
/// from peers.
cannot_process: bool,
},
NoPeers,
}
@ -105,7 +110,12 @@ impl<T: EthSpec> ParentLookup<T> {
}
pub fn download_failed(&mut self) {
self.current_parent_request.register_failure();
self.current_parent_request.register_failure_downloading();
self.current_parent_request_id = None;
}
pub fn processing_failed(&mut self) {
self.current_parent_request.register_failure_processing();
self.current_parent_request_id = None;
}
@ -126,7 +136,7 @@ impl<T: EthSpec> ParentLookup<T> {
// be dropped and the peer downscored.
if let Some(parent_root) = block.as_ref().map(|block| block.parent_root()) {
if failed_chains.contains(&parent_root) {
self.current_parent_request.register_failure();
self.current_parent_request.register_failure_downloading();
self.current_parent_request_id = None;
return Err(VerifyError::PreviousFailure { parent_root });
}
@ -144,7 +154,7 @@ impl<T: EthSpec> ParentLookup<T> {
#[cfg(test)]
pub fn failed_attempts(&self) -> u8 {
self.current_parent_request.failed_attempts
self.current_parent_request.failed_attempts()
}
pub fn add_peer(&mut self, block_root: &Hash256, peer_id: &PeerId) -> bool {
@ -171,7 +181,9 @@ impl From<super::single_block_lookup::LookupRequestError> for RequestError {
fn from(e: super::single_block_lookup::LookupRequestError) -> Self {
use super::single_block_lookup::LookupRequestError as E;
match e {
E::TooManyAttempts => RequestError::TooManyAttempts,
E::TooManyAttempts { cannot_process } => {
RequestError::TooManyAttempts { cannot_process }
}
E::NoPeers => RequestError::NoPeers,
}
}
@ -195,7 +207,10 @@ impl RequestError {
match self {
RequestError::SendFailed(e) => e,
RequestError::ChainTooLong => "chain_too_long",
RequestError::TooManyAttempts => "too_many_attempts",
RequestError::TooManyAttempts { cannot_process } if *cannot_process => {
"too_many_processing_attempts"
}
RequestError::TooManyAttempts { cannot_process: _ } => "too_many_downloading_attempts",
RequestError::NoPeers => "no_peers",
}
}

View File

@ -18,8 +18,10 @@ pub struct SingleBlockRequest<const MAX_ATTEMPTS: u8> {
pub available_peers: HashSet<PeerId>,
/// Peers from which we have requested this block.
pub used_peers: HashSet<PeerId>,
/// How many times have we attempted this block.
pub failed_attempts: u8,
/// How many times have we attempted to process this block.
failed_processing: u8,
/// How many times have we attempted to download this block.
failed_downloading: u8,
}
#[derive(Debug, PartialEq, Eq)]
@ -38,7 +40,11 @@ pub enum VerifyError {
#[derive(Debug, PartialEq, Eq, IntoStaticStr)]
pub enum LookupRequestError {
TooManyAttempts,
/// Too many failed attempts
TooManyAttempts {
/// The failed attempts were primarily due to processing failures.
cannot_process: bool,
},
NoPeers,
}
@ -49,15 +55,29 @@ impl<const MAX_ATTEMPTS: u8> SingleBlockRequest<MAX_ATTEMPTS> {
state: State::AwaitingDownload,
available_peers: HashSet::from([peer_id]),
used_peers: HashSet::default(),
failed_attempts: 0,
failed_processing: 0,
failed_downloading: 0,
}
}
pub fn register_failure(&mut self) {
self.failed_attempts += 1;
/// Registers a failure in processing a block.
pub fn register_failure_processing(&mut self) {
self.failed_processing = self.failed_processing.saturating_add(1);
self.state = State::AwaitingDownload;
}
/// Registers a failure in downloading a block. This might be a peer disconnection or a wrong
/// block.
pub fn register_failure_downloading(&mut self) {
self.failed_downloading = self.failed_downloading.saturating_add(1);
self.state = State::AwaitingDownload;
}
/// The total number of failures, whether it be processing or downloading.
pub fn failed_attempts(&self) -> u8 {
self.failed_processing + self.failed_downloading
}
pub fn add_peer(&mut self, hash: &Hash256, peer_id: &PeerId) -> bool {
let is_useful = &self.hash == hash;
if is_useful {
@ -72,7 +92,7 @@ impl<const MAX_ATTEMPTS: u8> SingleBlockRequest<MAX_ATTEMPTS> {
if let State::Downloading { peer_id } = &self.state {
if peer_id == dc_peer_id {
// Peer disconnected before providing a block
self.register_failure();
self.register_failure_downloading();
return Err(());
}
}
@ -87,14 +107,16 @@ impl<const MAX_ATTEMPTS: u8> SingleBlockRequest<MAX_ATTEMPTS> {
) -> Result<Option<Arc<SignedBeaconBlock<T>>>, VerifyError> {
match self.state {
State::AwaitingDownload => {
self.register_failure();
self.register_failure_downloading();
Err(VerifyError::ExtraBlocksReturned)
}
State::Downloading { peer_id } => match block {
Some(block) => {
if block.canonical_root() != self.hash {
// return an error and drop the block
self.register_failure();
// NOTE: we take this as a download failure to prevent counting the
// attempt as a chain failure, but simply a peer failure.
self.register_failure_downloading();
Err(VerifyError::RootMismatch)
} else {
// Return the block for processing.
@ -103,14 +125,14 @@ impl<const MAX_ATTEMPTS: u8> SingleBlockRequest<MAX_ATTEMPTS> {
}
}
None => {
self.register_failure();
self.register_failure_downloading();
Err(VerifyError::NoBlockReturned)
}
},
State::Processing { peer_id: _ } => match block {
Some(_) => {
// We sent the block for processing and received an extra block.
self.register_failure();
self.register_failure_downloading();
Err(VerifyError::ExtraBlocksReturned)
}
None => {
@ -124,19 +146,19 @@ impl<const MAX_ATTEMPTS: u8> SingleBlockRequest<MAX_ATTEMPTS> {
pub fn request_block(&mut self) -> Result<(PeerId, BlocksByRootRequest), LookupRequestError> {
debug_assert!(matches!(self.state, State::AwaitingDownload));
if self.failed_attempts <= MAX_ATTEMPTS {
if let Some(&peer_id) = self.available_peers.iter().choose(&mut rand::thread_rng()) {
let request = BlocksByRootRequest {
block_roots: VariableList::from(vec![self.hash]),
};
self.state = State::Downloading { peer_id };
self.used_peers.insert(peer_id);
Ok((peer_id, request))
} else {
Err(LookupRequestError::NoPeers)
}
if self.failed_attempts() >= MAX_ATTEMPTS {
Err(LookupRequestError::TooManyAttempts {
cannot_process: self.failed_processing >= self.failed_downloading,
})
} else if let Some(&peer_id) = self.available_peers.iter().choose(&mut rand::thread_rng()) {
let request = BlocksByRootRequest {
block_roots: VariableList::from(vec![self.hash]),
};
self.state = State::Downloading { peer_id };
self.used_peers.insert(peer_id);
Ok((peer_id, request))
} else {
Err(LookupRequestError::TooManyAttempts)
Err(LookupRequestError::NoPeers)
}
}
@ -169,6 +191,8 @@ impl<const MAX_ATTEMPTS: u8> slog::Value for SingleBlockRequest<MAX_ATTEMPTS> {
serializer.emit_arguments("processing_peer", &format_args!("{}", peer_id))?
}
}
serializer.emit_u8("failed_downloads", self.failed_downloading)?;
serializer.emit_u8("failed_processing", self.failed_processing)?;
slog::Result::Ok(())
}
}
@ -200,11 +224,28 @@ mod tests {
}
#[test]
fn test_max_attempts() {
fn test_block_lookup_failures() {
const FAILURES: u8 = 3;
let peer_id = PeerId::random();
let block = rand_block();
let mut sl = SingleBlockRequest::<4>::new(block.canonical_root(), peer_id);
sl.register_failure();
let mut sl = SingleBlockRequest::<FAILURES>::new(block.canonical_root(), peer_id);
for _ in 1..FAILURES {
sl.request_block().unwrap();
sl.register_failure_downloading();
}
// Now we receive the block and send it for processing
sl.request_block().unwrap();
sl.verify_block(Some(Arc::new(block))).unwrap().unwrap();
// One processing failure maxes the available attempts
sl.register_failure_processing();
assert_eq!(
sl.request_block(),
Err(LookupRequestError::TooManyAttempts {
cannot_process: false
})
)
}
}

View File

@ -385,12 +385,11 @@ fn test_parent_lookup_too_many_attempts() {
let parent = rig.rand_block();
let block = rig.block_with_parent(parent.canonical_root());
let chain_hash = block.canonical_root();
let peer_id = PeerId::random();
// Trigger the request
bl.search_parent(Arc::new(block), peer_id, &mut cx);
for i in 1..=parent_lookup::PARENT_FAIL_TOLERANCE + 1 {
for i in 1..=parent_lookup::PARENT_FAIL_TOLERANCE {
let id = rig.expect_parent_request();
match i % 2 {
// make sure every error is accounted for
@ -402,6 +401,8 @@ fn test_parent_lookup_too_many_attempts() {
// Send a bad block this time. It should be tried again.
let bad_block = rig.rand_block();
bl.parent_lookup_response(id, peer_id, Some(Arc::new(bad_block)), D, &mut cx);
// Send the stream termination
bl.parent_lookup_response(id, peer_id, None, D, &mut cx);
rig.expect_penalty();
}
}
@ -411,7 +412,74 @@ fn test_parent_lookup_too_many_attempts() {
}
assert_eq!(bl.parent_queue.len(), 0);
assert!(bl.failed_chains.contains(&chain_hash));
}
#[test]
fn test_parent_lookup_too_many_download_attempts_no_blacklist() {
    // A chain that only ever fails to *download* (RPC errors / bad blocks from peers)
    // must NOT be added to `failed_chains`: other peers should still be allowed to
    // serve it. Only processing failures blacklist a chain.
    let (mut bl, mut cx, mut rig) = TestRig::test_setup(None);

    let parent = rig.rand_block();
    let block = rig.block_with_parent(parent.canonical_root());
    let block_hash = block.canonical_root();
    let peer_id = PeerId::random();

    // Trigger the request
    bl.search_parent(Arc::new(block), peer_id, &mut cx);
    for i in 1..=parent_lookup::PARENT_FAIL_TOLERANCE {
        assert!(!bl.failed_chains.contains(&block_hash));
        let id = rig.expect_parent_request();
        if i % 2 != 0 {
            // The request fails. It should be tried again.
            bl.parent_lookup_failed(id, peer_id, &mut cx);
        } else {
            // Send a bad block this time. It should be tried again.
            let bad_block = rig.rand_block();
            bl.parent_lookup_response(id, peer_id, Some(Arc::new(bad_block)), D, &mut cx);
            rig.expect_penalty();
        }
        if i < parent_lookup::PARENT_FAIL_TOLERANCE {
            // Every download failure (either flavor) counts towards the total attempts.
            assert_eq!(bl.parent_queue[0].failed_attempts(), i);
        }
    }

    assert_eq!(bl.parent_queue.len(), 0);
    // Download-only failures must not blacklist the chain or its parent.
    assert!(!bl.failed_chains.contains(&block_hash));
    assert!(!bl.failed_chains.contains(&parent.canonical_root()));
}
#[test]
fn test_parent_lookup_too_many_processing_attempts_must_blacklist() {
    // When the majority of failed attempts are *processing* failures, the chain is
    // considered invalid and must be inserted into `failed_chains`.
    const PROCESSING_FAILURES: u8 = parent_lookup::PARENT_FAIL_TOLERANCE / 2 + 1;
    let (mut bl, mut cx, mut rig) = TestRig::test_setup(None);
    let parent = Arc::new(rig.rand_block());
    let block = rig.block_with_parent(parent.canonical_root());
    let block_hash = block.canonical_root();
    let peer_id = PeerId::random();

    // Trigger the request
    bl.search_parent(Arc::new(block), peer_id, &mut cx);

    // Fail downloading the block; these attempts alone are below the tolerance.
    for _ in 0..(parent_lookup::PARENT_FAIL_TOLERANCE - PROCESSING_FAILURES) {
        let id = rig.expect_parent_request();
        // The request fails. It should be tried again.
        bl.parent_lookup_failed(id, peer_id, &mut cx);
    }

    // Now fail processing a block in the parent request
    for _ in 0..PROCESSING_FAILURES {
        let id = rig.expect_parent_request();
        assert!(!bl.failed_chains.contains(&block_hash));
        // send the right parent but fail processing
        bl.parent_lookup_response(id, peer_id, Some(parent.clone()), D, &mut cx);
        bl.parent_block_processed(block_hash, BlockError::InvalidSignature.into(), &mut cx);
        // Send the stream termination for the request.
        bl.parent_lookup_response(id, peer_id, None, D, &mut cx);
        rig.expect_penalty();
    }

    // Processing failures dominated, so the chain must now be blacklisted.
    assert!(bl.failed_chains.contains(&block_hash));
    assert_eq!(bl.parent_queue.len(), 0);
}
#[test]