Improve error handling in network processing (#1654)
* Improve error handling in network processing * Cargo fmt * Cargo fmt * Improve error handling for prior genesis * Remove dep
This commit is contained in:
parent
113758a4f5
commit
bcb629564a
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -1612,6 +1612,7 @@ dependencies = [
|
||||
"slog-async",
|
||||
"slog-stdlog",
|
||||
"slog-term",
|
||||
"slot_clock",
|
||||
"smallvec 1.4.2",
|
||||
"snap",
|
||||
"tempdir",
|
||||
|
@ -3,13 +3,14 @@ use crate::beacon_processor::{
|
||||
};
|
||||
use crate::service::NetworkMessage;
|
||||
use crate::sync::{PeerSyncInfo, SyncMessage};
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||
use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes};
|
||||
use eth2_libp2p::rpc::*;
|
||||
use eth2_libp2p::{
|
||||
MessageId, NetworkGlobals, PeerAction, PeerId, PeerRequestId, Request, Response,
|
||||
};
|
||||
use itertools::process_results;
|
||||
use slog::{debug, error, o, trace, warn};
|
||||
use slot_clock::SlotClock;
|
||||
use std::cmp;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::mpsc;
|
||||
@ -158,7 +159,9 @@ impl<T: BeaconChainTypes> Processor<T> {
|
||||
);
|
||||
}
|
||||
|
||||
self.process_status(peer_id, status);
|
||||
if let Err(e) = self.process_status(peer_id, status) {
|
||||
error!(self.log, "Could not process status message"; "error" => format!("{:?}", e));
|
||||
}
|
||||
}
|
||||
|
||||
/// Process a `Status` response from a peer.
|
||||
@ -175,22 +178,29 @@ impl<T: BeaconChainTypes> Processor<T> {
|
||||
);
|
||||
|
||||
// Process the status message, without sending back another status.
|
||||
self.process_status(peer_id, status);
|
||||
if let Err(e) = self.process_status(peer_id, status) {
|
||||
error!(self.log, "Could not process status message"; "error" => format!("{:?}", e));
|
||||
}
|
||||
}
|
||||
|
||||
/// Process a `Status` message, requesting new blocks if appropriate.
|
||||
///
|
||||
/// Disconnects the peer if required.
|
||||
fn process_status(&mut self, peer_id: PeerId, status: StatusMessage) {
|
||||
fn process_status(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
status: StatusMessage,
|
||||
) -> Result<(), BeaconChainError> {
|
||||
let remote = PeerSyncInfo::from(status);
|
||||
let local = match PeerSyncInfo::from_chain(&self.chain) {
|
||||
Some(local) => local,
|
||||
None => {
|
||||
return error!(
|
||||
error!(
|
||||
self.log,
|
||||
"Failed to get peer sync info";
|
||||
"msg" => "likely due to head lock contention"
|
||||
)
|
||||
);
|
||||
return Err(BeaconChainError::CannotAttestToFutureState);
|
||||
}
|
||||
};
|
||||
|
||||
@ -209,7 +219,11 @@ impl<T: BeaconChainTypes> Processor<T> {
|
||||
self.network
|
||||
.goodbye_peer(peer_id, GoodbyeReason::IrrelevantNetwork);
|
||||
} else if remote.head_slot
|
||||
> self.chain.slot().unwrap_or_else(|_| Slot::from(0u64)) + FUTURE_SLOT_TOLERANCE
|
||||
> self
|
||||
.chain
|
||||
.slot()
|
||||
.unwrap_or_else(|_| self.chain.slot_clock.genesis_slot())
|
||||
+ FUTURE_SLOT_TOLERANCE
|
||||
{
|
||||
// Note: If the slot_clock cannot be read, this will not error. Other system
|
||||
// components will deal with an invalid slot clock error.
|
||||
@ -230,8 +244,7 @@ impl<T: BeaconChainTypes> Processor<T> {
|
||||
&& self
|
||||
.chain
|
||||
.root_at_slot(start_slot(remote.finalized_epoch))
|
||||
.map(|root_opt| root_opt != Some(remote.finalized_root))
|
||||
.unwrap_or_else(|_| false)
|
||||
.map(|root_opt| root_opt != Some(remote.finalized_root))?
|
||||
{
|
||||
// The remotes finalized epoch is less than or greater than ours, but the block root is
|
||||
// different to the one in our chain.
|
||||
@ -267,8 +280,7 @@ impl<T: BeaconChainTypes> Processor<T> {
|
||||
} else if self
|
||||
.chain
|
||||
.store
|
||||
.item_exists::<SignedBeaconBlock<T::EthSpec>>(&remote.head_root)
|
||||
.unwrap_or_else(|_| false)
|
||||
.item_exists::<SignedBeaconBlock<T::EthSpec>>(&remote.head_root)?
|
||||
{
|
||||
debug!(
|
||||
self.log, "Peer with known chain found";
|
||||
@ -293,6 +305,8 @@ impl<T: BeaconChainTypes> Processor<T> {
|
||||
);
|
||||
self.send_to_sync(SyncMessage::AddPeer(peer_id, remote));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Handle a `BlocksByRoot` request from the peer.
|
||||
@ -438,6 +452,11 @@ impl<T: BeaconChainTypes> Processor<T> {
|
||||
}
|
||||
}
|
||||
|
||||
let current_slot = self
|
||||
.chain
|
||||
.slot()
|
||||
.unwrap_or_else(|_| self.chain.slot_clock.genesis_slot());
|
||||
|
||||
if blocks_sent < (req.count as usize) {
|
||||
debug!(
|
||||
self.log,
|
||||
@ -445,7 +464,7 @@ impl<T: BeaconChainTypes> Processor<T> {
|
||||
"peer" => peer_id.to_string(),
|
||||
"msg" => "Failed to return all requested blocks",
|
||||
"start_slot" => req.start_slot,
|
||||
"current_slot" => self.chain.slot().unwrap_or_else(|_| Slot::from(0_u64)).as_u64(),
|
||||
"current_slot" => current_slot,
|
||||
"requested" => req.count,
|
||||
"returned" => blocks_sent);
|
||||
} else {
|
||||
@ -454,7 +473,7 @@ impl<T: BeaconChainTypes> Processor<T> {
|
||||
"Sending BlocksByRange Response";
|
||||
"peer" => peer_id.to_string(),
|
||||
"start_slot" => req.start_slot,
|
||||
"current_slot" => self.chain.slot().unwrap_or_else(|_| Slot::from(0_u64)).as_u64(),
|
||||
"current_slot" => current_slot,
|
||||
"requested" => req.count,
|
||||
"returned" => blocks_sent);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user