Partially refactor simple_sync, makes improvement

This commit is contained in:
Paul Hauner 2019-06-27 18:05:03 +10:00
parent 906580be15
commit 2a7122beaf
No known key found for this signature in database
GPG Key ID: 5E2CFF9B75FA63DF
3 changed files with 143 additions and 195 deletions

View File

@ -1,7 +1,8 @@
use beacon_chain::{BeaconChain, BeaconChainTypes}; use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::rpc::methods::*; use eth2_libp2p::rpc::methods::*;
use eth2_libp2p::PeerId; use eth2_libp2p::PeerId;
use slog::{debug, error}; use slog::error;
use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tree_hash::TreeHash; use tree_hash::TreeHash;
@ -22,7 +23,7 @@ use types::{BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Hash256, Slot};
pub struct ImportQueue<T: BeaconChainTypes> { pub struct ImportQueue<T: BeaconChainTypes> {
pub chain: Arc<BeaconChain<T>>, pub chain: Arc<BeaconChain<T>>,
/// Partially imported blocks, keyed by the root of `BeaconBlockBody`. /// Partially imported blocks, keyed by the root of `BeaconBlockBody`.
pub partials: Vec<PartialBeaconBlock>, partials: HashMap<Hash256, PartialBeaconBlock>,
/// Time before a queue entry is considered state. /// Time before a queue entry is considered state.
pub stale_time: Duration, pub stale_time: Duration,
/// Logging /// Logging
@ -34,7 +35,7 @@ impl<T: BeaconChainTypes> ImportQueue<T> {
pub fn new(chain: Arc<BeaconChain<T>>, stale_time: Duration, log: slog::Logger) -> Self { pub fn new(chain: Arc<BeaconChain<T>>, stale_time: Duration, log: slog::Logger) -> Self {
Self { Self {
chain, chain,
partials: vec![], partials: HashMap::new(),
stale_time, stale_time,
log, log,
} }
@ -52,7 +53,7 @@ impl<T: BeaconChainTypes> ImportQueue<T> {
let mut complete: Vec<(Hash256, BeaconBlock, PeerId)> = self let mut complete: Vec<(Hash256, BeaconBlock, PeerId)> = self
.partials .partials
.iter() .iter()
.filter_map(|partial| partial.clone().complete()) .filter_map(|(_, partial)| partial.clone().complete())
.collect(); .collect();
// Sort the completable partials to be in ascending slot order. // Sort the completable partials to be in ascending slot order.
@ -61,14 +62,14 @@ impl<T: BeaconChainTypes> ImportQueue<T> {
complete complete
} }
pub fn contains_block_root(&self, block_root: Hash256) -> bool {
self.partials.contains_key(&block_root)
}
/// Removes the first `PartialBeaconBlock` with a matching `block_root`, returning the partial /// Removes the first `PartialBeaconBlock` with a matching `block_root`, returning the partial
/// if it exists. /// if it exists.
pub fn remove(&mut self, block_root: Hash256) -> Option<PartialBeaconBlock> { pub fn remove(&mut self, block_root: Hash256) -> Option<PartialBeaconBlock> {
let position = self self.partials.remove(&block_root)
.partials
.iter()
.position(|p| p.block_root == block_root)?;
Some(self.partials.remove(position))
} }
/// Flushes all stale entries from the queue. /// Flushes all stale entries from the queue.
@ -76,31 +77,10 @@ impl<T: BeaconChainTypes> ImportQueue<T> {
/// An entry is stale if it has as a `inserted` time that is more than `self.stale_time` in the /// An entry is stale if it has as a `inserted` time that is more than `self.stale_time` in the
/// past. /// past.
pub fn remove_stale(&mut self) { pub fn remove_stale(&mut self) {
let stale_indices: Vec<usize> = self let stale_time = self.stale_time;
.partials
.iter()
.enumerate()
.filter_map(|(i, partial)| {
if partial.inserted + self.stale_time <= Instant::now() {
Some(i)
} else {
None
}
})
.collect();
if !stale_indices.is_empty() { self.partials
debug!( .retain(|_, partial| partial.inserted + stale_time > Instant::now())
self.log,
"ImportQueue removing stale entries";
"stale_items" => stale_indices.len(),
"stale_time_seconds" => self.stale_time.as_secs()
);
}
stale_indices.iter().for_each(|&i| {
self.partials.remove(i);
});
} }
/// Returns `true` if `self.chain` has not yet processed this block. /// Returns `true` if `self.chain` has not yet processed this block.
@ -122,27 +102,30 @@ impl<T: BeaconChainTypes> ImportQueue<T> {
block_roots: &[BlockRootSlot], block_roots: &[BlockRootSlot],
sender: PeerId, sender: PeerId,
) -> Vec<BlockRootSlot> { ) -> Vec<BlockRootSlot> {
let new_roots: Vec<BlockRootSlot> = block_roots let new_block_root_slots: Vec<BlockRootSlot> = block_roots
.iter() .iter()
// Ignore any roots already stored in the queue.
.filter(|brs| !self.contains_block_root(brs.block_root))
// Ignore any roots already processed by the chain. // Ignore any roots already processed by the chain.
.filter(|brs| self.chain_has_not_seen_block(&brs.block_root)) .filter(|brs| self.chain_has_not_seen_block(&brs.block_root))
// Ignore any roots already stored in the queue.
.filter(|brs| !self.partials.iter().any(|p| p.block_root == brs.block_root))
.cloned() .cloned()
.collect(); .collect();
new_roots.iter().for_each(|brs| { self.partials.extend(
self.partials.push(PartialBeaconBlock { new_block_root_slots
slot: brs.slot, .iter()
block_root: brs.block_root, .map(|brs| PartialBeaconBlock {
sender: sender.clone(), slot: brs.slot,
header: None, block_root: brs.block_root,
body: None, sender: sender.clone(),
inserted: Instant::now(), header: None,
}) body: None,
}); inserted: Instant::now(),
})
.map(|partial| (partial.block_root, partial)),
);
new_roots new_block_root_slots
} }
/// Adds the `headers` to the `partials` queue. Returns a list of `Hash256` block roots for /// Adds the `headers` to the `partials` queue. Returns a list of `Hash256` block roots for
@ -170,7 +153,7 @@ impl<T: BeaconChainTypes> ImportQueue<T> {
if self.chain_has_not_seen_block(&block_root) { if self.chain_has_not_seen_block(&block_root) {
self.insert_header(block_root, header, sender.clone()); self.insert_header(block_root, header, sender.clone());
required_bodies.push(block_root) required_bodies.push(block_root);
} }
} }
@ -197,31 +180,20 @@ impl<T: BeaconChainTypes> ImportQueue<T> {
/// If the header already exists, the `inserted` time is set to `now` and not other /// If the header already exists, the `inserted` time is set to `now` and not other
/// modifications are made. /// modifications are made.
fn insert_header(&mut self, block_root: Hash256, header: BeaconBlockHeader, sender: PeerId) { fn insert_header(&mut self, block_root: Hash256, header: BeaconBlockHeader, sender: PeerId) {
if let Some(i) = self self.partials
.partials .entry(block_root)
.iter() .and_modify(|partial| {
.position(|p| p.block_root == block_root) partial.header = Some(header.clone());
{ partial.inserted = Instant::now();
// Case 1: there already exists a partial with a matching block root. })
// .or_insert_with(|| PartialBeaconBlock {
// The `inserted` time is set to now and the header is replaced, regardless of whether
// it existed or not.
self.partials[i].header = Some(header);
self.partials[i].inserted = Instant::now();
} else {
// Case 2: there was no partial with a matching block root.
//
// A new partial is added. This case permits adding a header without already known the
// root.
self.partials.push(PartialBeaconBlock {
slot: header.slot, slot: header.slot,
block_root, block_root,
header: Some(header), header: Some(header),
body: None, body: None,
inserted: Instant::now(), inserted: Instant::now(),
sender, sender,
}) });
}
} }
/// Updates an existing partial with the `body`. /// Updates an existing partial with the `body`.
@ -232,7 +204,7 @@ impl<T: BeaconChainTypes> ImportQueue<T> {
fn insert_body(&mut self, body: BeaconBlockBody, sender: PeerId) { fn insert_body(&mut self, body: BeaconBlockBody, sender: PeerId) {
let body_root = Hash256::from_slice(&body.tree_hash_root()[..]); let body_root = Hash256::from_slice(&body.tree_hash_root()[..]);
self.partials.iter_mut().for_each(|mut p| { self.partials.iter_mut().for_each(|(_, mut p)| {
if let Some(header) = &mut p.header { if let Some(header) = &mut p.header {
if body_root == header.block_body_root { if body_root == header.block_body_root {
p.inserted = Instant::now(); p.inserted = Instant::now();
@ -261,15 +233,10 @@ impl<T: BeaconChainTypes> ImportQueue<T> {
sender, sender,
}; };
if let Some(i) = self self.partials
.partials .entry(block_root)
.iter() .and_modify(|existing_partial| *existing_partial = partial.clone())
.position(|p| p.block_root == block_root) .or_insert(partial);
{
self.partials[i] = partial;
} else {
self.partials.push(partial)
}
} }
} }

View File

@ -17,7 +17,7 @@ use types::{
const SLOT_IMPORT_TOLERANCE: u64 = 100; const SLOT_IMPORT_TOLERANCE: u64 = 100;
/// The amount of seconds a block (or partial block) may exist in the import queue. /// The amount of seconds a block (or partial block) may exist in the import queue.
const QUEUE_STALE_SECS: u64 = 600; const QUEUE_STALE_SECS: u64 = 6;
/// If a block is more than `FUTURE_SLOT_TOLERANCE` slots ahead of our slot clock, we drop it. /// If a block is more than `FUTURE_SLOT_TOLERANCE` slots ahead of our slot clock, we drop it.
/// Otherwise we queue it. /// Otherwise we queue it.
@ -72,7 +72,6 @@ pub struct SimpleSync<T: BeaconChainTypes> {
import_queue: ImportQueue<T>, import_queue: ImportQueue<T>,
/// The current state of the syncing protocol. /// The current state of the syncing protocol.
state: SyncState, state: SyncState,
/// Sync logger.
log: slog::Logger, log: slog::Logger,
} }
@ -160,119 +159,89 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
hello: HelloMessage, hello: HelloMessage,
network: &mut NetworkContext, network: &mut NetworkContext,
) { ) {
let spec = &self.chain.spec;
let remote = PeerSyncInfo::from(hello); let remote = PeerSyncInfo::from(hello);
let local = PeerSyncInfo::from(&self.chain); let local = PeerSyncInfo::from(&self.chain);
let start_slot = |epoch: Epoch| epoch.start_slot(T::EthSpec::slots_per_epoch()); let start_slot = |epoch: Epoch| epoch.start_slot(T::EthSpec::slots_per_epoch());
// Disconnect nodes who are on a different network.
if local.network_id != remote.network_id { if local.network_id != remote.network_id {
// The node is on a different network, disconnect them.
info!( info!(
self.log, "HandshakeFailure"; self.log, "HandshakeFailure";
"peer" => format!("{:?}", peer_id), "peer" => format!("{:?}", peer_id),
"reason" => "network_id" "reason" => "network_id"
); );
network.disconnect(peer_id.clone(), GoodbyeReason::IrreleventNetwork); network.disconnect(peer_id.clone(), GoodbyeReason::IrreleventNetwork);
// Disconnect nodes if our finalized epoch is greater than theirs, and their finalized } else if remote.latest_finalized_epoch <= local.latest_finalized_epoch
// epoch is not in our chain. Viz., they are on another chain. && remote.latest_finalized_root != self.chain.spec.zero_hash
// && local.latest_finalized_root != self.chain.spec.zero_hash
// If the local or remote have a `latest_finalized_root == ZERO_HASH`, skips checks about && (self.root_at_slot(start_slot(remote.latest_finalized_epoch))
// the finalized_root. The logic is awkward and I think we're better without it. != Some(remote.latest_finalized_root))
} else if (local.latest_finalized_epoch >= remote.latest_finalized_epoch)
&& self.root_at_slot(start_slot(remote.latest_finalized_epoch))
!= Some(remote.latest_finalized_root)
&& (local.latest_finalized_root != spec.zero_hash)
&& (remote.latest_finalized_root != spec.zero_hash)
{ {
// The remotes finalized epoch is less than or greater than ours, but the block root is
// different to the one in our chain.
//
// Therefore, the node is on a different chain and we should not communicate with them.
info!( info!(
self.log, "HandshakeFailure"; self.log, "HandshakeFailure";
"peer" => format!("{:?}", peer_id), "peer" => format!("{:?}", peer_id),
"reason" => "wrong_finalized_chain" "reason" => "different finalized chain"
); );
network.disconnect(peer_id.clone(), GoodbyeReason::IrreleventNetwork); network.disconnect(peer_id.clone(), GoodbyeReason::IrreleventNetwork);
// Process handshakes from peers that seem to be on our chain. } else if remote.latest_finalized_epoch < local.latest_finalized_epoch {
// The node has a lower finalized epoch, their chain is not useful to us. There are two
// cases where a node can have a lower finalized epoch:
//
// ## The node is on the same chain
//
// If a node is on the same chain but has a lower finalized epoch, their head must be
// lower than ours. Therefore, we have nothing to request from them.
//
// ## The node is on a fork
//
// If a node is on a fork that has a lower finalized epoch, switching to that fork would
// cause us to revert a finalized block. This is not permitted, therefore we have no
// interest in their blocks.
debug!(
self.log,
"NaivePeer";
"peer" => format!("{:?}", peer_id),
"reason" => "lower finalized epoch"
);
} else if self
.chain
.store
.exists::<BeaconBlock>(&remote.best_root)
.unwrap_or_else(|_| false)
{
// If the node's best-block is already known to us, we have nothing to request.
debug!(
self.log,
"NaivePeer";
"peer" => format!("{:?}", peer_id),
"reason" => "best block is known"
);
} else { } else {
info!(self.log, "HandshakeSuccess"; "peer" => format!("{:?}", peer_id)); // The remote node has an equal or great finalized epoch and we don't know it's head.
self.known_peers.insert(peer_id.clone(), remote);
let remote_best_root_is_in_chain =
self.root_at_slot(remote.best_slot) == Some(local.best_root);
// We require nothing from this peer if:
// //
// - Their finalized epoch is less than ours // Therefore, there are some blocks between the local finalized epoch and the remote
// - Their finalized root is in our chain (established earlier) // head that are worth downloading.
// - Their best slot is less than ours debug!(self.log, "UsefulPeer"; "peer" => format!("{:?}", peer_id));
// - Their best root is in our chain.
//
// We make an exception when our best slot is 0. Best slot does not indicate Wether or
// not there is a block at slot zero.
if (remote.latest_finalized_epoch <= local.latest_finalized_epoch)
&& (remote.best_slot <= local.best_slot)
&& (local.best_slot > 0)
&& remote_best_root_is_in_chain
{
debug!(self.log, "Peer is naive"; "peer" => format!("{:?}", peer_id));
return;
}
// If the remote has a higher finalized epoch, request all block roots from our finalized let start_slot = local
// epoch through to its best slot. .latest_finalized_epoch
if remote.latest_finalized_epoch > local.latest_finalized_epoch { .start_slot(T::EthSpec::slots_per_epoch());
debug!(self.log, "Peer has high finalized epoch"; "peer" => format!("{:?}", peer_id)); let required_slots = remote.best_slot - start_slot;
let start_slot = local
.latest_finalized_epoch
.start_slot(T::EthSpec::slots_per_epoch());
let required_slots = remote.best_slot - start_slot;
self.request_block_roots( self.request_block_roots(
peer_id, peer_id,
BeaconBlockRootsRequest { BeaconBlockRootsRequest {
start_slot, start_slot,
count: required_slots.into(), count: required_slots.into(),
}, },
network, network,
); );
// If the remote has a greater best slot, request the roots between our best slot and their
// best slot.
} else if remote.best_slot > local.best_slot {
debug!(self.log, "Peer has higher best slot"; "peer" => format!("{:?}", peer_id));
let start_slot = local
.latest_finalized_epoch
.start_slot(T::EthSpec::slots_per_epoch());
let required_slots = remote.best_slot - start_slot;
self.request_block_roots(
peer_id,
BeaconBlockRootsRequest {
start_slot,
count: required_slots.into(),
},
network,
);
// The remote has a lower best slot, but the root for that slot is not in our chain.
//
// This means the remote is on another chain.
} else if remote.best_slot <= local.best_slot && !remote_best_root_is_in_chain {
debug!(self.log, "Peer has a best slot on a different chain"; "peer" => format!("{:?}", peer_id));
let start_slot = local
.latest_finalized_epoch
.start_slot(T::EthSpec::slots_per_epoch());
let required_slots = remote.best_slot - start_slot;
self.request_block_roots(
peer_id,
BeaconBlockRootsRequest {
start_slot,
count: required_slots.into(),
},
network,
);
} else {
warn!(self.log, "Unexpected condition in syncing"; "peer" => format!("{:?}", peer_id));
}
} }
} }
@ -309,11 +278,13 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
.collect(); .collect();
if roots.len() as u64 != req.count { if roots.len() as u64 != req.count {
debug!( warn!(
self.log, self.log,
"BlockRootsRequest"; "BlockRootsRequest";
"peer" => format!("{:?}", peer_id), "peer" => format!("{:?}", peer_id),
"msg" => "Failed to return all requested hashes", "msg" => "Failed to return all requested hashes",
"start_slot" => req.start_slot,
"current_slot" => self.chain.current_state().slot,
"requested" => req.count, "requested" => req.count,
"returned" => roots.len(), "returned" => roots.len(),
); );
@ -385,7 +356,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
BeaconBlockHeadersRequest { BeaconBlockHeadersRequest {
start_root: first.block_root, start_root: first.block_root,
start_slot: first.slot, start_slot: first.slot,
max_headers: (last.slot - first.slot + 1).as_u64(), max_headers: (last.slot - first.slot).as_u64(),
skip_slots: 0, skip_slots: 0,
}, },
network, network,
@ -467,7 +438,9 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
.import_queue .import_queue
.enqueue_headers(res.headers, peer_id.clone()); .enqueue_headers(res.headers, peer_id.clone());
self.request_block_bodies(peer_id, BeaconBlockBodiesRequest { block_roots }, network); if !block_roots.is_empty() {
self.request_block_bodies(peer_id, BeaconBlockBodiesRequest { block_roots }, network);
}
} }
/// Handle a `BeaconBlockBodies` request from the peer. /// Handle a `BeaconBlockBodies` request from the peer.
@ -552,10 +525,28 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
{ {
match outcome { match outcome {
BlockProcessingOutcome::Processed { .. } => SHOULD_FORWARD_GOSSIP_BLOCK, BlockProcessingOutcome::Processed { .. } => SHOULD_FORWARD_GOSSIP_BLOCK,
BlockProcessingOutcome::ParentUnknown { .. } => { BlockProcessingOutcome::ParentUnknown { parent } => {
// Clean the stale entries from the queue.
self.import_queue.remove_stale();
// Add this block to the queue
self.import_queue self.import_queue
.enqueue_full_blocks(vec![block], peer_id.clone()); .enqueue_full_blocks(vec![block], peer_id.clone());
// Unless the parent is in the queue, request the parent block from the peer.
//
// It is likely that this is duplicate work, given we already send a hello
// request. However, I believe there are some edge-cases where the hello
// message doesn't suffice, so we perform this request as well.
if !self.import_queue.contains_block_root(parent) {
// Send a hello to learn of the clients best slot so we can then sync the required
// parent(s).
network.send_rpc_request(
peer_id.clone(),
RPCRequest::Hello(hello_message(&self.chain)),
);
}
SHOULD_FORWARD_GOSSIP_BLOCK SHOULD_FORWARD_GOSSIP_BLOCK
} }
BlockProcessingOutcome::FutureSlot { BlockProcessingOutcome::FutureSlot {
@ -730,7 +721,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
if let Ok(outcome) = processing_result { if let Ok(outcome) = processing_result {
match outcome { match outcome {
BlockProcessingOutcome::Processed { block_root } => { BlockProcessingOutcome::Processed { block_root } => {
info!( debug!(
self.log, "Imported block from network"; self.log, "Imported block from network";
"source" => source, "source" => source,
"slot" => block.slot, "slot" => block.slot,
@ -747,28 +738,19 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
"peer" => format!("{:?}", peer_id), "peer" => format!("{:?}", peer_id),
); );
// Send a hello to learn of the clients best slot so we can then sync the require // Unless the parent is in the queue, request the parent block from the peer.
// parent(s).
network.send_rpc_request(
peer_id.clone(),
RPCRequest::Hello(hello_message(&self.chain)),
);
// Explicitly request the parent block from the peer.
// //
// It is likely that this is duplicate work, given we already send a hello // It is likely that this is duplicate work, given we already send a hello
// request. However, I believe there are some edge-cases where the hello // request. However, I believe there are some edge-cases where the hello
// message doesn't suffice, so we perform this request as well. // message doesn't suffice, so we perform this request as well.
self.request_block_headers( if !self.import_queue.contains_block_root(parent) {
peer_id, // Send a hello to learn of the clients best slot so we can then sync the require
BeaconBlockHeadersRequest { // parent(s).
start_root: parent, network.send_rpc_request(
start_slot: block.slot - 1, peer_id.clone(),
max_headers: 1, RPCRequest::Hello(hello_message(&self.chain)),
skip_slots: 0, );
}, }
network,
)
} }
BlockProcessingOutcome::FutureSlot { BlockProcessingOutcome::FutureSlot {
present_slot, present_slot,

View File

@ -139,8 +139,7 @@ impl<'a, T: EthSpec, U: Store> Iterator for BlockRootsIterator<'a, T, U> {
Err(BeaconStateError::SlotOutOfBounds) => { Err(BeaconStateError::SlotOutOfBounds) => {
// Read a `BeaconState` from the store that has access to prior historical root. // Read a `BeaconState` from the store that has access to prior historical root.
let beacon_state: BeaconState<T> = { let beacon_state: BeaconState<T> = {
// Load the earlier state from disk. Skip forward one slot, because a state // Load the earliest state from disk.
// doesn't return it's own state root.
let new_state_root = self.beacon_state.get_oldest_state_root().ok()?; let new_state_root = self.beacon_state.get_oldest_state_root().ok()?;
self.store.get(&new_state_root).ok()? self.store.get(&new_state_root).ok()?