Improve gossipsub block processing

This commit is contained in:
Paul Hauner 2019-03-31 17:27:04 +11:00
parent c596e3f7d7
commit a93f898946
No known key found for this signature in database
GPG Key ID: D362883A9218FCC6
5 changed files with 160 additions and 45 deletions

View File

@ -10,7 +10,7 @@ use beacon_chain::{
use eth2_libp2p::rpc::HelloMessage;
use types::{Attestation, BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Epoch, Hash256, Slot};
pub use beacon_chain::{BeaconChainError, BlockProcessingOutcome};
pub use beacon_chain::{BeaconChainError, BlockProcessingOutcome, InvalidBlock};
/// The network's API to the beacon chain.
pub trait BeaconChain: Send + Sync {

View File

@ -208,8 +208,9 @@ impl MessageHandler {
fn handle_gossip(&mut self, peer_id: PeerId, gossip_message: PubsubMessage) {
match gossip_message {
PubsubMessage::Block(message) => {
self.sync
.on_block_gossip(peer_id, message, &mut self.network_context)
let _should_foward_on =
self.sync
.on_block_gossip(peer_id, message, &mut self.network_context);
}
PubsubMessage::Attestation(message) => {
self.sync

View File

@ -104,7 +104,7 @@ impl ImportQueue {
}
/// Returns `true` if `self.chain` has not yet processed this block.
pub fn is_new_block(&self, block_root: &Hash256) -> bool {
pub fn chain_has_not_seen_block(&self, block_root: &Hash256) -> bool {
self.chain
.is_new_block_root(&block_root)
.unwrap_or_else(|_| {
@ -125,7 +125,7 @@ impl ImportQueue {
let new_roots: Vec<BlockRootSlot> = block_roots
.iter()
// Ignore any roots already processed by the chain.
.filter(|brs| self.is_new_block(&brs.block_root))
.filter(|brs| self.chain_has_not_seen_block(&brs.block_root))
// Ignore any roots already stored in the queue.
.filter(|brs| !self.partials.iter().any(|p| p.block_root == brs.block_root))
.cloned()
@ -168,7 +168,7 @@ impl ImportQueue {
for header in headers {
let block_root = Hash256::from_slice(&header.hash_tree_root()[..]);
if self.is_new_block(&block_root) {
if self.chain_has_not_seen_block(&block_root) {
self.insert_header(block_root, header, sender.clone());
required_bodies.push(block_root)
}
@ -186,6 +186,12 @@ impl ImportQueue {
}
}
pub fn enqueue_full_blocks(&mut self, blocks: Vec<BeaconBlock>, sender: PeerId) {
for block in blocks {
self.insert_full_block(block, sender.clone());
}
}
/// Inserts a header to the queue.
///
/// If the header already exists, the `inserted` time is set to `now` and not other
@ -239,6 +245,32 @@ impl ImportQueue {
}
});
}
/// Updates an existing `partial` with the completed block, or adds a new (complete) partial.
///
/// If the partial already existed, the `inserted` time is set to `now`.
fn insert_full_block(&mut self, block: BeaconBlock, sender: PeerId) {
let block_root = Hash256::from_slice(&block.hash_tree_root()[..]);
let partial = PartialBeaconBlock {
slot: block.slot,
block_root,
header: Some(block.block_header()),
body: Some(block.body),
inserted: Instant::now(),
sender,
};
if let Some(i) = self
.partials
.iter()
.position(|p| p.block_root == block_root)
{
self.partials[i] = partial;
} else {
self.partials.push(partial)
}
}
}
/// Individual components of a `BeaconBlock`, potentially all that are required to form a full

View File

@ -1,10 +1,11 @@
use super::import_queue::ImportQueue;
use crate::beacon_chain::BeaconChain;
use crate::beacon_chain::{BeaconChain, BlockProcessingOutcome, InvalidBlock};
use crate::message_handler::NetworkContext;
use eth2_libp2p::rpc::methods::*;
use eth2_libp2p::rpc::{RPCRequest, RPCResponse, RequestId};
use eth2_libp2p::PeerId;
use slog::{debug, error, info, o, warn};
use ssz::TreeHash;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
@ -16,6 +17,10 @@ const SLOT_IMPORT_TOLERANCE: u64 = 100;
/// The amount of seconds a block (or partial block) may exist in the import queue.
const QUEUE_STALE_SECS: u64 = 60;
/// If a block is more than `FUTURE_SLOT_TOLERANCE` slots ahead of our slot clock, we drop it.
/// Otherwise we queue it.
const FUTURE_SLOT_TOLERANCE: u64 = 1;
/// Keeps track of syncing information for known connected peers.
#[derive(Clone, Copy, Debug)]
pub struct PeerSyncInfo {
@ -536,65 +541,130 @@ impl SimpleSync {
}
/// Process a gossip message declaring a new block.
///
/// Returns a `bool` which, if `true`, indicates we should forward the block to our peers.
pub fn on_block_gossip(
&mut self,
peer_id: PeerId,
block: BeaconBlock,
network: &mut NetworkContext,
) {
) -> bool {
info!(
self.log,
"NewGossipBlock";
"peer" => format!("{:?}", peer_id),
);
/*
// Ignore any block from a finalized slot.
if self.slot_is_finalized(msg.slot) {
if self.slot_is_finalized(block.slot) {
warn!(
self.log, "NewGossipBlock";
"msg" => "new block slot is finalized.",
"slot" => msg.slot,
"block_slot" => block.slot,
);
return;
return false;
}
let block_root = Hash256::from_slice(&block.hash_tree_root());
// Ignore any block that the chain already knows about.
if self.chain_has_seen_block(&msg.block_root) {
return;
if self.chain_has_seen_block(&block_root) {
println!("this happened");
// TODO: Age confirm that we shouldn't forward a block if we already know of it.
return false;
}
// k
if msg.slot == self.chain.hello_message().best_slot + 1 {
self.request_block_headers(
peer_id,
BeaconBlockHeadersRequest {
start_root: msg.block_root,
start_slot: msg.slot,
max_headers: 1,
skip_slots: 0,
},
network,
)
debug!(
self.log,
"NewGossipBlock";
"peer" => format!("{:?}", peer_id),
"msg" => "processing block",
);
match self.chain.process_block(block.clone()) {
Ok(BlockProcessingOutcome::InvalidBlock(InvalidBlock::ParentUnknown)) => {
// get the parent.
true
}
Ok(BlockProcessingOutcome::InvalidBlock(InvalidBlock::FutureSlot {
present_slot,
block_slot,
})) => {
if block_slot - present_slot > FUTURE_SLOT_TOLERANCE {
// The block is too far in the future, drop it.
warn!(
self.log, "NewGossipBlock";
"msg" => "future block rejected",
"present_slot" => present_slot,
"block_slot" => block_slot,
"FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE,
"peer" => format!("{:?}", peer_id),
);
// Do not forward the block around to peers.
false
} else {
// The block is in the future, but not too far.
warn!(
self.log, "NewGossipBlock";
"msg" => "queuing future block",
"present_slot" => present_slot,
"block_slot" => block_slot,
"FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE,
"peer" => format!("{:?}", peer_id),
);
// Queue the block for later processing.
self.import_queue.enqueue_full_blocks(vec![block], peer_id);
// Forward the block around to peers.
true
}
}
Ok(outcome) => {
if outcome.is_invalid() {
// The peer has sent a block which is fundamentally invalid.
warn!(
self.log, "NewGossipBlock";
"msg" => "invalid block from peer",
"outcome" => format!("{:?}", outcome),
"peer" => format!("{:?}", peer_id),
);
// Disconnect the peer
network.disconnect(peer_id, GoodbyeReason::Fault);
// Do not forward the block to peers.
false
} else if outcome.sucessfully_processed() {
// The block was valid and we processed it successfully.
info!(
self.log, "NewGossipBlock";
"msg" => "block import successful",
"peer" => format!("{:?}", peer_id),
);
// Forward the block to peers
true
} else {
// The block wasn't necessarily invalid but we didn't process it successfully.
// This condition shouldn't be reached.
error!(
self.log, "NewGossipBlock";
"msg" => "unexpected condition in processing block.",
"outcome" => format!("{:?}", outcome),
);
// Do not forward the block on.
false
}
}
Err(e) => {
// We encountered an error whilst processing the block.
//
// Blocks should not be able to trigger errors, instead they should be flagged as
// invalid.
error!(
self.log, "NewGossipBlock";
"msg" => "internal error in processing block.",
"error" => format!("{:?}", e),
);
// Do not forward the block to peers.
false
}
}
// TODO: if the block is a few more slots ahead, try to get all block roots from then until
// now.
//
// Note: only requests the new block -- will fail if we don't have its parents.
if !self.chain_has_seen_block(&msg.block_root) {
self.request_block_headers(
peer_id,
BeaconBlockHeadersRequest {
start_root: msg.block_root,
start_slot: msg.slot,
max_headers: 1,
skip_slots: 0,
},
network,
)
}
*/
}
/// Process a gossip message declaring a new attestation.
@ -724,6 +794,18 @@ impl SimpleSync {
network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlockBodies(req));
}
/// Returns `true` if `self.chain` has not yet processed this block.
pub fn chain_has_seen_block(&self, block_root: &Hash256) -> bool {
!self
.chain
.is_new_block_root(&block_root)
.unwrap_or_else(|_| {
error!(self.log, "Unable to determine if block is new.");
false
})
}
/// Returns `true` if the given slot is finalized in our chain.
fn slot_is_finalized(&self, slot: Slot) -> bool {
slot <= self
.chain

View File

@ -120,7 +120,7 @@ impl TestingBeaconStateBuilder {
})
.collect();
let genesis_time = 1553977336; // arbitrary
let genesis_time = 1554013000; // arbitrary
let mut state = BeaconState::genesis(
genesis_time,