Fix recently introduced sync bugs

This commit is contained in:
Paul Hauner 2019-06-07 02:55:16 -04:00
parent 4cecf05198
commit 719dd72de6
No known key found for this signature in database
GPG Key ID: 303E4494BB28068C
5 changed files with 179 additions and 130 deletions

View File

@ -9,6 +9,7 @@ sloggers = "0.3.2"
[dependencies] [dependencies]
beacon_chain = { path = "../beacon_chain" } beacon_chain = { path = "../beacon_chain" }
store = { path = "../store" }
eth2-libp2p = { path = "../eth2-libp2p" } eth2-libp2p = { path = "../eth2-libp2p" }
version = { path = "../version" } version = { path = "../version" }
types = { path = "../../eth2/types" } types = { path = "../../eth2/types" }

View File

@ -166,7 +166,7 @@ impl<T: BeaconChainTypes> ImportQueue<T> {
let mut required_bodies: Vec<Hash256> = vec![]; let mut required_bodies: Vec<Hash256> = vec![];
for header in headers { for header in headers {
let block_root = Hash256::from_slice(&header.tree_hash_root()[..]); let block_root = Hash256::from_slice(&header.canonical_root()[..]);
if self.chain_has_not_seen_block(&block_root) { if self.chain_has_not_seen_block(&block_root) {
self.insert_header(block_root, header, sender.clone()); self.insert_header(block_root, header, sender.clone());
@ -250,7 +250,7 @@ impl<T: BeaconChainTypes> ImportQueue<T> {
/// ///
/// If the partial already existed, the `inserted` time is set to `now`. /// If the partial already existed, the `inserted` time is set to `now`.
fn insert_full_block(&mut self, block: BeaconBlock, sender: PeerId) { fn insert_full_block(&mut self, block: BeaconBlock, sender: PeerId) {
let block_root = Hash256::from_slice(&block.tree_hash_root()[..]); let block_root = Hash256::from_slice(&block.canonical_root()[..]);
let partial = PartialBeaconBlock { let partial = PartialBeaconBlock {
slot: block.slot, slot: block.slot,

View File

@ -8,8 +8,11 @@ use slog::{debug, error, info, o, warn};
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use store::Store;
use tree_hash::TreeHash; use tree_hash::TreeHash;
use types::{Attestation, BeaconBlock, BeaconBlockHeader, Epoch, EthSpec, Hash256, Slot}; use types::{
Attestation, BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Epoch, EthSpec, Hash256, Slot,
};
/// The number of slots that we can import blocks ahead of us, before going into full Sync mode. /// The number of slots that we can import blocks ahead of us, before going into full Sync mode.
const SLOT_IMPORT_TOLERANCE: u64 = 100; const SLOT_IMPORT_TOLERANCE: u64 = 100;
@ -160,42 +163,59 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
let remote = PeerSyncInfo::from(hello); let remote = PeerSyncInfo::from(hello);
let local = PeerSyncInfo::from(&self.chain); let local = PeerSyncInfo::from(&self.chain);
let network_id_mismatch = local.network_id != remote.network_id; // Disconnect nodes who are on a different network.
let on_different_finalized_chain = (local.latest_finalized_epoch if local.network_id != remote.network_id {
>= remote.latest_finalized_epoch)
&& (!self
.chain
.rev_iter_block_roots(local.best_slot)
.any(|root| root == remote.latest_finalized_root));
if network_id_mismatch || on_different_finalized_chain {
info!( info!(
self.log, "HandshakeFailure"; self.log, "HandshakeFailure";
"peer" => format!("{:?}", peer_id), "peer" => format!("{:?}", peer_id),
"reason" => "network_id" "reason" => "network_id"
); );
network.disconnect(peer_id.clone(), GoodbyeReason::IrreleventNetwork); network.disconnect(peer_id.clone(), GoodbyeReason::IrreleventNetwork);
return; // Disconnect nodes if our finalized epoch is greater than theirs, and their finalized
// epoch is not in our chain. Viz., they are on another chain.
//
// If the local or remote have a `latest_finalized_root == ZERO_HASH`, skips checks about
// the finalized_root. The logic is awkward and I think we're better without it.
} else if (local.latest_finalized_epoch >= remote.latest_finalized_epoch)
&& (!self
.chain
.rev_iter_block_roots(local.best_slot)
.any(|root| root == remote.latest_finalized_root))
&& (local.latest_finalized_root != spec.zero_hash)
&& (remote.latest_finalized_root != spec.zero_hash)
{
info!(
self.log, "HandshakeFailure";
"peer" => format!("{:?}", peer_id),
"reason" => "wrong_finalized_chain"
);
network.disconnect(peer_id.clone(), GoodbyeReason::IrreleventNetwork);
// Process handshakes from peers that seem to be on our chain.
} else { } else {
info!(self.log, "HandshakeSuccess"; "peer" => format!("{:?}", peer_id)); info!(self.log, "HandshakeSuccess"; "peer" => format!("{:?}", peer_id));
self.known_peers.insert(peer_id.clone(), remote); self.known_peers.insert(peer_id.clone(), remote);
}
// If we have equal or better finalized epochs and best slots, we require nothing else from // If we have equal or better finalized epochs and best slots, we require nothing else from
// this peer. // this peer.
//
// We make an exception when our best slot is 0. Best slot does not indicate whether or
// not there is a block at slot zero.
if (remote.latest_finalized_epoch <= local.latest_finalized_epoch) if (remote.latest_finalized_epoch <= local.latest_finalized_epoch)
&& (remote.best_slot <= local.best_slot) && (remote.best_slot <= local.best_slot)
&& (local.best_slot > 0)
{ {
debug!(self.log, "Peer is naive"; "peer" => format!("{:?}", peer_id));
return; return;
} }
// If the remote has a higher finalized epoch, request all block roots from our finalized // If the remote has a higher finalized epoch, request all block roots from our finalized
// epoch through to its best slot. // epoch through to its best slot.
if remote.latest_finalized_epoch > local.latest_finalized_epoch { if remote.latest_finalized_epoch > local.latest_finalized_epoch {
debug!(self.log, "Peer has high finalized epoch"; "peer" => format!("{:?}", peer_id));
let start_slot = local let start_slot = local
.latest_finalized_epoch .latest_finalized_epoch
.start_slot(spec.slots_per_epoch); .start_slot(spec.slots_per_epoch);
let required_slots = start_slot - remote.best_slot; let required_slots = remote.best_slot - start_slot;
self.request_block_roots( self.request_block_roots(
peer_id, peer_id,
@ -207,11 +227,12 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
); );
// If the remote has a greater best slot, request the roots between our best slot and their // If the remote has a greater best slot, request the roots between our best slot and their
// best slot. // best slot.
} else if remote.best_root > local.best_root { } else if remote.best_slot > local.best_slot {
debug!(self.log, "Peer has higher best slot"; "peer" => format!("{:?}", peer_id));
let start_slot = local let start_slot = local
.latest_finalized_epoch .latest_finalized_epoch
.start_slot(spec.slots_per_epoch); .start_slot(spec.slots_per_epoch);
let required_slots = start_slot - remote.best_slot; let required_slots = remote.best_slot - start_slot;
self.request_block_roots( self.request_block_roots(
peer_id, peer_id,
@ -221,6 +242,9 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
}, },
network, network,
); );
} else {
debug!(self.log, "Nothing to request from peer"; "peer" => format!("{:?}", peer_id));
}
} }
} }
@ -237,19 +261,40 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
"BlockRootsRequest"; "BlockRootsRequest";
"peer" => format!("{:?}", peer_id), "peer" => format!("{:?}", peer_id),
"count" => req.count, "count" => req.count,
"start_slot" => req.start_slot,
); );
let roots = self let mut roots: Vec<Hash256> = self
.chain .chain
.rev_iter_block_roots(req.start_slot) .rev_iter_block_roots(req.start_slot + req.count)
.skip(1)
.take(req.count as usize) .take(req.count as usize)
.collect();
if roots.len() as u64 != req.count {
debug!(
self.log,
"BlockRootsRequest";
"peer" => format!("{:?}", peer_id),
"msg" => "Failed to return all requested hashes",
"requested" => req.count,
"returned" => roots.len(),
);
}
roots.reverse();
let mut roots: Vec<BlockRootSlot> = roots
.iter()
.enumerate() .enumerate()
.map(|(i, block_root)| BlockRootSlot { .map(|(i, block_root)| BlockRootSlot {
slot: req.start_slot + Slot::from(i), slot: req.start_slot + Slot::from(i),
block_root, block_root: *block_root,
}) })
.collect(); .collect();
roots.dedup_by_key(|brs| brs.block_root);
network.send_rpc_response( network.send_rpc_response(
peer_id, peer_id,
request_id, request_id,
@ -335,12 +380,28 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
"count" => req.max_headers, "count" => req.max_headers,
); );
let headers = get_block_headers( let count = req.max_headers;
&self.chain,
req.start_slot, // Collect the block roots.
req.max_headers as usize, //
req.skip_slots as usize, // Instead of using `chain.rev_iter_blocks` we collect the roots first. This avoids
); // unnecessary block deserialization when `req.skip_slots > 0`.
let mut roots: Vec<Hash256> = self
.chain
.rev_iter_block_roots(req.start_slot + (count - 1))
.take(count as usize)
.collect();
roots.reverse();
let headers: Vec<BeaconBlockHeader> = roots
.into_iter()
.step_by(req.skip_slots as usize + 1)
.filter_map(|root| {
let block = self.chain.store.get::<BeaconBlock>(&root).ok()?;
Some(block?.block_header())
})
.collect();
network.send_rpc_response( network.send_rpc_response(
peer_id, peer_id,
@ -388,27 +449,33 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
req: BeaconBlockBodiesRequest, req: BeaconBlockBodiesRequest,
network: &mut NetworkContext, network: &mut NetworkContext,
) { ) {
let block_bodies: Vec<BeaconBlockBody> = req
.block_roots
.iter()
.filter_map(|root| {
if let Ok(Some(block)) = self.chain.store.get::<BeaconBlock>(root) {
Some(block.body)
} else {
debug!(
self.log,
"Peer requested unknown block";
"peer" => format!("{:?}", peer_id),
"request_root" => format!("{:}", root),
);
None
}
})
.collect();
debug!( debug!(
self.log, self.log,
"BlockBodiesRequest"; "BlockBodiesRequest";
"peer" => format!("{:?}", peer_id), "peer" => format!("{:?}", peer_id),
"count" => req.block_roots.len(), "requested" => req.block_roots.len(),
"returned" => block_bodies.len(),
); );
let block_bodies = match self.chain.get_block_bodies(&req.block_roots) {
Ok(bodies) => bodies,
Err(e) => {
// TODO: return RPC error.
warn!(
self.log,
"RPCRequest"; "peer" => format!("{:?}", peer_id),
"req" => "BeaconBlockBodies",
"error" => format!("{:?}", e)
);
return;
}
};
network.send_rpc_response( network.send_rpc_response(
peer_id, peer_id,
request_id, request_id,
@ -449,18 +516,12 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
block: BeaconBlock, block: BeaconBlock,
network: &mut NetworkContext, network: &mut NetworkContext,
) -> bool { ) -> bool {
info!(
self.log,
"GossipBlockReceived";
"peer" => format!("{:?}", peer_id),
"block_slot" => format!("{:?}", block.slot),
);
// Ignore any block from a finalized slot. // Ignore any block from a finalized slot.
if self.slot_is_finalized(block.slot) { if self.slot_is_finalized(block.slot) {
warn!( debug!(
self.log, "IgnoredGossipBlock"; self.log, "IgnoredFinalizedBlock";
"msg" => "new block slot is finalized.", "source" => "gossip",
"msg" => "chain is finalized at block slot",
"block_slot" => block.slot, "block_slot" => block.slot,
); );
return false; return false;
@ -475,11 +536,11 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
} }
match self.chain.process_block(block.clone()) { match self.chain.process_block(block.clone()) {
Ok(BlockProcessingOutcome::InvalidBlock(InvalidBlock::ParentUnknown)) => { Ok(BlockProcessingOutcome::InvalidBlock(InvalidBlock::ParentUnknown { .. })) => {
// The block could not be processed because its parent block is unknown. // The block could not be processed because its parent block is unknown.
debug!( debug!(
self.log, "InvalidGossipBlock"; self.log, "ParentBlockUnknown";
"msg" => "parent block unknown", "source" => "gossip",
"parent_root" => format!("{}", block.previous_block_root), "parent_root" => format!("{}", block.previous_block_root),
"peer" => format!("{:?}", peer_id), "peer" => format!("{:?}", peer_id),
); );
@ -505,8 +566,9 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
if block_slot - present_slot > FUTURE_SLOT_TOLERANCE { if block_slot - present_slot > FUTURE_SLOT_TOLERANCE {
// The block is too far in the future, drop it. // The block is too far in the future, drop it.
warn!( warn!(
self.log, "InvalidGossipBlock"; self.log, "FutureBlock";
"msg" => "future block rejected", "source" => "gossip",
"msg" => "block for future slot rejected, check your time",
"present_slot" => present_slot, "present_slot" => present_slot,
"block_slot" => block_slot, "block_slot" => block_slot,
"FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE,
@ -517,8 +579,9 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
} else { } else {
// The block is in the future, but not too far. // The block is in the future, but not too far.
warn!( warn!(
self.log, "FutureGossipBlock"; self.log, "QueuedFutureBlock";
"msg" => "queuing future block", "source" => "gossip",
"msg" => "queuing future block, check your time",
"present_slot" => present_slot, "present_slot" => present_slot,
"block_slot" => block_slot, "block_slot" => block_slot,
"FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE,
@ -534,7 +597,8 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
if outcome.is_invalid() { if outcome.is_invalid() {
// The peer has sent a block which is fundamentally invalid. // The peer has sent a block which is fundamentally invalid.
warn!( warn!(
self.log, "InvalidGossipBlock"; self.log, "InvalidBlock";
"source" => "gossip",
"msg" => "peer sent objectively invalid block", "msg" => "peer sent objectively invalid block",
"outcome" => format!("{:?}", outcome), "outcome" => format!("{:?}", outcome),
"peer" => format!("{:?}", peer_id), "peer" => format!("{:?}", peer_id),
@ -546,7 +610,8 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
} else if outcome.sucessfully_processed() { } else if outcome.sucessfully_processed() {
// The block was valid and we processed it successfully. // The block was valid and we processed it successfully.
info!( info!(
self.log, "ValidGossipBlock"; self.log, "ImportedBlock";
"source" => "gossip",
"peer" => format!("{:?}", peer_id), "peer" => format!("{:?}", peer_id),
); );
// Forward the block to peers // Forward the block to peers
@ -555,7 +620,8 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
// The block wasn't necessarily invalid but we didn't process it successfully. // The block wasn't necessarily invalid but we didn't process it successfully.
// This condition shouldn't be reached. // This condition shouldn't be reached.
error!( error!(
self.log, "InvalidGossipBlock"; self.log, "BlockProcessingFailure";
"source" => "gossip",
"msg" => "unexpected condition in processing block.", "msg" => "unexpected condition in processing block.",
"outcome" => format!("{:?}", outcome), "outcome" => format!("{:?}", outcome),
); );
@ -569,8 +635,9 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
// Blocks should not be able to trigger errors, instead they should be flagged as // Blocks should not be able to trigger errors, instead they should be flagged as
// invalid. // invalid.
error!( error!(
self.log, "InvalidGossipBlock"; self.log, "BlockProcessingError";
"msg" => "internal error in processing block.", "msg" => "internal error in processing block.",
"source" => "gossip",
"error" => format!("{:?}", e), "error" => format!("{:?}", e),
); );
// Do not forward the block to peers. // Do not forward the block to peers.
@ -584,19 +651,15 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
/// Not currently implemented. /// Not currently implemented.
pub fn on_attestation_gossip( pub fn on_attestation_gossip(
&mut self, &mut self,
peer_id: PeerId, _peer_id: PeerId,
msg: Attestation, msg: Attestation,
_network: &mut NetworkContext, _network: &mut NetworkContext,
) { ) {
info!(
self.log,
"NewAttestationGossip";
"peer" => format!("{:?}", peer_id),
);
match self.chain.process_attestation(msg) { match self.chain.process_attestation(msg) {
Ok(()) => info!(self.log, "ImportedAttestation"), Ok(()) => info!(self.log, "ImportedAttestation"; "source" => "gossip"),
Err(e) => warn!(self.log, "InvalidAttestation"; "error" => format!("{:?}", e)), Err(e) => {
warn!(self.log, "InvalidAttestation"; "source" => "gossip", "error" => format!("{:?}", e))
}
} }
} }
@ -611,6 +674,9 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
// Loop through all of the complete blocks in the queue. // Loop through all of the complete blocks in the queue.
for (block_root, block, sender) in self.import_queue.complete_blocks() { for (block_root, block, sender) in self.import_queue.complete_blocks() {
let slot = block.slot;
let parent_root = block.previous_block_root;
match self.chain.process_block(block) { match self.chain.process_block(block) {
Ok(outcome) => { Ok(outcome) => {
if outcome.is_invalid() { if outcome.is_invalid() {
@ -618,15 +684,12 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
warn!( warn!(
self.log, self.log,
"InvalidBlock"; "InvalidBlock";
"sender_peer_id" => format!("{:?}", sender), "sender_peer_id" => format!("{:?}", sender.clone()),
"block_root" => format!("{}", block_root),
"reason" => format!("{:?}", outcome), "reason" => format!("{:?}", outcome),
); );
network.disconnect(sender, GoodbyeReason::Fault); network.disconnect(sender, GoodbyeReason::Fault);
break; } else if outcome.sucessfully_processed() {
}
// If this results to true, the item will be removed from the queue.
if outcome.sucessfully_processed() {
successful += 1; successful += 1;
self.import_queue.remove(block_root); self.import_queue.remove(block_root);
} else { } else {
@ -635,6 +698,8 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
"ProcessImportQueue"; "ProcessImportQueue";
"msg" => "Block not imported", "msg" => "Block not imported",
"outcome" => format!("{:?}", outcome), "outcome" => format!("{:?}", outcome),
"block_slot" => format!("{:?}", slot),
"parent_root" => format!("{}", parent_root),
"peer" => format!("{:?}", sender), "peer" => format!("{:?}", sender),
); );
} }
@ -752,19 +817,3 @@ fn hello_message<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>) -> HelloMes
best_slot: state.slot, best_slot: state.slot,
} }
} }
/// Collect up to `count` `BeaconBlockHeader`s from `beacon_chain`.
///
/// Blocks are taken from the iterator returned by `rev_iter_blocks(start_slot)`; only every
/// `(skip + 1)`-th block is kept, and the header of each kept block is returned in the order
/// the iterator yields them.
///
/// NOTE(review): `rev_iter_blocks` presumably walks backwards from `start_slot`, in which case
/// the headers are in descending slot order -- confirm against the iterator's definition.
fn get_block_headers<T: BeaconChainTypes>(
    beacon_chain: &BeaconChain<T>,
    start_slot: Slot,
    count: usize,
    skip: usize,
) -> Vec<BeaconBlockHeader> {
    // A gap of `skip` blocks between results means stepping by `skip + 1`.
    let stride = skip + 1;

    let mut headers = Vec::new();
    for block in beacon_chain
        .rev_iter_blocks(start_slot)
        .step_by(stride)
        .take(count)
    {
        headers.push(block.block_header());
    }
    headers
}

View File

@ -1,5 +1,4 @@
use crate::*; use crate::*;
use tree_hash::SignedRoot;
use types::*; use types::*;
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
@ -44,7 +43,7 @@ fn cache_state<T: EthSpec>(state: &mut BeaconState<T>, spec: &ChainSpec) -> Resu
// Store the previous slot's post state transition root. // Store the previous slot's post state transition root.
state.set_state_root(previous_slot, previous_slot_state_root)?; state.set_state_root(previous_slot, previous_slot_state_root)?;
let latest_block_root = Hash256::from_slice(&state.latest_block_header.signed_root()[..]); let latest_block_root = state.latest_block_header.canonical_root();
state.set_block_root(previous_slot, latest_block_root)?; state.set_block_root(previous_slot, latest_block_root)?;
// Set the state slot back to what it should be. // Set the state slot back to what it should be.

View File

@ -111,7 +111,7 @@ where
pub current_crosslinks: FixedLenVec<Crosslink, T::ShardCount>, pub current_crosslinks: FixedLenVec<Crosslink, T::ShardCount>,
pub previous_crosslinks: FixedLenVec<Crosslink, T::ShardCount>, pub previous_crosslinks: FixedLenVec<Crosslink, T::ShardCount>,
pub latest_block_roots: FixedLenVec<Hash256, T::SlotsPerHistoricalRoot>, pub latest_block_roots: FixedLenVec<Hash256, T::SlotsPerHistoricalRoot>,
latest_state_roots: FixedLenVec<Hash256, T::SlotsPerHistoricalRoot>, pub latest_state_roots: FixedLenVec<Hash256, T::SlotsPerHistoricalRoot>,
latest_active_index_roots: FixedLenVec<Hash256, T::LatestActiveIndexRootsLength>, latest_active_index_roots: FixedLenVec<Hash256, T::LatestActiveIndexRootsLength>,
latest_slashed_balances: FixedLenVec<u64, T::LatestSlashedExitLength>, latest_slashed_balances: FixedLenVec<u64, T::LatestSlashedExitLength>,
pub latest_block_header: BeaconBlockHeader, pub latest_block_header: BeaconBlockHeader,