Simplify simple_sync code

This commit is contained in:
Paul Hauner 2019-06-06 00:32:09 -04:00
parent f52d66a136
commit af96dd08c8
No known key found for this signature in database
GPG Key ID: 303E4494BB28068C

View File

@ -1,8 +1,6 @@
use super::import_queue::ImportQueue; use super::import_queue::ImportQueue;
use crate::message_handler::NetworkContext; use crate::message_handler::NetworkContext;
use beacon_chain::{ use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome, InvalidBlock};
BeaconChain, BeaconChainError, BeaconChainTypes, BlockProcessingOutcome, InvalidBlock,
};
use eth2_libp2p::rpc::methods::*; use eth2_libp2p::rpc::methods::*;
use eth2_libp2p::rpc::{RPCRequest, RPCResponse, RequestId}; use eth2_libp2p::rpc::{RPCRequest, RPCResponse, RequestId};
use eth2_libp2p::PeerId; use eth2_libp2p::PeerId;
@ -33,51 +31,6 @@ pub struct PeerSyncInfo {
best_slot: Slot, best_slot: Slot,
} }
impl PeerSyncInfo {
/// Returns `true` if the has a different network ID to `other`.
fn has_different_network_id_to(&self, other: Self) -> bool {
self.network_id != other.network_id
}
/// Returns `true` if the peer has a higher finalized epoch than `other`.
fn has_higher_finalized_epoch_than(&self, other: Self) -> bool {
self.latest_finalized_epoch > other.latest_finalized_epoch
}
/// Returns `true` if the peer has a higher best slot than `other`.
fn has_higher_best_slot_than(&self, other: Self) -> bool {
self.best_slot > other.best_slot
}
}
/// The status of a peers view on the chain, relative to some other view of the chain (presumably
/// our view).
#[derive(PartialEq, Clone, Copy, Debug)]
pub enum PeerStatus {
/// The peer is on a completely different chain.
DifferentNetworkId,
/// The peer lists a finalized epoch for which we have a different root.
FinalizedEpochNotInChain,
/// The peer has a higher finalized epoch.
HigherFinalizedEpoch,
/// The peer has a higher best slot.
HigherBestSlot,
/// The peer has the same or lesser view of the chain. We have nothing to request of them.
NotInteresting,
}
impl PeerStatus {
pub fn should_handshake(self) -> bool {
match self {
PeerStatus::DifferentNetworkId => false,
PeerStatus::FinalizedEpochNotInChain => false,
PeerStatus::HigherFinalizedEpoch => true,
PeerStatus::HigherBestSlot => true,
PeerStatus::NotInteresting => true,
}
}
}
impl From<HelloMessage> for PeerSyncInfo { impl From<HelloMessage> for PeerSyncInfo {
fn from(hello: HelloMessage) -> PeerSyncInfo { fn from(hello: HelloMessage) -> PeerSyncInfo {
PeerSyncInfo { PeerSyncInfo {
@ -153,7 +106,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
/// ///
/// Sends a `Hello` message to the peer. /// Sends a `Hello` message to the peer.
pub fn on_connect(&self, peer_id: PeerId, network: &mut NetworkContext) { pub fn on_connect(&self, peer_id: PeerId, network: &mut NetworkContext) {
info!(self.log, "PeerConnect"; "peer" => format!("{:?}", peer_id)); info!(self.log, "PeerConnected"; "peer" => format!("{:?}", peer_id));
network.send_rpc_request(peer_id, RPCRequest::Hello(hello_message(&self.chain))); network.send_rpc_request(peer_id, RPCRequest::Hello(hello_message(&self.chain)));
} }
@ -193,51 +146,6 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
self.process_hello(peer_id, hello, network); self.process_hello(peer_id, hello, network);
} }
/// Returns a `PeerStatus` for some peer.
fn peer_status(&self, peer: PeerSyncInfo) -> PeerStatus {
let local = PeerSyncInfo::from(&self.chain);
if peer.has_different_network_id_to(local) {
return PeerStatus::DifferentNetworkId;
}
if local.has_higher_finalized_epoch_than(peer) {
let peer_finalized_slot = peer
.latest_finalized_epoch
.start_slot(T::EthSpec::spec().slots_per_epoch);
let local_roots = self.chain.get_block_roots(peer_finalized_slot, 1, 0);
if let Ok(local_roots) = local_roots {
if let Some(local_root) = local_roots.get(0) {
if *local_root != peer.latest_finalized_root {
return PeerStatus::FinalizedEpochNotInChain;
}
} else {
error!(
self.log,
"Cannot get root for peer finalized slot.";
"error" => "empty roots"
);
}
} else {
error!(
self.log,
"Cannot get root for peer finalized slot.";
"error" => format!("{:?}", local_roots)
);
}
}
if peer.has_higher_finalized_epoch_than(local) {
PeerStatus::HigherFinalizedEpoch
} else if peer.has_higher_best_slot_than(local) {
PeerStatus::HigherBestSlot
} else {
PeerStatus::NotInteresting
}
}
/// Process a `Hello` message, requesting new blocks if appropriate. /// Process a `Hello` message, requesting new blocks if appropriate.
/// ///
/// Disconnects the peer if required. /// Disconnects the peer if required.
@ -251,52 +159,68 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
let remote = PeerSyncInfo::from(hello); let remote = PeerSyncInfo::from(hello);
let local = PeerSyncInfo::from(&self.chain); let local = PeerSyncInfo::from(&self.chain);
let remote_status = self.peer_status(remote);
if remote_status.should_handshake() { let network_id_mismatch = local.network_id != remote.network_id;
info!(self.log, "HandshakeSuccess"; "peer" => format!("{:?}", peer_id)); let on_different_finalized_chain = (local.latest_finalized_epoch
self.known_peers.insert(peer_id.clone(), remote); >= remote.latest_finalized_epoch)
} else { && (!self
.chain
.rev_iter_block_roots(local.best_slot)
.any(|root| root == remote.latest_finalized_root));
if network_id_mismatch || on_different_finalized_chain {
info!( info!(
self.log, "HandshakeFailure"; self.log, "HandshakeFailure";
"peer" => format!("{:?}", peer_id), "peer" => format!("{:?}", peer_id),
"reason" => "network_id" "reason" => "network_id"
); );
network.disconnect(peer_id.clone(), GoodbyeReason::IrreleventNetwork); network.disconnect(peer_id.clone(), GoodbyeReason::IrreleventNetwork);
return;
} else {
info!(self.log, "HandshakeSuccess"; "peer" => format!("{:?}", peer_id));
self.known_peers.insert(peer_id.clone(), remote);
} }
// If required, send additional requests. // If we have equal or better finalized epochs and best slots, we require nothing else from
match remote_status { // this peer.
PeerStatus::HigherFinalizedEpoch => { if (remote.latest_finalized_epoch <= local.latest_finalized_epoch)
let start_slot = remote && (remote.best_slot <= local.best_slot)
.latest_finalized_epoch {
.start_slot(spec.slots_per_epoch); return;
let required_slots = start_slot - local.best_slot; }
self.request_block_roots( // If the remote has a higher finalized epoch, request all block roots from our finalized
peer_id, // epoch through to its best slot.
BeaconBlockRootsRequest { if remote.latest_finalized_epoch > local.latest_finalized_epoch {
start_slot, let start_slot = local
count: required_slots.into(), .latest_finalized_epoch
}, .start_slot(spec.slots_per_epoch);
network, let required_slots = start_slot - remote.best_slot;
);
}
PeerStatus::HigherBestSlot => {
let required_slots = remote.best_slot - local.best_slot;
self.request_block_roots( self.request_block_roots(
peer_id, peer_id,
BeaconBlockRootsRequest { BeaconBlockRootsRequest {
start_slot: local.best_slot + 1, start_slot,
count: required_slots.into(), count: required_slots.into(),
}, },
network, network,
); );
} // If the remote has a greater best slot, request the roots between our best slot and their
PeerStatus::FinalizedEpochNotInChain => {} // best slot.
PeerStatus::DifferentNetworkId => {} } else if remote.best_root > local.best_root {
PeerStatus::NotInteresting => {} let start_slot = local
.latest_finalized_epoch
.start_slot(spec.slots_per_epoch);
let required_slots = start_slot - remote.best_slot;
self.request_block_roots(
peer_id,
BeaconBlockRootsRequest {
start_slot,
count: required_slots.into(),
},
network,
);
} }
} }
@ -315,27 +239,12 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
"count" => req.count, "count" => req.count,
); );
let roots = match self let roots = self
.chain .chain
.get_block_roots(req.start_slot, req.count as usize, 0) .rev_iter_block_roots(req.start_slot)
{ .take(req.count as usize)
Ok(roots) => roots,
Err(e) => {
// TODO: return RPC error.
warn!(
self.log,
"RPCRequest"; "peer" => format!("{:?}", peer_id),
"req" => "BeaconBlockRoots",
"error" => format!("{:?}", e)
);
return;
}
};
let roots = roots
.iter()
.enumerate() .enumerate()
.map(|(i, &block_root)| BlockRootSlot { .map(|(i, block_root)| BlockRootSlot {
slot: req.start_slot + Slot::from(i), slot: req.start_slot + Slot::from(i),
block_root, block_root,
}) })
@ -426,24 +335,12 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
"count" => req.max_headers, "count" => req.max_headers,
); );
let headers = match get_block_headers( let headers = get_block_headers(
&self.chain, &self.chain,
req.start_slot, req.start_slot,
req.max_headers as usize, req.max_headers as usize,
req.skip_slots as usize, req.skip_slots as usize,
) { );
Ok(headers) => headers,
Err(e) => {
// TODO: return RPC error.
warn!(
self.log,
"RPCRequest"; "peer" => format!("{:?}", peer_id),
"req" => "BeaconBlockHeaders",
"error" => format!("{:?}", e)
);
return;
}
};
network.send_rpc_response( network.send_rpc_response(
peer_id, peer_id,
@ -554,14 +451,15 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
) -> bool { ) -> bool {
info!( info!(
self.log, self.log,
"NewGossipBlock"; "GossipBlockReceived";
"peer" => format!("{:?}", peer_id), "peer" => format!("{:?}", peer_id),
"block_slot" => format!("{:?}", block.slot),
); );
// Ignore any block from a finalized slot. // Ignore any block from a finalized slot.
if self.slot_is_finalized(block.slot) { if self.slot_is_finalized(block.slot) {
warn!( warn!(
self.log, "NewGossipBlock"; self.log, "IgnoredGossipBlock";
"msg" => "new block slot is finalized.", "msg" => "new block slot is finalized.",
"block_slot" => block.slot, "block_slot" => block.slot,
); );
@ -572,22 +470,15 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
// Ignore any block that the chain already knows about. // Ignore any block that the chain already knows about.
if self.chain_has_seen_block(&block_root) { if self.chain_has_seen_block(&block_root) {
println!("this happened");
// TODO: Age confirm that we shouldn't forward a block if we already know of it. // TODO: Age confirm that we shouldn't forward a block if we already know of it.
return false; return false;
} }
debug!(
self.log,
"NewGossipBlock";
"peer" => format!("{:?}", peer_id),
"msg" => "processing block",
);
match self.chain.process_block(block.clone()) { match self.chain.process_block(block.clone()) {
Ok(BlockProcessingOutcome::InvalidBlock(InvalidBlock::ParentUnknown)) => { Ok(BlockProcessingOutcome::InvalidBlock(InvalidBlock::ParentUnknown)) => {
// The block was valid and we processed it successfully. // The block was valid and we processed it successfully.
debug!( debug!(
self.log, "NewGossipBlock"; self.log, "InvalidGossipBlock";
"msg" => "parent block unknown", "msg" => "parent block unknown",
"parent_root" => format!("{}", block.previous_block_root), "parent_root" => format!("{}", block.previous_block_root),
"peer" => format!("{:?}", peer_id), "peer" => format!("{:?}", peer_id),
@ -614,7 +505,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
if block_slot - present_slot > FUTURE_SLOT_TOLERANCE { if block_slot - present_slot > FUTURE_SLOT_TOLERANCE {
// The block is too far in the future, drop it. // The block is too far in the future, drop it.
warn!( warn!(
self.log, "NewGossipBlock"; self.log, "InvalidGossipBlock";
"msg" => "future block rejected", "msg" => "future block rejected",
"present_slot" => present_slot, "present_slot" => present_slot,
"block_slot" => block_slot, "block_slot" => block_slot,
@ -626,7 +517,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
} else { } else {
// The block is in the future, but not too far. // The block is in the future, but not too far.
warn!( warn!(
self.log, "NewGossipBlock"; self.log, "FutureGossipBlock";
"msg" => "queuing future block", "msg" => "queuing future block",
"present_slot" => present_slot, "present_slot" => present_slot,
"block_slot" => block_slot, "block_slot" => block_slot,
@ -643,8 +534,8 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
if outcome.is_invalid() { if outcome.is_invalid() {
// The peer has sent a block which is fundamentally invalid. // The peer has sent a block which is fundamentally invalid.
warn!( warn!(
self.log, "NewGossipBlock"; self.log, "InvalidGossipBlock";
"msg" => "invalid block from peer", "msg" => "peer sent objectively invalid block",
"outcome" => format!("{:?}", outcome), "outcome" => format!("{:?}", outcome),
"peer" => format!("{:?}", peer_id), "peer" => format!("{:?}", peer_id),
); );
@ -655,8 +546,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
} else if outcome.sucessfully_processed() { } else if outcome.sucessfully_processed() {
// The block was valid and we processed it successfully. // The block was valid and we processed it successfully.
info!( info!(
self.log, "NewGossipBlock"; self.log, "ValidGossipBlock";
"msg" => "block import successful",
"peer" => format!("{:?}", peer_id), "peer" => format!("{:?}", peer_id),
); );
// Forward the block to peers // Forward the block to peers
@ -665,7 +555,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
// The block wasn't necessarily invalid but we didn't process it successfully. // The block wasn't necessarily invalid but we didn't process it successfully.
// This condition shouldn't be reached. // This condition shouldn't be reached.
error!( error!(
self.log, "NewGossipBlock"; self.log, "InvalidGossipBlock";
"msg" => "unexpected condition in processing block.", "msg" => "unexpected condition in processing block.",
"outcome" => format!("{:?}", outcome), "outcome" => format!("{:?}", outcome),
); );
@ -679,7 +569,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
// Blocks should not be able to trigger errors, instead they should be flagged as // Blocks should not be able to trigger errors, instead they should be flagged as
// invalid. // invalid.
error!( error!(
self.log, "NewGossipBlock"; self.log, "InvalidGossipBlock";
"msg" => "internal error in processing block.", "msg" => "internal error in processing block.",
"error" => format!("{:?}", e), "error" => format!("{:?}", e),
); );
@ -870,7 +760,11 @@ fn get_block_headers<T: BeaconChainTypes>(
start_slot: Slot, start_slot: Slot,
count: usize, count: usize,
skip: usize, skip: usize,
) -> Result<Vec<BeaconBlockHeader>, BeaconChainError> { ) -> Vec<BeaconBlockHeader> {
let roots = beacon_chain.get_block_roots(start_slot, count, skip)?; beacon_chain
beacon_chain.get_block_headers(&roots) .rev_iter_blocks(start_slot)
.step_by(skip + 1)
.take(count)
.map(|block| block.block_header())
.collect()
} }