Improve queueing in SimpleSync

This commit is contained in:
Paul Hauner 2019-03-31 09:44:58 +11:00
parent 65ae8fda47
commit d2b5cf5a32
No known key found for this signature in database
GPG Key ID: 303E4494BB28068C
4 changed files with 115 additions and 32 deletions

View File

@ -179,6 +179,19 @@ pub struct BeaconBlockRootsResponse {
pub roots: Vec<BlockRootSlot>, pub roots: Vec<BlockRootSlot>,
} }
impl BeaconBlockRootsResponse {
/// Returns `true` if each `self.roots.slot[i]` is higher than the preceeding `i`.
pub fn slots_are_ascending(&self) -> bool {
for i in 1..self.roots.len() {
if self.roots[i - 1].slot >= self.roots[i].slot {
return false;
}
}
true
}
}
/// Contains a block root and associated slot. /// Contains a block root and associated slot.
#[derive(Encode, Decode, Clone, Debug, PartialEq)] #[derive(Encode, Decode, Clone, Debug, PartialEq)]
pub struct BlockRootSlot { pub struct BlockRootSlot {

View File

@ -5,7 +5,7 @@ use slog::{debug, error};
use ssz::TreeHash; use ssz::TreeHash;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use types::{BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Hash256}; use types::{BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Hash256, Slot};
/// Provides a queue for fully and partially built `BeaconBlock`s. /// Provides a queue for fully and partially built `BeaconBlock`s.
/// ///
@ -120,6 +120,38 @@ impl ImportQueue {
.position(|brs| self.is_new_block(&brs.block_root)) .position(|brs| self.is_new_block(&brs.block_root))
} }
/// Adds the `block_roots` to the partials queue.
///
/// If a `block_root` is not in the queue and has not been processed by the chain it is added
/// to the queue and it's block root is included in the output.
pub fn enqueue_block_roots(
&mut self,
block_roots: &[BlockRootSlot],
sender: PeerId,
) -> Vec<BlockRootSlot> {
let new_roots: Vec<BlockRootSlot> = block_roots
.iter()
// Ignore any roots already processed by the chain.
.filter(|brs| self.is_new_block(&brs.block_root))
// Ignore any roots already stored in the queue.
.filter(|brs| !self.partials.iter().any(|p| p.block_root == brs.block_root))
.cloned()
.collect();
new_roots.iter().for_each(|brs| {
self.partials.push(PartialBeaconBlock {
slot: brs.slot,
block_root: brs.block_root,
sender: sender.clone(),
header: None,
body: None,
inserted: Instant::now(),
})
});
new_roots
}
/// Adds the `headers` to the `partials` queue. Returns a list of `Hash256` block roots for /// Adds the `headers` to the `partials` queue. Returns a list of `Hash256` block roots for
/// which we should use to request `BeaconBlockBodies`. /// which we should use to request `BeaconBlockBodies`.
/// ///
@ -174,8 +206,9 @@ impl ImportQueue {
self.partials[i].inserted = Instant::now(); self.partials[i].inserted = Instant::now();
} else { } else {
self.partials.push(PartialBeaconBlock { self.partials.push(PartialBeaconBlock {
slot: header.slot,
block_root, block_root,
header, header: Some(header),
body: None, body: None,
inserted: Instant::now(), inserted: Instant::now(),
sender, sender,
@ -192,7 +225,8 @@ impl ImportQueue {
let body_root = Hash256::from_slice(&body.hash_tree_root()[..]); let body_root = Hash256::from_slice(&body.hash_tree_root()[..]);
self.partials.iter_mut().for_each(|mut p| { self.partials.iter_mut().for_each(|mut p| {
if body_root == p.header.block_body_root { if let Some(header) = &mut p.header {
if body_root == header.block_body_root {
p.inserted = Instant::now(); p.inserted = Instant::now();
if p.body.is_none() { if p.body.is_none() {
@ -200,6 +234,7 @@ impl ImportQueue {
p.sender = sender.clone(); p.sender = sender.clone();
} }
} }
}
}); });
} }
} }
@ -208,9 +243,10 @@ impl ImportQueue {
/// `BeaconBlock`. /// `BeaconBlock`.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct PartialBeaconBlock { pub struct PartialBeaconBlock {
pub slot: Slot,
/// `BeaconBlock` root. /// `BeaconBlock` root.
pub block_root: Hash256, pub block_root: Hash256,
pub header: BeaconBlockHeader, pub header: Option<BeaconBlockHeader>,
pub body: Option<BeaconBlockBody>, pub body: Option<BeaconBlockBody>,
/// The instant at which this record was created or last meaningfully modified. Used to /// The instant at which this record was created or last meaningfully modified. Used to
/// determine if an entry is stale and should be removed. /// determine if an entry is stale and should be removed.
@ -225,7 +261,7 @@ impl PartialBeaconBlock {
pub fn complete(self) -> Option<(Hash256, BeaconBlock, PeerId)> { pub fn complete(self) -> Option<(Hash256, BeaconBlock, PeerId)> {
Some(( Some((
self.block_root, self.block_root,
self.header.into_block(self.body?), self.header?.into_block(self.body?),
self.sender, self.sender,
)) ))
} }

View File

@ -358,32 +358,49 @@ impl SimpleSync {
if res.roots.is_empty() { if res.roots.is_empty() {
warn!( warn!(
self.log, self.log,
"Peer returned empty block roots response. PeerId: {:?}", peer_id "Peer returned empty block roots response";
"peer_id" => format!("{:?}", peer_id)
); );
return; return;
} }
let new_root_index = self.import_queue.first_new_root(&res.roots); // The wire protocol specifies that slots must be in ascending order.
if !res.slots_are_ascending() {
warn!(
self.log,
"Peer returned block roots response with bad slot ordering";
"peer_id" => format!("{:?}", peer_id)
);
return;
}
// If a new block root is found, request it and all the headers following it. let new_roots = self.import_queue.enqueue_block_roots(&res.roots, peer_id.clone());
// No new roots means nothing to do.
// //
// We make an assumption here that if we don't know a block then we don't know of all // This check protects against future panics.
// it's parents. This might not be the case if syncing becomes more sophisticated. if new_roots.is_empty() {
if let Some(i) = new_root_index { return;
let new = &res.roots[i]; }
// Determine the first (earliest) and last (latest) `BlockRootSlot` items.
//
// This logic relies upon slots to be in ascending order, which is enforced earlier.
let first = new_roots.first().expect("Non-empty list must have first");
let last = new_roots.last().expect("Non-empty list must have last");
// Request all headers between the earliest and latest new `BlockRootSlot` items.
self.request_block_headers( self.request_block_headers(
peer_id, peer_id,
BeaconBlockHeadersRequest { BeaconBlockHeadersRequest {
start_root: new.block_root, start_root: first.block_root,
start_slot: new.slot, start_slot: first.slot,
max_headers: (res.roots.len() - i) as u64, max_headers: (last.slot - first.slot + 1).as_u64(),
skip_slots: 0, skip_slots: 0,
}, },
network, network,
) )
} }
}
/// Handle a `BeaconBlockHeaders` request from the peer. /// Handle a `BeaconBlockHeaders` request from the peer.
pub fn on_beacon_block_headers_request( pub fn on_beacon_block_headers_request(
@ -528,8 +545,17 @@ impl SimpleSync {
"NewGossipBlock"; "NewGossipBlock";
"peer" => format!("{:?}", peer_id), "peer" => format!("{:?}", peer_id),
); );
// TODO: filter out messages that a prior to the finalized slot.
// // Ignore any block from a finalized slot.
if self.slot_is_finalized(msg.slot) {
warn!(
self.log, "NewGossipBlock";
"msg" => "new block slot is finalized.",
"slot" => msg.slot,
);
return;
}
// TODO: if the block is a few more slots ahead, try to get all block roots from then until // TODO: if the block is a few more slots ahead, try to get all block roots from then until
// now. // now.
// //
@ -675,6 +701,14 @@ impl SimpleSync {
network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlockBodies(req)); network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlockBodies(req));
} }
fn slot_is_finalized(&self, slot: Slot) -> bool {
slot <= self
.chain
.hello_message()
.latest_finalized_epoch
.start_slot(self.chain.get_spec().slots_per_epoch)
}
/// Generates our current state in the form of a HELLO RPC message. /// Generates our current state in the form of a HELLO RPC message.
pub fn generate_hello(&self) -> HelloMessage { pub fn generate_hello(&self) -> HelloMessage {
self.chain.hello_message() self.chain.hello_message()

View File

@ -120,7 +120,7 @@ impl TestingBeaconStateBuilder {
}) })
.collect(); .collect();
let genesis_time = 1553753928; // arbitrary let genesis_time = 1553977336; // arbitrary
let mut state = BeaconState::genesis( let mut state = BeaconState::genesis(
genesis_time, genesis_time,