Improve error handling in network processing (#1654)

* Improve error handling in network processing

* Cargo fmt

* Cargo fmt

* Improve error handling for prior genesis

* Remove dep
This commit is contained in:
Age Manning 2020-10-05 17:30:43 +11:00 committed by GitHub
parent b1c121b880
commit fe07a3c21c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 33 additions and 13 deletions

1
Cargo.lock generated
View File

@ -1562,6 +1562,7 @@ dependencies = [
"slog-async",
"slog-stdlog",
"slog-term",
"slot_clock",
"smallvec 1.4.2",
"snap",
"tempdir",

View File

@ -3,13 +3,14 @@ use crate::beacon_processor::{
};
use crate::service::NetworkMessage;
use crate::sync::{PeerSyncInfo, SyncMessage};
use beacon_chain::{BeaconChain, BeaconChainTypes};
use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes};
use eth2_libp2p::rpc::*;
use eth2_libp2p::{
MessageId, NetworkGlobals, PeerAction, PeerId, PeerRequestId, Request, Response,
};
use itertools::process_results;
use slog::{debug, error, o, trace, warn};
use slot_clock::SlotClock;
use std::cmp;
use std::sync::Arc;
use tokio::sync::mpsc;
@ -158,7 +159,9 @@ impl<T: BeaconChainTypes> Processor<T> {
);
}
self.process_status(peer_id, status);
if let Err(e) = self.process_status(peer_id, status) {
error!(self.log, "Could not process status message"; "error" => format!("{:?}", e));
}
}
/// Process a `Status` response from a peer.
@ -175,22 +178,29 @@ impl<T: BeaconChainTypes> Processor<T> {
);
// Process the status message, without sending back another status.
self.process_status(peer_id, status);
if let Err(e) = self.process_status(peer_id, status) {
error!(self.log, "Could not process status message"; "error" => format!("{:?}", e));
}
}
/// Process a `Status` message, requesting new blocks if appropriate.
///
/// Disconnects the peer if required.
fn process_status(&mut self, peer_id: PeerId, status: StatusMessage) {
fn process_status(
&mut self,
peer_id: PeerId,
status: StatusMessage,
) -> Result<(), BeaconChainError> {
let remote = PeerSyncInfo::from(status);
let local = match PeerSyncInfo::from_chain(&self.chain) {
Some(local) => local,
None => {
return error!(
error!(
self.log,
"Failed to get peer sync info";
"msg" => "likely due to head lock contention"
)
);
return Err(BeaconChainError::CannotAttestToFutureState);
}
};
@ -209,7 +219,11 @@ impl<T: BeaconChainTypes> Processor<T> {
self.network
.goodbye_peer(peer_id, GoodbyeReason::IrrelevantNetwork);
} else if remote.head_slot
> self.chain.slot().unwrap_or_else(|_| Slot::from(0u64)) + FUTURE_SLOT_TOLERANCE
> self
.chain
.slot()
.unwrap_or_else(|_| self.chain.slot_clock.genesis_slot())
+ FUTURE_SLOT_TOLERANCE
{
// Note: If the slot_clock cannot be read, this will not error. Other system
// components will deal with an invalid slot clock error.
@ -230,8 +244,7 @@ impl<T: BeaconChainTypes> Processor<T> {
&& self
.chain
.root_at_slot(start_slot(remote.finalized_epoch))
.map(|root_opt| root_opt != Some(remote.finalized_root))
.unwrap_or_else(|_| false)
.map(|root_opt| root_opt != Some(remote.finalized_root))?
{
// The remotes finalized epoch is less than or greater than ours, but the block root is
// different to the one in our chain.
@ -267,8 +280,7 @@ impl<T: BeaconChainTypes> Processor<T> {
} else if self
.chain
.store
.item_exists::<SignedBeaconBlock<T::EthSpec>>(&remote.head_root)
.unwrap_or_else(|_| false)
.item_exists::<SignedBeaconBlock<T::EthSpec>>(&remote.head_root)?
{
debug!(
self.log, "Peer with known chain found";
@ -293,6 +305,8 @@ impl<T: BeaconChainTypes> Processor<T> {
);
self.send_to_sync(SyncMessage::AddPeer(peer_id, remote));
}
Ok(())
}
/// Handle a `BlocksByRoot` request from the peer.
@ -438,6 +452,11 @@ impl<T: BeaconChainTypes> Processor<T> {
}
}
let current_slot = self
.chain
.slot()
.unwrap_or_else(|_| self.chain.slot_clock.genesis_slot());
if blocks_sent < (req.count as usize) {
debug!(
self.log,
@ -445,7 +464,7 @@ impl<T: BeaconChainTypes> Processor<T> {
"peer" => peer_id.to_string(),
"msg" => "Failed to return all requested blocks",
"start_slot" => req.start_slot,
"current_slot" => self.chain.slot().unwrap_or_else(|_| Slot::from(0_u64)).as_u64(),
"current_slot" => current_slot,
"requested" => req.count,
"returned" => blocks_sent);
} else {
@ -454,7 +473,7 @@ impl<T: BeaconChainTypes> Processor<T> {
"Sending BlocksByRange Response";
"peer" => peer_id.to_string(),
"start_slot" => req.start_slot,
"current_slot" => self.chain.slot().unwrap_or_else(|_| Slot::from(0_u64)).as_u64(),
"current_slot" => current_slot,
"requested" => req.count,
"returned" => blocks_sent);
}