Implement block imports for sync
This commit is contained in:
parent
4b5b5851a6
commit
a57a7c2394
@ -46,6 +46,26 @@ pub enum BlockProcessingOutcome {
|
|||||||
InvalidBlock(InvalidBlock),
|
InvalidBlock(InvalidBlock),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl BlockProcessingOutcome {
|
||||||
|
/// Returns `true` if the block was objectively invalid and we should disregard the peer who
|
||||||
|
/// sent it.
|
||||||
|
pub fn is_invalid(&self) -> bool {
|
||||||
|
match self {
|
||||||
|
BlockProcessingOutcome::ValidBlock(_) => false,
|
||||||
|
BlockProcessingOutcome::InvalidBlock(r) => match r {
|
||||||
|
InvalidBlock::FutureSlot => true,
|
||||||
|
InvalidBlock::StateRootMismatch => true,
|
||||||
|
InvalidBlock::ParentUnknown => false,
|
||||||
|
InvalidBlock::SlotProcessingError(_) => false,
|
||||||
|
InvalidBlock::PerBlockProcessingError(e) => match e {
|
||||||
|
BlockProcessingError::Invalid(_) => true,
|
||||||
|
BlockProcessingError::BeaconStateError(_) => false,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub struct BeaconChain<T: ClientDB + Sized, U: SlotClock, F: ForkChoice> {
|
pub struct BeaconChain<T: ClientDB + Sized, U: SlotClock, F: ForkChoice> {
|
||||||
pub block_store: Arc<BeaconBlockStore<T>>,
|
pub block_store: Arc<BeaconBlockStore<T>>,
|
||||||
pub state_store: Arc<BeaconStateStore<T>>,
|
pub state_store: Arc<BeaconStateStore<T>>,
|
||||||
@ -685,10 +705,10 @@ where
|
|||||||
// TODO: check the block proposer signature BEFORE doing a state transition. This will
|
// TODO: check the block proposer signature BEFORE doing a state transition. This will
|
||||||
// significantly lower exposure surface to DoS attacks.
|
// significantly lower exposure surface to DoS attacks.
|
||||||
|
|
||||||
// Transition the parent state to the present slot.
|
// Transition the parent state to the block slot.
|
||||||
let mut state = parent_state;
|
let mut state = parent_state;
|
||||||
let previous_block_header = parent_block.block_header();
|
let previous_block_header = parent_block.block_header();
|
||||||
for _ in state.slot.as_u64()..present_slot.as_u64() {
|
for _ in state.slot.as_u64()..block.slot.as_u64() {
|
||||||
if let Err(e) = per_slot_processing(&mut state, &previous_block_header, &self.spec) {
|
if let Err(e) = per_slot_processing(&mut state, &previous_block_header, &self.spec) {
|
||||||
return Ok(BlockProcessingOutcome::InvalidBlock(
|
return Ok(BlockProcessingOutcome::InvalidBlock(
|
||||||
InvalidBlock::SlotProcessingError(e),
|
InvalidBlock::SlotProcessingError(e),
|
||||||
|
@ -10,7 +10,7 @@ use beacon_chain::{
|
|||||||
use eth2_libp2p::HelloMessage;
|
use eth2_libp2p::HelloMessage;
|
||||||
use types::{BeaconBlock, BeaconStateError, Epoch, Hash256, Slot};
|
use types::{BeaconBlock, BeaconStateError, Epoch, Hash256, Slot};
|
||||||
|
|
||||||
pub use beacon_chain::BeaconChainError;
|
pub use beacon_chain::{BeaconChainError, BlockProcessingOutcome};
|
||||||
|
|
||||||
/// The network's API to the beacon chain.
|
/// The network's API to the beacon chain.
|
||||||
pub trait BeaconChain: Send + Sync {
|
pub trait BeaconChain: Send + Sync {
|
||||||
@ -34,6 +34,9 @@ pub trait BeaconChain: Send + Sync {
|
|||||||
|
|
||||||
fn hello_message(&self) -> HelloMessage;
|
fn hello_message(&self) -> HelloMessage;
|
||||||
|
|
||||||
|
fn process_block(&self, block: BeaconBlock)
|
||||||
|
-> Result<BlockProcessingOutcome, BeaconChainError>;
|
||||||
|
|
||||||
fn get_block_roots(
|
fn get_block_roots(
|
||||||
&self,
|
&self,
|
||||||
start_slot: Slot,
|
start_slot: Slot,
|
||||||
@ -98,6 +101,13 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn process_block(
|
||||||
|
&self,
|
||||||
|
block: BeaconBlock,
|
||||||
|
) -> Result<BlockProcessingOutcome, BeaconChainError> {
|
||||||
|
self.process_block(block)
|
||||||
|
}
|
||||||
|
|
||||||
fn get_block_roots(
|
fn get_block_roots(
|
||||||
&self,
|
&self,
|
||||||
start_slot: Slot,
|
start_slot: Slot,
|
||||||
|
@ -161,6 +161,17 @@ impl MessageHandler {
|
|||||||
&mut self.network_context,
|
&mut self.network_context,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
RPCResponse::BeaconBlockBodies(response) => {
|
||||||
|
debug!(
|
||||||
|
self.log,
|
||||||
|
"BeaconBlockBodies response received"; "peer" => format!("{:?}", peer_id)
|
||||||
|
);
|
||||||
|
self.sync.on_beacon_block_bodies_response(
|
||||||
|
peer_id,
|
||||||
|
response,
|
||||||
|
&mut self.network_context,
|
||||||
|
)
|
||||||
|
}
|
||||||
// TODO: Handle all responses
|
// TODO: Handle all responses
|
||||||
_ => panic!("Unknown response: {:?}", response),
|
_ => panic!("Unknown response: {:?}", response),
|
||||||
}
|
}
|
||||||
|
@ -3,12 +3,12 @@ use crate::message_handler::NetworkContext;
|
|||||||
use eth2_libp2p::rpc::methods::*;
|
use eth2_libp2p::rpc::methods::*;
|
||||||
use eth2_libp2p::rpc::{RPCRequest, RPCResponse};
|
use eth2_libp2p::rpc::{RPCRequest, RPCResponse};
|
||||||
use eth2_libp2p::PeerId;
|
use eth2_libp2p::PeerId;
|
||||||
use slog::{debug, error, o, warn};
|
use slog::{debug, error, info, o, warn};
|
||||||
use ssz::TreeHash;
|
use ssz::TreeHash;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Instant;
|
use std::time::{Duration, Instant};
|
||||||
use types::{BeaconBlockHeader, Epoch, Hash256, Slot};
|
use types::{BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Epoch, Hash256, Slot};
|
||||||
|
|
||||||
/// The number of slots that we can import blocks ahead of us, before going into full Sync mode.
|
/// The number of slots that we can import blocks ahead of us, before going into full Sync mode.
|
||||||
const SLOT_IMPORT_TOLERANCE: u64 = 100;
|
const SLOT_IMPORT_TOLERANCE: u64 = 100;
|
||||||
@ -102,7 +102,11 @@ pub struct SimpleSync {
|
|||||||
impl SimpleSync {
|
impl SimpleSync {
|
||||||
pub fn new(beacon_chain: Arc<BeaconChain>, log: &slog::Logger) -> Self {
|
pub fn new(beacon_chain: Arc<BeaconChain>, log: &slog::Logger) -> Self {
|
||||||
let sync_logger = log.new(o!("Service"=> "Sync"));
|
let sync_logger = log.new(o!("Service"=> "Sync"));
|
||||||
let import_queue = ImportQueue::new(beacon_chain.clone(), log.clone());
|
|
||||||
|
let queue_item_stale_time = Duration::from_secs(600);
|
||||||
|
|
||||||
|
let import_queue =
|
||||||
|
ImportQueue::new(beacon_chain.clone(), queue_item_stale_time, log.clone());
|
||||||
SimpleSync {
|
SimpleSync {
|
||||||
chain: beacon_chain.clone(),
|
chain: beacon_chain.clone(),
|
||||||
known_peers: HashMap::new(),
|
known_peers: HashMap::new(),
|
||||||
@ -229,13 +233,72 @@ impl SimpleSync {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
let block_roots = self.import_queue.enqueue_headers(response.headers);
|
let block_roots = self
|
||||||
|
.import_queue
|
||||||
|
.enqueue_headers(response.headers, peer_id.clone());
|
||||||
|
|
||||||
if !block_roots.is_empty() {
|
if !block_roots.is_empty() {
|
||||||
self.request_block_bodies(peer_id, BeaconBlockBodiesRequest { block_roots }, network);
|
self.request_block_bodies(peer_id, BeaconBlockBodiesRequest { block_roots }, network);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn on_beacon_block_bodies_response(
|
||||||
|
&mut self,
|
||||||
|
peer_id: PeerId,
|
||||||
|
response: BeaconBlockBodiesResponse,
|
||||||
|
network: &mut NetworkContext,
|
||||||
|
) {
|
||||||
|
self.import_queue
|
||||||
|
.enqueue_bodies(response.block_bodies, peer_id.clone());
|
||||||
|
self.process_import_queue(network);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn process_import_queue(&mut self, network: &mut NetworkContext) {
|
||||||
|
let mut blocks: Vec<(Hash256, BeaconBlock, PeerId)> = self
|
||||||
|
.import_queue
|
||||||
|
.partials
|
||||||
|
.iter()
|
||||||
|
.filter_map(|(key, partial)| {
|
||||||
|
if let Some(_) = partial.body {
|
||||||
|
let (block, _root) = partial.clone().complete().expect("Body must be Some");
|
||||||
|
Some((*key, block, partial.sender.clone()))
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
// Sort the blocks to be in ascending slot order.
|
||||||
|
blocks.sort_unstable_by(|a, b| a.1.slot.partial_cmp(&b.1.slot).unwrap());
|
||||||
|
|
||||||
|
let mut imported_keys = vec![];
|
||||||
|
|
||||||
|
for (key, block, sender) in blocks {
|
||||||
|
match self.chain.process_block(block) {
|
||||||
|
Ok(outcome) => {
|
||||||
|
if outcome.is_invalid() {
|
||||||
|
warn!(self.log, "Invalid block: {:?}", outcome);
|
||||||
|
network.disconnect(sender);
|
||||||
|
} else {
|
||||||
|
imported_keys.push(key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!(self.log, "Error during block processing"; "error" => format!("{:?}", e))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
println!("imported_keys.len: {:?}", imported_keys.len());
|
||||||
|
|
||||||
|
if !imported_keys.is_empty() {
|
||||||
|
info!(self.log, "Imported {} blocks", imported_keys.len());
|
||||||
|
for key in imported_keys {
|
||||||
|
self.import_queue.partials.remove(&key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn request_block_roots(
|
fn request_block_roots(
|
||||||
&mut self,
|
&mut self,
|
||||||
peer_id: PeerId,
|
peer_id: PeerId,
|
||||||
@ -298,19 +361,41 @@ pub struct ImportQueue {
|
|||||||
pub chain: Arc<BeaconChain>,
|
pub chain: Arc<BeaconChain>,
|
||||||
/// Partially imported blocks, keyed by the root of `BeaconBlockBody`.
|
/// Partially imported blocks, keyed by the root of `BeaconBlockBody`.
|
||||||
pub partials: HashMap<Hash256, PartialBeaconBlock>,
|
pub partials: HashMap<Hash256, PartialBeaconBlock>,
|
||||||
|
/// Time before a queue entry is consider state.
|
||||||
|
pub stale_time: Duration,
|
||||||
/// Logging
|
/// Logging
|
||||||
log: slog::Logger,
|
log: slog::Logger,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ImportQueue {
|
impl ImportQueue {
|
||||||
pub fn new(chain: Arc<BeaconChain>, log: slog::Logger) -> Self {
|
pub fn new(chain: Arc<BeaconChain>, stale_time: Duration, log: slog::Logger) -> Self {
|
||||||
Self {
|
Self {
|
||||||
chain,
|
chain,
|
||||||
partials: HashMap::new(),
|
partials: HashMap::new(),
|
||||||
|
stale_time,
|
||||||
log,
|
log,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn remove_stale(&mut self) {
|
||||||
|
let keys: Vec<Hash256> = self
|
||||||
|
.partials
|
||||||
|
.iter()
|
||||||
|
.filter_map(|(key, partial)| {
|
||||||
|
if partial.inserted + self.stale_time >= Instant::now() {
|
||||||
|
Some(*key)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
keys.iter().for_each(|key| {
|
||||||
|
self.partials.remove(&key);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns `true` if `self.chain` has not yet processed this block.
|
||||||
fn is_new_block(&self, block_root: &Hash256) -> bool {
|
fn is_new_block(&self, block_root: &Hash256) -> bool {
|
||||||
self.chain
|
self.chain
|
||||||
.is_new_block_root(&block_root)
|
.is_new_block_root(&block_root)
|
||||||
@ -322,9 +407,6 @@ impl ImportQueue {
|
|||||||
|
|
||||||
/// Returns the index of the first new root in the list of block roots.
|
/// Returns the index of the first new root in the list of block roots.
|
||||||
pub fn first_new_root(&mut self, roots: &[BlockRootSlot]) -> Option<usize> {
|
pub fn first_new_root(&mut self, roots: &[BlockRootSlot]) -> Option<usize> {
|
||||||
for root in roots {
|
|
||||||
println!("root {}", root.block_root);
|
|
||||||
}
|
|
||||||
roots
|
roots
|
||||||
.iter()
|
.iter()
|
||||||
.position(|brs| self.is_new_block(&brs.block_root))
|
.position(|brs| self.is_new_block(&brs.block_root))
|
||||||
@ -339,14 +421,18 @@ impl ImportQueue {
|
|||||||
/// If a `header` is already in the queue, but not yet processed by the chain the block root is
|
/// If a `header` is already in the queue, but not yet processed by the chain the block root is
|
||||||
/// included in the output and the `inserted` time for the partial record is set to
|
/// included in the output and the `inserted` time for the partial record is set to
|
||||||
/// `Instant::now()`. Updating the `inserted` time stops the partial from becoming stale.
|
/// `Instant::now()`. Updating the `inserted` time stops the partial from becoming stale.
|
||||||
pub fn enqueue_headers(&mut self, headers: Vec<BeaconBlockHeader>) -> Vec<Hash256> {
|
pub fn enqueue_headers(
|
||||||
|
&mut self,
|
||||||
|
headers: Vec<BeaconBlockHeader>,
|
||||||
|
sender: PeerId,
|
||||||
|
) -> Vec<Hash256> {
|
||||||
let mut required_bodies: Vec<Hash256> = vec![];
|
let mut required_bodies: Vec<Hash256> = vec![];
|
||||||
|
|
||||||
for header in headers {
|
for header in headers {
|
||||||
let block_root = Hash256::from_slice(&header.hash_tree_root()[..]);
|
let block_root = Hash256::from_slice(&header.hash_tree_root()[..]);
|
||||||
|
|
||||||
if self.is_new_block(&block_root) {
|
if self.is_new_block(&block_root) {
|
||||||
self.insert_partial(block_root, header);
|
self.insert_header(block_root, header, sender.clone());
|
||||||
required_bodies.push(block_root)
|
required_bodies.push(block_root)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -354,20 +440,60 @@ impl ImportQueue {
|
|||||||
required_bodies
|
required_bodies
|
||||||
}
|
}
|
||||||
|
|
||||||
fn insert_partial(&mut self, block_root: Hash256, header: BeaconBlockHeader) {
|
/// If there is a matching `header` for this `body`, adds it to the queue.
|
||||||
self.partials.insert(
|
///
|
||||||
header.block_body_root,
|
/// If there is no `header` for the `body`, the body is simply discarded.
|
||||||
PartialBeaconBlock {
|
pub fn enqueue_bodies(&mut self, bodies: Vec<BeaconBlockBody>, sender: PeerId) {
|
||||||
|
for body in bodies {
|
||||||
|
self.insert_body(body, sender.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Inserts a header to the queue.
|
||||||
|
///
|
||||||
|
/// If the header already exists, the `inserted` time is set to `now` and not other
|
||||||
|
/// modifications are made.
|
||||||
|
fn insert_header(&mut self, block_root: Hash256, header: BeaconBlockHeader, sender: PeerId) {
|
||||||
|
self.partials
|
||||||
|
.entry(header.block_body_root)
|
||||||
|
.and_modify(|p| p.inserted = Instant::now())
|
||||||
|
.or_insert(PartialBeaconBlock {
|
||||||
block_root,
|
block_root,
|
||||||
header,
|
header,
|
||||||
|
body: None,
|
||||||
inserted: Instant::now(),
|
inserted: Instant::now(),
|
||||||
},
|
sender,
|
||||||
);
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Updates an existing partial with the `body`.
|
||||||
|
///
|
||||||
|
/// If there is no header for the `body`, the body is simply discarded.
|
||||||
|
fn insert_body(&mut self, body: BeaconBlockBody, sender: PeerId) {
|
||||||
|
let body_root = Hash256::from_slice(&body.hash_tree_root()[..]);
|
||||||
|
|
||||||
|
self.partials.entry(body_root).and_modify(|p| {
|
||||||
|
if body_root == p.header.block_body_root {
|
||||||
|
p.body = Some(body);
|
||||||
|
p.inserted = Instant::now();
|
||||||
|
p.sender = sender;
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
pub struct PartialBeaconBlock {
|
pub struct PartialBeaconBlock {
|
||||||
pub block_root: Hash256,
|
pub block_root: Hash256,
|
||||||
pub header: BeaconBlockHeader,
|
pub header: BeaconBlockHeader,
|
||||||
|
pub body: Option<BeaconBlockBody>,
|
||||||
pub inserted: Instant,
|
pub inserted: Instant,
|
||||||
|
pub sender: PeerId,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PartialBeaconBlock {
|
||||||
|
/// Given a `body`, consumes `self` and returns a complete `BeaconBlock` along with its root.
|
||||||
|
pub fn complete(self) -> Option<(BeaconBlock, Hash256)> {
|
||||||
|
Some((self.header.into_block(self.body?), self.block_root))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -77,7 +77,16 @@ impl SyncNode {
|
|||||||
|
|
||||||
match request {
|
match request {
|
||||||
RPCRequest::BeaconBlockHeaders(request) => request,
|
RPCRequest::BeaconBlockHeaders(request) => request,
|
||||||
_ => panic!("Did not get block root request"),
|
_ => panic!("Did not get block headers request"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_block_bodies_request(&self) -> BeaconBlockBodiesRequest {
|
||||||
|
let request = self.recv_rpc_request().expect("No block bodies request");
|
||||||
|
|
||||||
|
match request {
|
||||||
|
RPCRequest::BeaconBlockBodies(request) => request,
|
||||||
|
_ => panic!("Did not get block bodies request"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -223,6 +232,29 @@ impl SyncMaster {
|
|||||||
self.send_rpc_response(node, response)
|
self.send_rpc_response(node, response)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn respond_to_block_bodies_request(
|
||||||
|
&mut self,
|
||||||
|
node: &SyncNode,
|
||||||
|
request: BeaconBlockBodiesRequest,
|
||||||
|
) {
|
||||||
|
let block_bodies: Vec<BeaconBlockBody> = request
|
||||||
|
.block_roots
|
||||||
|
.iter()
|
||||||
|
.map(|root| {
|
||||||
|
let block = self
|
||||||
|
.harness
|
||||||
|
.beacon_chain
|
||||||
|
.get_block(root)
|
||||||
|
.expect("Failed to load block")
|
||||||
|
.expect("Block did not exist");
|
||||||
|
block.body
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let response = RPCResponse::BeaconBlockBodies(BeaconBlockBodiesResponse { block_bodies });
|
||||||
|
self.send_rpc_response(node, response)
|
||||||
|
}
|
||||||
|
|
||||||
fn send_rpc_response(&mut self, node: &SyncNode, rpc_response: RPCResponse) {
|
fn send_rpc_response(&mut self, node: &SyncNode, rpc_response: RPCResponse) {
|
||||||
node.send(self.rpc_response(node, rpc_response));
|
node.send(self.rpc_response(node, rpc_response));
|
||||||
}
|
}
|
||||||
@ -311,6 +343,11 @@ fn first_test() {
|
|||||||
|
|
||||||
master.respond_to_block_headers_request(&nodes[0], headers_request);
|
master.respond_to_block_headers_request(&nodes[0], headers_request);
|
||||||
|
|
||||||
std::thread::sleep(Duration::from_millis(500));
|
let bodies_request = nodes[0].get_block_bodies_request();
|
||||||
|
assert_eq!(bodies_request.block_roots.len(), 2);
|
||||||
|
|
||||||
|
master.respond_to_block_bodies_request(&nodes[0], bodies_request);
|
||||||
|
|
||||||
|
std::thread::sleep(Duration::from_millis(10000));
|
||||||
runtime.shutdown_now();
|
runtime.shutdown_now();
|
||||||
}
|
}
|
||||||
|
@ -37,6 +37,19 @@ impl BeaconBlockHeader {
|
|||||||
pub fn canonical_root(&self) -> Hash256 {
|
pub fn canonical_root(&self) -> Hash256 {
|
||||||
Hash256::from_slice(&self.hash_tree_root()[..])
|
Hash256::from_slice(&self.hash_tree_root()[..])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Given a `body`, consumes `self` and returns a complete `BeaconBlock`.
|
||||||
|
///
|
||||||
|
/// Spec v0.5.0
|
||||||
|
pub fn into_block(self, body: BeaconBlockBody) -> BeaconBlock {
|
||||||
|
BeaconBlock {
|
||||||
|
slot: self.slot,
|
||||||
|
previous_block_root: self.previous_block_root,
|
||||||
|
state_root: self.state_root,
|
||||||
|
body,
|
||||||
|
signature: self.signature,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
Loading…
Reference in New Issue
Block a user