Sync Re-Write (#663)

* Apply clippy lints to beacon node

* Remove unnecessary logging and correct formatting

* Initial bones of load-balanced range-sync

* Port bump meshsup tests

* Further structure and network handling logic added

* Basic structure, ignoring error handling

* Correct max peers delay bug

* Clean up and re-write message processor and sync manager

* Restructure directory, correct type issues

* Fix compiler issues

* Completed first testing of new sync

* Correct merge issues

* Clean up warnings

* Push attestation processed log down to dbg

* Correct math error, downgraded logs

* Add RPC error handling and improved syncing code

* Add libp2p stream error handling and dropping of invalid peers

* Lower logs

* Add discovery tweak

* Correct libp2p service locking

* Handles peer disconnects for sync

* Add logs; downgrade discovery log

* Less fork choice (#679)

* Try merge in change to reduce fork choice calls

* Remove fork choice from process block

* Minor log fix

* Check successes > 0

* Fix failing beacon chain tests

* Fix re-org warnings

* Fix mistake in prev commit

* Range sync refactor

- Introduces `ChainCollection`
- Corrects disconnected node handling
- Removes duplicate code

* Various bug fixes

* Remove unnecessary logs

* Maintain syncing state in the transition from finalized to head

* Improved disconnect handling

* Adds forwards block iterator

* Notifies lighthouse on stream timeouts

* Apply new gossipsub updates
Author: Age Manning
Date: 2019-12-09 18:50:21 +11:00 (committed via GitHub)
Parent: 988059bc9c
Commit: 5853326342
21 changed files with 1805 additions and 838 deletions
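The commit message above introduces the load-balanced range sync in pieces; the diffs below add a `SyncNetworkContext` (owns the RPC request-id counter), a `SyncingChain` (downloads one target chain in fixed-size batches) and a `ChainCollection` (load-balances finalized and head chains). The following is a minimal, illustrative Rust sketch of that shape and is not part of the diff; the type and field names follow the code added below, while the aliases for `PeerId`, `Slot` and `Hash256` are stand-ins so the sketch compiles on its own.

#![allow(dead_code)]
use std::collections::HashSet;

// Stand-ins for the real lighthouse types (eth2_libp2p::PeerId, types::{Slot, Hash256}).
type PeerId = String;
type Slot = u64;
type Hash256 = [u8; 32];

/// A contiguous range of slots requested from a single peer (see `Batch` in chain.rs below).
struct Batch {
    id: u64,
    start_slot: Slot,
    end_slot: Slot,
    current_peer: PeerId,
}

/// One sync target (a finalized checkpoint or a head) downloaded from a pool of peers.
struct SyncingChain {
    start_slot: Slot,
    target_head_slot: Slot,
    target_head_root: Hash256,
    peer_pool: HashSet<PeerId>,
    // pending/completed batch bookkeeping omitted; see chain.rs below.
}

/// Load-balances chains: only the finalized chain with the most peers syncs at a
/// time, while head chains may all sync simultaneously (see chain_collection.rs below).
struct ChainCollection {
    finalized_chains: Vec<SyncingChain>,
    head_chains: Vec<SyncingChain>,
}

fn main() {}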


@ -260,6 +260,20 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
ReverseBlockRootIterator::new((head.beacon_block_root, head.beacon_block.slot), iter)
}
pub fn forwards_iter_block_roots(
&self,
start_slot: Slot,
) -> <T::Store as Store<T::EthSpec>>::ForwardsBlockRootsIterator {
let local_head = self.head();
T::Store::forwards_block_roots_iterator(
self.store.clone(),
start_slot,
local_head.beacon_state,
local_head.beacon_block_root,
&self.spec,
)
}
/// Traverse backwards from `block_root` to find the block roots of its ancestors.
///
/// ## Notes
@ -888,7 +902,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// Only log a warning if our head is in a reasonable place to verify this attestation.
// This avoids excess logging during syncing.
if head_epoch + 1 >= attestation_epoch {
warn!(
debug!(
self.log,
"Dropped attestation for unknown block";
"block" => format!("{}", attestation.data.beacon_block_root)
@ -1334,23 +1348,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
metrics::stop_timer(fork_choice_register_timer);
let find_head_timer =
metrics::start_timer(&metrics::BLOCK_PROCESSING_FORK_CHOICE_FIND_HEAD);
// Execute the fork choice algorithm, enthroning a new head if discovered.
//
// Note: in the future we may choose to run fork-choice less often, potentially based upon
// some heuristic around number of attestations seen for the block.
if let Err(e) = self.fork_choice() {
error!(
self.log,
"fork choice failed to find head";
"error" => format!("{:?}", e)
)
};
metrics::stop_timer(find_head_timer);
metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES);
metrics::observe(
&metrics::OPERATIONS_PER_BLOCK_ATTESTATION,
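The forwards iterator added above is what the reworked BlocksByRange handler later in this diff uses in place of the old reverse-then-filter approach. A minimal usage sketch, assuming the crate paths shown elsewhere in this diff (`beacon_chain`, `types`) and a `BeaconChain` handle in scope; it mirrors the handler code below rather than adding new behaviour.

use beacon_chain::{BeaconChain, BeaconChainTypes};
use types::{Hash256, Slot};

// Collect the block roots for `count` slots starting at `start_slot`, walking
// forwards from the start slot instead of reverse-iterating from the head.
fn roots_for_range<T: BeaconChainTypes>(
    chain: &BeaconChain<T>,
    start_slot: u64,
    count: u64,
) -> Vec<Hash256> {
    let mut roots: Vec<Hash256> = chain
        .forwards_iter_block_roots(Slot::from(start_slot))
        .take_while(|(_root, slot)| slot.as_u64() < start_slot + count)
        .map(|(root, _slot)| root)
        .collect();
    // Skipped slots repeat the previous root; dedup so each block appears once.
    roots.dedup();
    roots
}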


@ -48,10 +48,6 @@ lazy_static! {
"beacon_block_processing_fork_choice_register_seconds",
"Time spent registering the new block with fork choice (but not finding head)"
);
pub static ref BLOCK_PROCESSING_FORK_CHOICE_FIND_HEAD: Result<Histogram> = try_create_histogram(
"beacon_block_processing_fork_choice_find_head_seconds",
"Time spent finding the new head after processing a new block"
);
/*
* Block Production


@ -258,6 +258,8 @@ where
.process_block(block)
.expect("should not error during block processing");
self.chain.fork_choice().expect("should find head");
if let BlockProcessingOutcome::Processed { block_root } = outcome {
head_block_root = Some(block_root);


@ -494,6 +494,11 @@ fn run_skip_slot_test(skip_slots: u64) {
})
);
harness_b
.chain
.fork_choice()
.expect("should run fork choice");
assert_eq!(
harness_b.chain.head().beacon_block.slot,
Slot::new(skip_slots + 1)


@ -8,8 +8,8 @@ edition = "2018"
hex = "0.3"
# rust-libp2p is presently being sourced from a Sigma Prime fork of the
# `libp2p/rust-libp2p` repository.
libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "f9ac7d542dc3e0b65c5cbbe4f45bfc3382ab2b4d" }
enr = { git = "https://github.com/SigP/rust-libp2p/", rev = "f9ac7d542dc3e0b65c5cbbe4f45bfc3382ab2b4d", features = ["serde"] }
libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "3f9b030e29c9b31f9fe6f2ed27be4a813e2b3701" }
enr = { git = "https://github.com/SigP/rust-libp2p/", rev = "3f9b030e29c9b31f9fe6f2ed27be4a813e2b3701", features = ["serde"] }
types = { path = "../../eth2/types" }
serde = "1.0.102"
serde_derive = "1.0.102"


@ -172,7 +172,7 @@ impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<IdentifyEv
);
info.listen_addrs.truncate(MAX_IDENTIFY_ADDRESSES);
}
debug!(self.log, "Identified Peer"; "Peer" => format!("{}", peer_id),
debug!(self.log, "Identified Peer"; "peer" => format!("{}", peer_id),
"protocol_version" => info.protocol_version,
"agent_version" => info.agent_version,
"listening_ addresses" => format!("{:?}", info.listen_addrs),


@ -21,7 +21,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
use tokio::timer::Delay;
/// Maximum seconds before searching for extra peers.
const MAX_TIME_BETWEEN_PEER_SEARCHES: u64 = 60;
const MAX_TIME_BETWEEN_PEER_SEARCHES: u64 = 120;
/// Initial delay between peer searches.
const INITIAL_SEARCH_DELAY: u64 = 5;
/// Local ENR storage filename.
@ -172,18 +172,6 @@ impl<TSubstream> Discovery<TSubstream> {
let random_node = NodeId::random();
debug!(self.log, "Searching for peers");
self.discovery.find_node(random_node);
// update the time until next discovery
let delay = {
if self.past_discovery_delay < MAX_TIME_BETWEEN_PEER_SEARCHES {
self.past_discovery_delay *= 2;
self.past_discovery_delay
} else {
MAX_TIME_BETWEEN_PEER_SEARCHES
}
};
self.peer_discovery_delay
.reset(Instant::now() + Duration::from_secs(delay));
}
}
@ -252,6 +240,10 @@ where
if self.connected_peers.len() < self.max_peers {
self.find_peers();
}
// Set to maximum, and update to earlier, once we get our results back.
self.peer_discovery_delay.reset(
Instant::now() + Duration::from_secs(MAX_TIME_BETWEEN_PEER_SEARCHES),
);
}
Ok(Async::NotReady) => break,
Err(e) => {
@ -283,6 +275,17 @@ where
}
Discv5Event::FindNodeResult { closer_peers, .. } => {
debug!(self.log, "Discovery query completed"; "peers_found" => closer_peers.len());
// update the time to the next query
if self.past_discovery_delay < MAX_TIME_BETWEEN_PEER_SEARCHES {
self.past_discovery_delay *= 2;
}
let delay = std::cmp::max(
self.past_discovery_delay,
MAX_TIME_BETWEEN_PEER_SEARCHES,
);
self.peer_discovery_delay
.reset(Instant::now() + Duration::from_secs(delay));
if closer_peers.is_empty() {
debug!(self.log, "Discovery random query found no peers");
}


@ -8,11 +8,11 @@ use crate::rpc::protocol::{InboundFramed, OutboundFramed};
use core::marker::PhantomData;
use fnv::FnvHashMap;
use futures::prelude::*;
use libp2p::core::upgrade::{InboundUpgrade, OutboundUpgrade};
use libp2p::core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeError};
use libp2p::swarm::protocols_handler::{
KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol,
};
use slog::{crit, debug, error, trace, warn};
use slog::{crit, debug, error, trace};
use smallvec::SmallVec;
use std::collections::hash_map::Entry;
use std::time::{Duration, Instant};
@ -25,6 +25,9 @@ use tokio::timer::{delay_queue, DelayQueue};
/// The time (in seconds) before a substream that is awaiting a response from the user times out.
pub const RESPONSE_TIMEOUT: u64 = 10;
/// The number of times to retry an outbound upgrade in the case of IO errors.
const IO_ERROR_RETRIES: u8 = 3;
/// Inbound requests are given a sequential `RequestId` to keep track of.
type InboundRequestId = RequestId;
/// Outbound requests are associated with an id that is given by the application that sent the
@ -40,7 +43,7 @@ where
listen_protocol: SubstreamProtocol<RPCProtocol>,
/// If `Some`, something bad happened and we should shut down the handler with an error.
pending_error: Option<ProtocolsHandlerUpgrErr<RPCError>>,
pending_error: Option<(RequestId, ProtocolsHandlerUpgrErr<RPCError>)>,
/// Queue of events to produce in `poll()`.
events_out: SmallVec<[RPCEvent; 4]>,
@ -81,6 +84,10 @@ where
/// After the given duration has elapsed, an inactive connection will shutdown.
inactive_timeout: Duration,
/// Try to negotiate the outbound upgrade a few times if there is an IO error before reporting the request as failed.
/// This keeps track of the number of attempts.
outbound_io_error_retries: u8,
/// Logger for handling RPC streams
log: slog::Logger,
@ -150,6 +157,7 @@ where
max_dial_negotiated: 8,
keep_alive: KeepAlive::Yes,
inactive_timeout,
outbound_io_error_retries: 0,
log: log.clone(),
_phantom: PhantomData,
}
@ -339,13 +347,29 @@ where
fn inject_dial_upgrade_error(
&mut self,
_: Self::OutboundOpenInfo,
request: Self::OutboundOpenInfo,
error: ProtocolsHandlerUpgrErr<
<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error,
>,
) {
if let ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(RPCError::IoError(_))) = error {
self.outbound_io_error_retries += 1;
if self.outbound_io_error_retries < IO_ERROR_RETRIES {
self.send_request(request);
return;
}
}
self.outbound_io_error_retries = 0;
// add the error
let request_id = {
if let RPCEvent::Request(id, _) = request {
id
} else {
0
}
};
if self.pending_error.is_none() {
self.pending_error = Some(error);
self.pending_error = Some((request_id, error));
}
}
@ -359,11 +383,43 @@ where
ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>,
Self::Error,
> {
if let Some(err) = self.pending_error.take() {
// Returning an error here will result in dropping any peer that doesn't support any of
// the RPC protocols. For our immediate purposes we permit this and simply log that an
// upgrade was not supported.
warn!(self.log,"RPC Protocol was not supported"; "Error" => format!("{}", err));
if let Some((request_id, err)) = self.pending_error.take() {
// Returning an error here will result in dropping the peer.
match err {
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(
RPCError::InvalidProtocol(protocol_string),
)) => {
// Peer does not support the protocol.
// TODO: We currently will not drop the peer, for maximal compatibility with
// other clients testing their software. In the future, we will need to decide
// which protocols are a bare minimum to support before kicking the peer.
error!(self.log, "Peer doesn't support the RPC protocol"; "protocol" => protocol_string);
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(request_id, RPCError::InvalidProtocol(protocol_string)),
)));
}
ProtocolsHandlerUpgrErr::Timeout | ProtocolsHandlerUpgrErr::Timer => {
// negotiation timeout, mark the request as failed
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(
request_id,
RPCError::Custom("Protocol negotiation timeout".into()),
),
)));
}
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)) => {
// IO/Decode/Custom Error, report to the application
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(request_id, err),
)));
}
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)) => {
// Error during negotiation
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(request_id, RPCError::Custom(format!("{}", err))),
)));
}
}
}
// return any events that need to be reported
@ -385,12 +441,19 @@ where
}
// purge expired outbound substreams
while let Async::Ready(Some(stream_id)) = self
if let Async::Ready(Some(stream_id)) = self
.outbound_substreams_delay
.poll()
.map_err(|_| ProtocolsHandlerUpgrErr::Timer)?
{
self.outbound_substreams.remove(stream_id.get_ref());
// notify the user
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(
stream_id.get_ref().clone(),
RPCError::Custom("Stream timed out".into()),
),
)));
}
// drive inbound streams that need to be processed


@ -84,7 +84,7 @@ fn test_gossipsub_full_mesh_publish() {
let log = common::build_log(Level::Info, false);
let num_nodes = 20;
let mut nodes = common::build_full_mesh(log, num_nodes, None);
let mut nodes = common::build_full_mesh(log, num_nodes, Some(11320));
let mut publishing_node = nodes.pop().unwrap();
let pubsub_message = PubsubMessage::Block(vec![0; 4]);
let publishing_topic: String = "/eth2/beacon_block/ssz".into();


@ -231,33 +231,30 @@ fn network_service(
}
}
loop {
// poll the swarm
let mut locked_service = libp2p_service.lock();
match locked_service.poll() {
let mut peers_to_ban = Vec::new();
loop {
match libp2p_service.lock().poll() {
Ok(Async::Ready(Some(event))) => match event {
Libp2pEvent::RPC(peer_id, rpc_event) => {
trace!(log, "Received RPC"; "rpc" => format!("{}", rpc_event));
// if we received a Goodbye message, drop and ban the peer
if let RPCEvent::Request(_, RPCRequest::Goodbye(_)) = rpc_event {
locked_service.disconnect_and_ban_peer(
peer_id.clone(),
std::time::Duration::from_secs(BAN_PEER_TIMEOUT),
);
peers_to_ban.push(peer_id.clone());
};
message_handler_send
.try_send(HandlerMessage::RPC(peer_id, rpc_event))
.map_err(|_| "Failed to send RPC to handler")?;
}
Libp2pEvent::PeerDialed(peer_id) => {
debug!(log, "Peer Dialed"; "PeerID" => format!("{:?}", peer_id));
debug!(log, "Peer Dialed"; "peer_id" => format!("{:?}", peer_id));
message_handler_send
.try_send(HandlerMessage::PeerDialed(peer_id))
.map_err(|_| "Failed to send PeerDialed to handler")?;
}
Libp2pEvent::PeerDisconnected(peer_id) => {
debug!(log, "Peer Disconnected"; "PeerID" => format!("{:?}", peer_id));
debug!(log, "Peer Disconnected"; "peer_id" => format!("{:?}", peer_id));
message_handler_send
.try_send(HandlerMessage::PeerDisconnected(peer_id))
.map_err(|_| "Failed to send PeerDisconnected to handler")?;
@ -280,6 +277,14 @@ fn network_service(
}
}
// ban and disconnect any peers that sent Goodbye requests
while let Some(peer_id) = peers_to_ban.pop() {
libp2p_service.lock().disconnect_and_ban_peer(
peer_id.clone(),
std::time::Duration::from_secs(BAN_PEER_TIMEOUT),
);
}
Ok(Async::NotReady)
})
}

File diff suppressed because it is too large.


@ -6,7 +6,7 @@ use beacon_chain::{
use eth2_libp2p::rpc::methods::*;
use eth2_libp2p::rpc::{RPCEvent, RPCRequest, RPCResponse, RequestId};
use eth2_libp2p::PeerId;
use slog::{debug, info, o, trace, warn};
use slog::{debug, error, o, trace, warn};
use ssz::Encode;
use std::sync::Arc;
use store::Store;
@ -60,8 +60,8 @@ pub struct MessageProcessor<T: BeaconChainTypes> {
sync_send: mpsc::UnboundedSender<SyncMessage<T::EthSpec>>,
/// A oneshot channel for destroying the sync thread.
_sync_exit: oneshot::Sender<()>,
/// A nextwork context to return and handle RPC requests.
network: NetworkContext,
/// A network context to return and handle RPC requests.
network: HandlerNetworkContext,
/// The `RPCHandler` logger.
log: slog::Logger,
}
@ -75,13 +75,12 @@ impl<T: BeaconChainTypes> MessageProcessor<T> {
log: &slog::Logger,
) -> Self {
let sync_logger = log.new(o!("service"=> "sync"));
let sync_network_context = NetworkContext::new(network_send.clone(), sync_logger.clone());
// spawn the sync thread
let (sync_send, _sync_exit) = super::manager::spawn(
executor,
Arc::downgrade(&beacon_chain),
sync_network_context,
network_send.clone(),
sync_logger,
);
@ -89,7 +88,7 @@ impl<T: BeaconChainTypes> MessageProcessor<T> {
chain: beacon_chain,
sync_send,
_sync_exit,
network: NetworkContext::new(network_send, log.clone()),
network: HandlerNetworkContext::new(network_send, log.clone()),
log: log.clone(),
}
}
@ -120,11 +119,8 @@ impl<T: BeaconChainTypes> MessageProcessor<T> {
///
/// Sends a `Status` message to the peer.
pub fn on_connect(&mut self, peer_id: PeerId) {
self.network.send_rpc_request(
None,
peer_id,
RPCRequest::Status(status_message(&self.chain)),
);
self.network
.send_rpc_request(peer_id, RPCRequest::Status(status_message(&self.chain)));
}
/// Handle a `Status` request.
@ -316,51 +312,47 @@ impl<T: BeaconChainTypes> MessageProcessor<T> {
"start_slot" => req.start_slot,
);
//TODO: Optimize this
// Currently for skipped slots, the blocks returned could be less than the requested range.
// In the current implementation we read from the db then filter out out-of-range blocks.
// Improving the db schema to prevent this would be ideal.
//TODO: This really needs to be read forward for infinite streams
// We should be reading the first block from the db, sending, then reading the next... we
// need a forwards iterator!!
let mut blocks: Vec<BeaconBlock<T::EthSpec>> = self
let mut block_roots = self
.chain
.rev_iter_block_roots()
.filter(|(_root, slot)| {
req.start_slot <= slot.as_u64() && req.start_slot + req.count > slot.as_u64()
})
.take_while(|(_root, slot)| req.start_slot <= slot.as_u64())
.filter_map(|(root, _slot)| {
.forwards_iter_block_roots(Slot::from(req.start_slot))
.take_while(|(_root, slot)| slot.as_u64() < req.start_slot + req.count)
.map(|(root, _slot)| root)
.collect::<Vec<_>>();
block_roots.dedup();
let mut blocks_sent = 0;
for root in block_roots {
if let Ok(Some(block)) = self.chain.store.get::<BeaconBlock<T::EthSpec>>(&root) {
Some(block)
// Due to skip slots, blocks could be out of the range, we ensure they are in the
// range before sending
if block.slot >= req.start_slot && block.slot < req.start_slot + req.count {
blocks_sent += 1;
self.network.send_rpc_response(
peer_id.clone(),
request_id,
RPCResponse::BlocksByRange(block.as_ssz_bytes()),
);
}
} else {
warn!(
error!(
self.log,
"Block in the chain is not in the store";
"request_root" => format!("{:}", root),
);
None
}
})
.filter(|block| block.slot >= req.start_slot)
.collect();
}
blocks.reverse();
blocks.dedup_by_key(|brs| brs.slot);
if blocks.len() < (req.count as usize) {
debug!(
if blocks_sent < (req.count as usize) {
trace!(
self.log,
"Sending BlocksByRange Response";
"BlocksByRange Response Sent";
"peer" => format!("{:?}", peer_id),
"msg" => "Failed to return all requested blocks",
"start_slot" => req.start_slot,
"current_slot" => self.chain.slot().unwrap_or_else(|_| Slot::from(0_u64)).as_u64(),
"requested" => req.count,
"returned" => blocks.len(),
);
"returned" => blocks_sent);
} else {
trace!(
self.log,
@ -369,17 +361,9 @@ impl<T: BeaconChainTypes> MessageProcessor<T> {
"start_slot" => req.start_slot,
"current_slot" => self.chain.slot().unwrap_or_else(|_| Slot::from(0_u64)).as_u64(),
"requested" => req.count,
"returned" => blocks.len(),
);
"returned" => blocks_sent);
}
for block in blocks {
self.network.send_rpc_response(
peer_id.clone(),
request_id,
RPCResponse::BlocksByRange(block.as_ssz_bytes()),
);
}
// send the stream terminator
self.network.send_rpc_error_response(
peer_id,
@ -442,6 +426,27 @@ impl<T: BeaconChainTypes> MessageProcessor<T> {
BlockProcessingOutcome::Processed { .. } => {
trace!(self.log, "Gossipsub block processed";
"peer_id" => format!("{:?}",peer_id));
// TODO: It would be better if we can run this _after_ we publish the block to
// reduce block propagation latency.
//
// The `MessageHandler` would be the place to put this, however it doesn't seem
// to have a reference to the `BeaconChain`. I will leave this for future
// works.
match self.chain.fork_choice() {
Ok(()) => trace!(
self.log,
"Fork choice success";
"location" => "block gossip"
),
Err(e) => error!(
self.log,
"Fork choice failed";
"error" => format!("{:?}", e),
"location" => "block gossip"
),
}
SHOULD_FORWARD_GOSSIP_BLOCK
}
BlockProcessingOutcome::ParentUnknown { .. } => {
@ -494,7 +499,7 @@ impl<T: BeaconChainTypes> MessageProcessor<T> {
match self.chain.process_attestation(msg.clone()) {
Ok(outcome) => match outcome {
AttestationProcessingOutcome::Processed => {
info!(
debug!(
self.log,
"Processed attestation";
"source" => "gossip",
@ -545,15 +550,17 @@ pub(crate) fn status_message<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>)
}
}
/// Wraps a Network Channel to employ various RPC/Sync related network functionality.
pub struct NetworkContext {
/// Wraps a Network Channel to employ various RPC related network functionality for the message
/// handler. The handler doesn't manage its own request ids and can therefore only send
/// responses or requests with the 0 request id.
pub struct HandlerNetworkContext {
/// The network channel to relay messages to the Network service.
network_send: mpsc::UnboundedSender<NetworkMessage>,
/// Logger for the `NetworkContext`.
log: slog::Logger,
}
impl NetworkContext {
impl HandlerNetworkContext {
pub fn new(network_send: mpsc::UnboundedSender<NetworkMessage>, log: slog::Logger) -> Self {
Self { network_send, log }
}
@ -565,7 +572,7 @@ impl NetworkContext {
"reason" => format!("{:?}", reason),
"peer_id" => format!("{:?}", peer_id),
);
self.send_rpc_request(None, peer_id.clone(), RPCRequest::Goodbye(reason));
self.send_rpc_request(peer_id.clone(), RPCRequest::Goodbye(reason));
self.network_send
.try_send(NetworkMessage::Disconnect { peer_id })
.unwrap_or_else(|_| {
@ -576,14 +583,10 @@ impl NetworkContext {
});
}
pub fn send_rpc_request(
&mut self,
request_id: Option<RequestId>,
peer_id: PeerId,
rpc_request: RPCRequest,
) {
// use 0 as the default request id, when an ID is not required.
let request_id = request_id.unwrap_or_else(|| 0);
pub fn send_rpc_request(&mut self, peer_id: PeerId, rpc_request: RPCRequest) {
// the message handler cannot send requests with ids. Id's are managed by the sync
// manager.
let request_id = 0;
self.send_rpc_event(peer_id, RPCEvent::Request(request_id, rpc_request));
}


@ -3,6 +3,8 @@
//! Stores the various syncing methods for the beacon chain.
mod manager;
mod message_processor;
mod network_context;
mod range_sync;
pub use message_processor::MessageProcessor;


@ -0,0 +1,133 @@
//! Provides network functionality for the Syncing thread. This fundamentally wraps a network
//! channel and stores a global RPC ID to perform requests.
use super::message_processor::status_message;
use crate::service::NetworkMessage;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::rpc::methods::*;
use eth2_libp2p::rpc::{RPCEvent, RPCRequest, RequestId};
use eth2_libp2p::PeerId;
use slog::{debug, trace, warn};
use std::sync::Weak;
use tokio::sync::mpsc;
/// Wraps a Network channel to employ various RPC related network functionality for the Sync manager. This includes management of a global RPC request Id.
pub struct SyncNetworkContext {
/// The network channel to relay messages to the Network service.
network_send: mpsc::UnboundedSender<NetworkMessage>,
request_id: RequestId,
/// Logger for the `SyncNetworkContext`.
log: slog::Logger,
}
impl SyncNetworkContext {
pub fn new(network_send: mpsc::UnboundedSender<NetworkMessage>, log: slog::Logger) -> Self {
Self {
network_send,
request_id: 0,
log,
}
}
pub fn status_peer<T: BeaconChainTypes>(
&mut self,
chain: Weak<BeaconChain<T>>,
peer_id: PeerId,
) {
trace!(
self.log,
"Sending Status Request";
"method" => "STATUS",
"peer" => format!("{:?}", peer_id)
);
if let Some(chain) = chain.upgrade() {
let _ = self.send_rpc_request(peer_id, RPCRequest::Status(status_message(&chain)));
}
}
pub fn blocks_by_range_request(
&mut self,
peer_id: PeerId,
request: BlocksByRangeRequest,
) -> Result<RequestId, &'static str> {
trace!(
self.log,
"Sending BlocksByRange Request";
"method" => "BlocksByRange",
"count" => request.count,
"peer" => format!("{:?}", peer_id)
);
self.send_rpc_request(peer_id.clone(), RPCRequest::BlocksByRange(request))
}
pub fn blocks_by_root_request(
&mut self,
peer_id: PeerId,
request: BlocksByRootRequest,
) -> Result<RequestId, &'static str> {
trace!(
self.log,
"Sending BlocksByRoot Request";
"method" => "BlocksByRoot",
"count" => request.block_roots.len(),
"peer" => format!("{:?}", peer_id)
);
self.send_rpc_request(peer_id.clone(), RPCRequest::BlocksByRoot(request))
}
pub fn downvote_peer(&mut self, peer_id: PeerId) {
debug!(
self.log,
"Peer downvoted";
"peer" => format!("{:?}", peer_id)
);
// TODO: Implement reputation
self.disconnect(peer_id.clone(), GoodbyeReason::Fault);
}
fn disconnect(&mut self, peer_id: PeerId, reason: GoodbyeReason) {
warn!(
&self.log,
"Disconnecting peer (RPC)";
"reason" => format!("{:?}", reason),
"peer_id" => format!("{:?}", peer_id),
);
// ignore the error if the channel send fails
let _ = self.send_rpc_request(peer_id.clone(), RPCRequest::Goodbye(reason));
self.network_send
.try_send(NetworkMessage::Disconnect { peer_id })
.unwrap_or_else(|_| {
warn!(
self.log,
"Could not send a Disconnect to the network service"
)
});
}
pub fn send_rpc_request(
&mut self,
peer_id: PeerId,
rpc_request: RPCRequest,
) -> Result<RequestId, &'static str> {
let request_id = self.request_id;
self.request_id += 1;
self.send_rpc_event(peer_id, RPCEvent::Request(request_id, rpc_request))?;
Ok(request_id)
}
fn send_rpc_event(&mut self, peer_id: PeerId, rpc_event: RPCEvent) -> Result<(), &'static str> {
self.network_send
.try_send(NetworkMessage::RPC(peer_id, rpc_event))
.map_err(|_| {
// This is likely to happen when shutting down. Suppress this warning to trace for now
trace!(
self.log,
"Could not send RPC message to the network service"
);
"Network channel send Failed"
})
}
}
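A short usage sketch for the context above, from the point of view of the sync manager (whose own diff is suppressed further up as too large). It is illustrative only and assumes the crate paths used in this diff; the key point is that every outbound request is handed a fresh `RequestId`, which the range-sync code below uses as the key for its `pending_batches` map so responses and RPC errors can be matched back to a batch.

use crate::sync::network_context::SyncNetworkContext;
use eth2_libp2p::rpc::methods::BlocksByRangeRequest;
use eth2_libp2p::PeerId;
use types::Hash256;

// Request a single batch of blocks from `peer_id` (values are illustrative).
fn request_one_batch(network: &mut SyncNetworkContext, peer_id: PeerId) {
    let request = BlocksByRangeRequest {
        head_block_root: Hash256::zero(),
        start_slot: 0,
        count: 50, // BLOCKS_PER_BATCH in the chain code below
        step: 1,
    };
    if let Ok(request_id) = network.blocks_by_range_request(peer_id, request) {
        // The sync code stores the batch under this id, e.g.
        // `pending_batches.insert(request_id, batch)` in `SyncingChain::send_batch`.
        let _ = request_id;
    }
}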


@ -0,0 +1,649 @@
use crate::sync::message_processor::FUTURE_SLOT_TOLERANCE;
use crate::sync::network_context::SyncNetworkContext;
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome};
use eth2_libp2p::rpc::methods::*;
use eth2_libp2p::rpc::RequestId;
use eth2_libp2p::PeerId;
use fnv::FnvHashMap;
use slog::{crit, debug, error, trace, warn, Logger};
use std::cmp::Ordering;
use std::collections::HashSet;
use std::ops::Sub;
use std::sync::Weak;
use types::{BeaconBlock, EthSpec, Hash256, Slot};
/// Blocks are downloaded in batches from peers. This constant specifies how many blocks per batch
/// is requested. There is a timeout for each batch request. If this value is too high, we will
/// downvote peers with poor bandwidth. This can be set arbitrarily high, in which case the
/// responder will fill the response up to the max request size, assuming they have the bandwidth
/// to do so.
//TODO: Make this dynamic based on peer's bandwidth
const BLOCKS_PER_BATCH: u64 = 50;
/// The number of times to retry a batch before the chain is considered failed and removed.
const MAX_BATCH_RETRIES: u8 = 5;
#[derive(PartialEq)]
pub struct Batch<T: EthSpec> {
/// The ID of the batch; batches are ID'd sequentially.
id: u64,
/// The requested start slot of the batch, inclusive.
start_slot: Slot,
/// The requested end slot of batch, exclusive.
end_slot: Slot,
/// The hash of the chain root requested from the peer.
head_root: Hash256,
/// The peer that was originally assigned to the batch.
_original_peer: PeerId,
/// The peer that is currently assigned to the batch.
pub current_peer: PeerId,
/// The number of retries this batch has undergone.
retries: u8,
/// The blocks that have been downloaded.
downloaded_blocks: Vec<BeaconBlock<T>>,
}
impl<T: EthSpec> Ord for Batch<T> {
fn cmp(&self, other: &Self) -> Ordering {
self.id.cmp(&other.id)
}
}
impl<T: EthSpec> PartialOrd for Batch<T> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
pub enum ProcessingResult {
KeepChain,
RemoveChain,
}
impl<T: EthSpec> Eq for Batch<T> {}
impl<T: EthSpec> Batch<T> {
fn new(id: u64, start_slot: Slot, end_slot: Slot, head_root: Hash256, peer_id: PeerId) -> Self {
Batch {
id,
start_slot,
end_slot,
head_root,
_original_peer: peer_id.clone(),
current_peer: peer_id,
retries: 0,
downloaded_blocks: Vec::new(),
}
}
fn to_blocks_by_range_request(&self) -> BlocksByRangeRequest {
BlocksByRangeRequest {
head_block_root: self.head_root,
start_slot: self.start_slot.into(),
count: std::cmp::min(BLOCKS_PER_BATCH, self.end_slot.sub(self.start_slot).into()),
step: 1,
}
}
}
pub struct SyncingChain<T: BeaconChainTypes> {
/// The original start slot when this chain was initialised.
pub start_slot: Slot,
/// The target head slot.
pub target_head_slot: Slot,
/// The target head root.
pub target_head_root: Hash256,
/// The batches that are currently awaiting a response from a peer. An RPC request for these
/// have been sent.
pub pending_batches: FnvHashMap<RequestId, Batch<T::EthSpec>>,
/// The batches that have been downloaded and are awaiting processing and/or validation.
completed_batches: Vec<Batch<T::EthSpec>>,
/// The peers that agree on the `target_head_slot` and `target_head_root` as a canonical chain
/// and thus available to download this chain from.
pub peer_pool: HashSet<PeerId>,
/// The next batch_id that needs to be downloaded.
to_be_downloaded_id: u64,
/// The next batch id that needs to be processed.
to_be_processed_id: u64,
/// The last batch id that was processed.
last_processed_id: u64,
/// The current state of the chain.
pub state: ChainSyncingState,
}
#[derive(PartialEq)]
pub enum ChainSyncingState {
/// The chain is not being synced.
Stopped,
/// The chain is undergoing syncing.
Syncing,
/// The chain is temporarily paused whilst an error is rectified.
Paused,
}
impl<T: BeaconChainTypes> SyncingChain<T> {
pub fn new(
start_slot: Slot,
target_head_slot: Slot,
target_head_root: Hash256,
peer_id: PeerId,
) -> Self {
let mut peer_pool = HashSet::new();
peer_pool.insert(peer_id);
SyncingChain {
start_slot,
target_head_slot,
target_head_root,
pending_batches: FnvHashMap::default(),
completed_batches: Vec::new(),
peer_pool,
to_be_downloaded_id: 1,
to_be_processed_id: 1,
last_processed_id: 0,
state: ChainSyncingState::Stopped,
}
}
pub fn on_block_response(
&mut self,
chain: &Weak<BeaconChain<T>>,
network: &mut SyncNetworkContext,
request_id: RequestId,
beacon_block: &Option<BeaconBlock<T::EthSpec>>,
log: &slog::Logger,
) -> Option<ProcessingResult> {
if let Some(block) = beacon_block {
let batch = self.pending_batches.get_mut(&request_id)?;
// This is not a stream termination, simply add the block to the request
batch.downloaded_blocks.push(block.clone());
return Some(ProcessingResult::KeepChain);
} else {
// A stream termination has been sent. This batch has ended. Process a completed batch.
let batch = self.pending_batches.remove(&request_id)?;
Some(self.process_completed_batch(chain.clone(), network, batch, log))
}
}
fn process_completed_batch(
&mut self,
chain: Weak<BeaconChain<T>>,
network: &mut SyncNetworkContext,
batch: Batch<T::EthSpec>,
log: &slog::Logger,
) -> ProcessingResult {
// An entire batch of blocks has been received. This function checks to see if it can be processed,
// remove any batches waiting to be verified and if this chain is syncing, request new
// blocks for the peer.
debug!(log, "Completed batch received"; "id"=>batch.id, "blocks"=>batch.downloaded_blocks.len(), "awaiting_batches" => self.completed_batches.len());
// The peer that completed this batch, may be re-requested if this batch doesn't complete
// the chain and there is no error in processing
let current_peer = batch.current_peer.clone();
// verify the range of received blocks
// Note that the order of blocks is verified in block processing
if let Some(last_slot) = batch.downloaded_blocks.last().map(|b| b.slot) {
// the batch is non-empty
if batch.start_slot > batch.downloaded_blocks[0].slot || batch.end_slot < last_slot {
warn!(log, "BlocksByRange response returned out of range blocks";
"response_initial_slot" => batch.downloaded_blocks[0].slot,
"requested_initial_slot" => batch.start_slot);
network.downvote_peer(batch.current_peer);
self.to_be_processed_id = batch.id; // reset the id back to here, when incrementing, it will check against completed batches
return ProcessingResult::KeepChain;
}
}
// Add this completed batch to the list of completed batches. This list will then need to
// be checked if any batches can be processed and verified for errors or invalid responses
// from peers. The logic is simpler to create this ordered batch list and to then process
// the list.
let insert_index = self
.completed_batches
.binary_search(&batch)
.unwrap_or_else(|index| index);
self.completed_batches.insert(insert_index, batch);
// We have a list of completed batches. It is not sufficient to process a batch successfully
// to consider the batch correct. This is because batches could be erroneously empty, or
// incomplete. Therefore, a batch is considered valid only if the next sequential batch is
// processed successfully. The `completed_batches` list will therefore store batches that have
// already been processed but not yet verified, and these have ids less than
// `self.to_be_processed_id`.
//TODO: Run the processing of blocks in a separate thread. Build a queue of completed
//blocks here, manage the queue and process them in another thread as they become
//available.
if self.state != ChainSyncingState::Paused {
// pre-emptively request more blocks from peers whilst we process current blocks,
self.send_range_request(network, current_peer);
// Try and process batches sequentially in the ordered list.
let current_process_id = self.to_be_processed_id;
for batch in self
.completed_batches
.iter()
.filter(|batch| batch.id >= current_process_id)
{
if batch.id == self.to_be_processed_id {
if batch.downloaded_blocks.is_empty() {
// the batch was empty, progress to the next block
self.to_be_processed_id += 1;
continue;
} else {
let mut successes = 0;
debug!(log, "Processing batch"; "batch_id" => batch.id);
match process_batch(chain.clone(), batch, &mut successes, log) {
Ok(_) => {
// batch was successfully processed
self.last_processed_id = self.to_be_processed_id;
self.to_be_processed_id += 1;
if let Some(chain) = chain.upgrade() {
match chain.fork_choice() {
Ok(()) => trace!(
log,
"Fork choice success";
"location" => "batch import success"
),
Err(e) => error!(
log,
"Fork choice failed";
"error" => format!("{:?}", e),
"location" => "batch import success"
),
}
}
}
Err(e) => {
warn!(log, "Block processing error"; "error"=> format!("{:?}", e));
if successes > 0 {
if let Some(chain) = chain.upgrade() {
match chain.fork_choice() {
Ok(()) => trace!(
log,
"Fork choice success";
"block_imports" => successes,
"location" => "batch import error"
),
Err(e) => error!(
log,
"Fork choice failed";
"error" => format!("{:?}", e),
"location" => "batch import error"
),
}
}
}
// batch processing failed
// this could be because this batch is invalid, or a previous invalidated batch
// is invalid. We need to find out which and downvote the peer that has sent us
// an invalid batch.
// firstly remove any validated batches
return self.handle_invalid_batch(chain, network);
}
}
}
} else {
// there are no more batches to be processed, end
break;
}
}
// remove any validated batches
let last_processed_id = self.last_processed_id;
self.completed_batches
.retain(|batch| batch.id >= last_processed_id);
// check if the chain has completed syncing, if not, request another batch from this peer
if self.start_slot + self.last_processed_id * BLOCKS_PER_BATCH >= self.target_head_slot
{
// chain is completed
ProcessingResult::RemoveChain
} else {
// chain is not completed
ProcessingResult::KeepChain
}
} else {
ProcessingResult::KeepChain
}
}
fn handle_invalid_batch(
&mut self,
_chain: Weak<BeaconChain<T>>,
network: &mut SyncNetworkContext,
) -> ProcessingResult {
// The current batch could not be processed, indicating either the current or previous
// batches are invalid
// The previous batch could be
// incomplete due to the block sizes being too large to fit in a single RPC
// request or there could be consecutive empty batches which are not supposed to be there
// Address these two cases individually.
// Firstly, check if the past batch is invalid.
//
//TODO: Implement this logic
// Currently just fail the chain, and drop all associated peers
for peer_id in self.peer_pool.iter() {
network.downvote_peer(peer_id.clone());
}
ProcessingResult::RemoveChain
}
pub fn stop_syncing(&mut self) {
self.state = ChainSyncingState::Stopped;
}
// Either a new chain, or an old one with a peer list
pub fn start_syncing(
&mut self,
network: &mut SyncNetworkContext,
local_finalized_slot: Slot,
log: &slog::Logger,
) {
// A local finalized slot is provided as other chains may have made
// progress whilst this chain was Stopped or paused. If so, update the `processed_batch_id` to
// accommodate potentially downloaded batches from other chains. Also prune any old batches
// awaiting processing
// Only important if the local head is more than a batch worth of blocks ahead of
// what this chain believes is downloaded
let batches_ahead = local_finalized_slot
.as_u64()
.saturating_sub(self.start_slot.as_u64() + self.last_processed_id * BLOCKS_PER_BATCH)
/ BLOCKS_PER_BATCH;
if batches_ahead != 0 {
// there are `batches_ahead` whole batches that have been downloaded by another
// chain. Set the current processed_batch_id to this value.
debug!(log, "Updating chains processed batches"; "old_completed_slot" => self.start_slot + self.last_processed_id*BLOCKS_PER_BATCH, "new_completed_slot" => self.start_slot + (self.last_processed_id + batches_ahead)*BLOCKS_PER_BATCH);
self.last_processed_id += batches_ahead;
if self.start_slot + self.last_processed_id * BLOCKS_PER_BATCH
> self.target_head_slot.as_u64()
{
crit!(
log,
"Current head slot is above the target head";
"target_head_slot" => self.target_head_slot.as_u64(),
"new_start" => self.start_slot + self.last_processed_id * BLOCKS_PER_BATCH,
);
return;
}
// update the `to_be_downloaded_id`
if self.to_be_downloaded_id < self.last_processed_id {
self.to_be_downloaded_id = self.last_processed_id;
}
let last_processed_id = self.last_processed_id;
self.completed_batches
.retain(|batch| batch.id >= last_processed_id.saturating_sub(1));
}
// Now begin requesting blocks from the peer pool. Ignore any peers with currently
// pending requests
let pending_peers = self
.pending_batches
.values()
.map(|batch| batch.current_peer.clone())
.collect::<Vec<_>>();
let peers = self
.peer_pool
.iter()
.filter(|peer| !pending_peers.contains(peer))
.cloned()
.collect::<Vec<_>>();
for peer_id in peers {
// send a blocks by range request to the peer
self.send_range_request(network, peer_id);
}
self.state = ChainSyncingState::Syncing;
}
// A peer has been added, start batch requests for this peer
// this should only be called for a syncing chain
pub fn peer_added(
&mut self,
network: &mut SyncNetworkContext,
peer_id: PeerId,
log: &slog::Logger,
) {
// do not request blocks if the chain is not syncing
if let ChainSyncingState::Stopped = self.state {
debug!(log, "Peer added to a non-syncing chain"; "peer_id" => format!("{:?}", peer_id));
return;
}
// find the next batch and request it from the peer
self.send_range_request(network, peer_id);
}
// Re-STATUS all the peers in this chain
pub fn status_peers(&self, chain: Weak<BeaconChain<T>>, network: &mut SyncNetworkContext) {
for peer_id in self.peer_pool.iter() {
network.status_peer(chain.clone(), peer_id.clone());
}
}
fn send_range_request(&mut self, network: &mut SyncNetworkContext, peer_id: PeerId) {
// find the next pending batch and request it from the peer
if let Some(batch) = self.get_next_batch(peer_id) {
// send the batch
self.send_batch(network, batch);
}
}
fn send_batch(&mut self, network: &mut SyncNetworkContext, batch: Batch<T::EthSpec>) {
let request = batch.to_blocks_by_range_request();
if let Ok(request_id) = network.blocks_by_range_request(batch.current_peer.clone(), request)
{
// add the batch to pending list
self.pending_batches.insert(request_id, batch);
}
}
fn get_next_batch(&mut self, peer_id: PeerId) -> Option<Batch<T::EthSpec>> {
let batch_start_slot =
self.start_slot + self.to_be_downloaded_id.saturating_sub(1) * BLOCKS_PER_BATCH;
if batch_start_slot > self.target_head_slot {
return None;
}
let batch_end_slot = std::cmp::min(
batch_start_slot + BLOCKS_PER_BATCH,
self.target_head_slot.saturating_add(1u64),
);
let batch_id = self.to_be_downloaded_id;
// find the next batch id: the larger of the next sequential id and one past the
// highest completed id
let max_completed_id =
self.completed_batches
.iter()
.fold(0, |max, batch| if batch.id > max { batch.id } else { max });
self.to_be_downloaded_id =
std::cmp::max(self.to_be_downloaded_id + 1, max_completed_id + 1);
Some(Batch::new(
batch_id,
batch_start_slot,
batch_end_slot,
self.target_head_root,
peer_id,
))
}
// Checks if the request_id is associated with this chain. If so, attempts to re-request the
// batch. If the batch has exceeded the number of retries, returns
// `Some(ProcessingResult::RemoveChain)`, indicating the chain should be dropped.
pub fn inject_error(
&mut self,
network: &mut SyncNetworkContext,
peer_id: &PeerId,
request_id: &RequestId,
log: &slog::Logger,
) -> Option<ProcessingResult> {
if let Some(batch) = self.pending_batches.remove(&request_id) {
warn!(log, "Batch failed. RPC Error"; "id" => batch.id, "retries" => batch.retries, "peer" => format!("{:?}", peer_id));
Some(self.failed_batch(network, batch))
} else {
None
}
}
pub fn failed_batch(
&mut self,
network: &mut SyncNetworkContext,
mut batch: Batch<T::EthSpec>,
) -> ProcessingResult {
batch.retries += 1;
if batch.retries > MAX_BATCH_RETRIES {
// chain is unrecoverable, remove it
ProcessingResult::RemoveChain
} else {
// try to re-process the request using a different peer, if possible
let current_peer = &batch.current_peer;
let new_peer = self
.peer_pool
.iter()
.find(|peer| *peer != current_peer)
.unwrap_or_else(|| current_peer);
batch.current_peer = new_peer.clone();
self.send_batch(network, batch);
ProcessingResult::KeepChain
}
}
}
// Helper function to process block batches which only consumes the chain and blocks to process
fn process_batch<T: BeaconChainTypes>(
chain: Weak<BeaconChain<T>>,
batch: &Batch<T::EthSpec>,
successes: &mut usize,
log: &Logger,
) -> Result<(), String> {
for block in &batch.downloaded_blocks {
if let Some(chain) = chain.upgrade() {
let processing_result = chain.process_block(block.clone());
if let Ok(outcome) = processing_result {
match outcome {
BlockProcessingOutcome::Processed { block_root } => {
// The block was valid and we processed it successfully.
trace!(
log, "Imported block from network";
"slot" => block.slot,
"block_root" => format!("{}", block_root),
);
*successes += 1
}
BlockProcessingOutcome::ParentUnknown { parent } => {
// blocks should be sequential and all parents should exist
trace!(
log, "Parent block is unknown";
"parent_root" => format!("{}", parent),
"baby_block_slot" => block.slot,
);
return Err(format!(
"Block at slot {} has an unknown parent.",
block.slot
));
}
BlockProcessingOutcome::BlockIsAlreadyKnown => {
// this block is already known to us, move to the next
debug!(
log, "Imported a block that is already known";
"block_slot" => block.slot,
);
}
BlockProcessingOutcome::FutureSlot {
present_slot,
block_slot,
} => {
if present_slot + FUTURE_SLOT_TOLERANCE >= block_slot {
// The block is too far in the future, drop it.
trace!(
log, "Block is ahead of our slot clock";
"msg" => "block for future slot rejected, check your time",
"present_slot" => present_slot,
"block_slot" => block_slot,
"FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE,
);
return Err(format!(
"Block at slot {} is too far in the future",
block.slot
));
} else {
// The block is in the future, but not too far.
trace!(
log, "Block is slightly ahead of our slot clock, ignoring.";
"present_slot" => present_slot,
"block_slot" => block_slot,
"FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE,
);
}
}
BlockProcessingOutcome::WouldRevertFinalizedSlot { .. } => {
trace!(
log, "Finalized or earlier block processed";
"outcome" => format!("{:?}", outcome),
);
// block reached our finalized slot or was earlier, move to the next block
}
BlockProcessingOutcome::GenesisBlock => {
trace!(
log, "Genesis block was processed";
"outcome" => format!("{:?}", outcome),
);
}
_ => {
warn!(
log, "Invalid block received";
"msg" => "peer sent invalid block",
"outcome" => format!("{:?}", outcome),
);
return Err(format!("Invalid block at slot {}", block.slot));
}
}
} else {
warn!(
log, "BlockProcessingFailure";
"msg" => "unexpected condition in processing block.",
"outcome" => format!("{:?}", processing_result)
);
return Err(format!(
"Unexpected block processing error: {:?}",
processing_result
));
}
} else {
return Ok(()); // terminate early due to dropped beacon chain
}
}
Ok(())
}
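The batch bookkeeping above is purely arithmetic: batch ids start at 1, each batch covers `BLOCKS_PER_BATCH` slots from the chain's start slot, and the chain is finished once the last processed batch reaches the target head. A small, self-contained worked example using the constant above and hypothetical slots (start slot 0, target head slot 120):

const BLOCKS_PER_BATCH: u64 = 50;

fn main() {
    let start_slot: u64 = 0;
    let target_head_slot: u64 = 120;

    // Mirrors `get_next_batch`: derive each batch's slot range from its id.
    for id in 1..=4u64 {
        let batch_start = start_slot + (id - 1) * BLOCKS_PER_BATCH;
        if batch_start > target_head_slot {
            println!("batch {}: not created (past the target head)", id);
            continue;
        }
        let batch_end = std::cmp::min(batch_start + BLOCKS_PER_BATCH, target_head_slot + 1);
        println!("batch {}: slots [{}, {})", id, batch_start, batch_end);
    }

    // Mirrors the completion check in `process_completed_batch`: the chain is done
    // once start_slot + last_processed_id * BLOCKS_PER_BATCH >= target_head_slot.
    let last_processed_id: u64 = 3;
    assert!(start_slot + last_processed_id * BLOCKS_PER_BATCH >= target_head_slot);
}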


@ -0,0 +1,284 @@
use super::chain::{ChainSyncingState, ProcessingResult, SyncingChain};
use crate::sync::message_processor::PeerSyncInfo;
use crate::sync::network_context::SyncNetworkContext;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::PeerId;
use slog::{debug, warn};
use std::sync::Weak;
use types::EthSpec;
use types::{Hash256, Slot};
pub enum SyncState {
Finalized,
Head,
Idle,
}
pub struct ChainCollection<T: BeaconChainTypes> {
finalized_chains: Vec<SyncingChain<T>>,
head_chains: Vec<SyncingChain<T>>,
sync_state: SyncState,
}
impl<T: BeaconChainTypes> ChainCollection<T> {
pub fn new() -> Self {
ChainCollection {
sync_state: SyncState::Idle,
finalized_chains: Vec::new(),
head_chains: Vec::new(),
}
}
pub fn sync_state(&self) -> &SyncState {
&self.sync_state
}
// if a finalized chain just completed, we assume we are waiting for head syncing, unless a
// fully synced peer joins.
pub fn fully_synced_peer_found(&mut self) {
if let SyncState::Head = self.sync_state {
if self.head_chains.is_empty() {
self.sync_state = SyncState::Idle;
}
}
}
// after a finalized chain completes, the state should be waiting for a head chain
pub fn set_head_sync(&mut self) {
if let SyncState::Idle = self.sync_state {
self.sync_state = SyncState::Head;
}
}
fn finalized_syncing_index(&self) -> Option<usize> {
self.finalized_chains
.iter()
.enumerate()
.find_map(|(index, chain)| {
if chain.state == ChainSyncingState::Syncing {
Some(index)
} else {
None
}
})
}
pub fn purge_finalized(&mut self, local_finalized_slot: Slot) {
self.finalized_chains
.retain(|chain| chain.target_head_slot > local_finalized_slot);
}
pub fn purge_head(&mut self, head_slot: Slot) {
self.head_chains
.retain(|chain| chain.target_head_slot > head_slot);
}
fn get_chain<'a>(
chain: &'a mut [SyncingChain<T>],
target_head_root: Hash256,
target_head_slot: Slot,
) -> Option<&'a mut SyncingChain<T>> {
chain.iter_mut().find(|iter_chain| {
iter_chain.target_head_root == target_head_root
&& iter_chain.target_head_slot == target_head_slot
})
}
/// Finds any finalized chain if it exists.
pub fn get_finalized_mut(
&mut self,
target_head_root: Hash256,
target_head_slot: Slot,
) -> Option<&mut SyncingChain<T>> {
ChainCollection::get_chain(
self.finalized_chains.as_mut(),
target_head_root,
target_head_slot,
)
}
/// Finds any head chain if it exists.
pub fn get_head_mut(
&mut self,
target_head_root: Hash256,
target_head_slot: Slot,
) -> Option<&mut SyncingChain<T>> {
ChainCollection::get_chain(
self.head_chains.as_mut(),
target_head_root,
target_head_slot,
)
}
/// Checks if a new finalized state should become the syncing chain. Updates the state of the
/// collection.
pub fn update_finalized(
&mut self,
beacon_chain: Weak<BeaconChain<T>>,
network: &mut SyncNetworkContext,
log: &slog::Logger,
) {
let local_info = match beacon_chain.upgrade() {
Some(chain) => PeerSyncInfo::from(&chain),
None => {
warn!(log, "Beacon chain dropped. Chains not updated");
return;
}
};
let local_slot = local_info
.finalized_epoch
.start_slot(T::EthSpec::slots_per_epoch());
// Remove any outdated finalized chains
self.purge_finalized(local_slot);
self.finalized_chains
.retain(|chain| !chain.peer_pool.is_empty());
// Remove any outdated head chains
self.purge_head(local_info.head_slot);
self.finalized_chains
.retain(|chain| !chain.peer_pool.is_empty());
// Check if any chains become the new syncing chain
if let Some(index) = self.finalized_syncing_index() {
// There is a current finalized chain syncing
let syncing_chain_peer_count = self.finalized_chains[index].peer_pool.len();
// search for a chain with more peers
if let Some((new_index, chain)) =
self.finalized_chains
.iter_mut()
.enumerate()
.find(|(iter_index, chain)| {
*iter_index != index && chain.peer_pool.len() > syncing_chain_peer_count
})
{
// A chain has more peers. Swap the syncing chain
debug!(log, "Switching finalized chains to sync"; "new_target_root" => format!("{}", chain.target_head_root), "new_end_slot" => chain.target_head_slot, "new_start_slot"=> chain.start_slot);
// Stop the current chain from syncing
self.finalized_chains[index].stop_syncing();
// Start the new chain
self.finalized_chains[new_index].start_syncing(network, local_slot, log);
self.sync_state = SyncState::Finalized;
}
} else if let Some(chain) = self
.finalized_chains
.iter_mut()
.max_by_key(|chain| chain.peer_pool.len())
{
// There is no currently syncing finalization chain, starting the one with the most peers
debug!(log, "New finalized chain started syncing"; "new_target_root" => format!("{}", chain.target_head_root), "new_end_slot" => chain.target_head_slot, "new_start_slot"=> chain.start_slot);
chain.start_syncing(network, local_slot, log);
self.sync_state = SyncState::Finalized;
} else {
// There are no finalized chains, update the state
if self.head_chains.is_empty() {
self.sync_state = SyncState::Idle;
} else {
self.sync_state = SyncState::Head;
}
}
}
/// Add a new finalized chain to the collection
pub fn new_finalized_chain(
&mut self,
local_finalized_slot: Slot,
target_head: Hash256,
target_slot: Slot,
peer_id: PeerId,
) {
self.finalized_chains.push(SyncingChain::new(
local_finalized_slot,
target_slot,
target_head,
peer_id,
));
}
/// Add a new head chain to the collection
pub fn new_head_chain(
&mut self,
network: &mut SyncNetworkContext,
remote_finalized_slot: Slot,
target_head: Hash256,
target_slot: Slot,
peer_id: PeerId,
log: &slog::Logger,
) {
// remove the peer from any other head chains
self.head_chains.iter_mut().for_each(|chain| {
chain.peer_pool.remove(&peer_id);
});
self.head_chains.retain(|chain| !chain.peer_pool.is_empty());
let mut new_head_chain =
SyncingChain::new(remote_finalized_slot, target_slot, target_head, peer_id);
// All head chains can sync simultaneously
new_head_chain.start_syncing(network, remote_finalized_slot, log);
self.head_chains.push(new_head_chain);
}
pub fn is_finalizing_sync(&self) -> bool {
!self.finalized_chains.is_empty()
}
fn request_function<'a, F, I>(chain: I, mut func: F) -> Option<(usize, ProcessingResult)>
where
I: Iterator<Item = &'a mut SyncingChain<T>>,
F: FnMut(&'a mut SyncingChain<T>) -> Option<ProcessingResult>,
{
chain
.enumerate()
.find_map(|(index, chain)| Some((index, func(chain)?)))
}
pub fn finalized_request<F>(&mut self, func: F) -> Option<(usize, ProcessingResult)>
where
F: FnMut(&mut SyncingChain<T>) -> Option<ProcessingResult>,
{
ChainCollection::request_function(self.finalized_chains.iter_mut(), func)
}
pub fn head_request<F>(&mut self, func: F) -> Option<(usize, ProcessingResult)>
where
F: FnMut(&mut SyncingChain<T>) -> Option<ProcessingResult>,
{
ChainCollection::request_function(self.head_chains.iter_mut(), func)
}
#[allow(dead_code)]
pub fn head_finalized_request<F>(&mut self, func: F) -> Option<(usize, ProcessingResult)>
where
F: FnMut(&mut SyncingChain<T>) -> Option<ProcessingResult>,
{
ChainCollection::request_function(
self.finalized_chains
.iter_mut()
.chain(self.head_chains.iter_mut()),
func,
)
}
pub fn remove_finalized_chain(&mut self, index: usize) -> SyncingChain<T> {
self.finalized_chains.swap_remove(index)
}
pub fn remove_head_chain(&mut self, index: usize) -> SyncingChain<T> {
self.head_chains.swap_remove(index)
}
/// Removes a chain from either the finalized or head chains based on the index. A request
/// iterates over finalized chains before head chains, so an index at or beyond the number of
/// finalized chains indicates a head chain.
pub fn remove_chain(&mut self, index: usize) -> SyncingChain<T> {
if index >= self.finalized_chains.len() {
let index = index - self.finalized_chains.len();
self.head_chains.swap_remove(index)
} else {
self.finalized_chains.swap_remove(index)
}
}
}
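A tiny illustration of the indexing convention behind `remove_chain` above: `head_finalized_request` walks finalized chains first and then head chains, so an index at or beyond the number of finalized chains refers to a head chain. The counts below are hypothetical:

fn main() {
    // Suppose the collection currently tracks 2 finalized chains and 3 head chains.
    let finalized_len = 2usize;
    let head_len = 3usize;

    for index in 0..(finalized_len + head_len) {
        if index >= finalized_len {
            // e.g. index 2 -> head_chains[0], index 4 -> head_chains[2]
            println!("index {} -> head_chains[{}]", index, index - finalized_len);
        } else {
            println!("index {} -> finalized_chains[{}]", index, index);
        }
    }
}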


@ -0,0 +1,8 @@
//! This provides the logic for syncing a chain when the local node is far behind its current
//! peers.
mod chain;
mod chain_collection;
mod range;
pub use range::RangeSync;


@ -0,0 +1,314 @@
use super::chain::ProcessingResult;
use super::chain_collection::{ChainCollection, SyncState};
use crate::sync::message_processor::PeerSyncInfo;
use crate::sync::network_context::SyncNetworkContext;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::rpc::RequestId;
use eth2_libp2p::PeerId;
use slog::{debug, trace, warn};
use std::collections::HashSet;
use std::sync::Weak;
use types::{BeaconBlock, EthSpec};
//TODO: The code becomes cleaner if finalized_chains and head_chains were merged into a single
// object. This will prevent code duplication. Rather than keeping the current syncing
// finalized chain in index 0, it should be stored in this object under an option. Then lookups can
// occur over the single object containing both finalized and head chains, which would then
// behave similarly.
pub struct RangeSync<T: BeaconChainTypes> {
/// The beacon chain for processing
beacon_chain: Weak<BeaconChain<T>>,
chains: ChainCollection<T>,
/// Known peers to the RangeSync, that need to be re-status'd once finalized chains are
/// completed.
awaiting_head_peers: HashSet<PeerId>,
log: slog::Logger,
}
impl<T: BeaconChainTypes> RangeSync<T> {
pub fn new(beacon_chain: Weak<BeaconChain<T>>, log: slog::Logger) -> Self {
RangeSync {
beacon_chain,
chains: ChainCollection::new(),
awaiting_head_peers: HashSet::new(),
log,
}
}
// Notify the collection that a fully synced peer was found. This allows updating the state
// if we were awaiting a head state.
pub fn fully_synced_peer_found(&mut self) {
self.chains.fully_synced_peer_found()
}
pub fn add_peer(
&mut self,
network: &mut SyncNetworkContext,
peer_id: PeerId,
remote: PeerSyncInfo,
) {
// evaluate which chain to sync from
// determine if we need to run a sync to the nearest finalized state or simply sync to
// its current head
let local_info = match self.beacon_chain.upgrade() {
Some(chain) => PeerSyncInfo::from(&chain),
None => {
warn!(self.log,
"Beacon chain dropped. Peer not considered for sync";
"peer_id" => format!("{:?}", peer_id));
return;
}
};
// convenience variables
let remote_finalized_slot = remote
.finalized_epoch
.start_slot(T::EthSpec::slots_per_epoch());
let local_finalized_slot = local_info
.finalized_epoch
.start_slot(T::EthSpec::slots_per_epoch());
// firstly, remove any out-of-date chains
self.chains.purge_finalized(local_finalized_slot);
self.chains.purge_head(local_info.head_slot);
// remove peer from any chains
self.remove_peer(network, &peer_id);
if remote_finalized_slot > local_info.head_slot {
debug!(self.log, "Finalization sync peer joined"; "peer_id" => format!("{:?}", peer_id));
// Finalized chain search
// Note: We keep current head chains. These can continue syncing whilst we complete
// this new finalized chain.
// If a finalized chain already exists that matches, add this peer to the chain's peer
// pool.
if let Some(chain) = self
.chains
.get_finalized_mut(remote.finalized_root, remote_finalized_slot)
{
debug!(self.log, "Finalized chain exists, adding peer"; "peer_id" => format!("{:?}", peer_id), "target_root" => format!("{}", chain.target_head_root), "end_slot" => chain.target_head_slot, "start_slot"=> chain.start_slot);
// add the peer to the chain's peer pool
chain.peer_pool.insert(peer_id.clone());
chain.peer_added(network, peer_id, &self.log);
// check if the new peer's addition will favour a new syncing chain.
self.chains
.update_finalized(self.beacon_chain.clone(), network, &self.log);
} else {
// there is no finalized chain that matches this peer's last finalized target
// create a new finalized chain
debug!(self.log, "New finalized chain added to sync"; "peer_id" => format!("{:?}", peer_id), "start_slot" => local_finalized_slot.as_u64(), "end_slot" => remote_finalized_slot.as_u64(), "finalized_root" => format!("{}", remote.finalized_root));
self.chains.new_finalized_chain(
local_finalized_slot,
remote.finalized_root,
remote_finalized_slot,
peer_id,
);
self.chains
.update_finalized(self.beacon_chain.clone(), network, &self.log);
}
} else {
if self.chains.is_finalizing_sync() {
// If there are finalized chains to sync, finish these first, before syncing head
// chains. This allows us to re-sync all known peers
trace!(self.log, "Waiting for finalized sync to complete"; "peer_id" => format!("{:?}", peer_id));
return;
}
// The new peer has the same finalized epoch as us (earlier filters should prevent a peer
// with an earlier finalized epoch from reaching here).
debug!(self.log, "New peer added for recent head sync"; "peer_id" => format!("{:?}", peer_id));
// search if there is a matching head chain, then add the peer to the chain
if let Some(chain) = self.chains.get_head_mut(remote.head_root, remote.head_slot) {
debug!(self.log, "Adding peer to the existing head chain peer pool"; "head_root" => format!("{}",remote.head_root), "head_slot" => remote.head_slot, "peer_id" => format!("{:?}", peer_id));
// add the peer to the head's pool
chain.peer_pool.insert(peer_id.clone());
chain.peer_added(network, peer_id.clone(), &self.log);
} else {
// There are no other head chains that match this peer's status, so create a new one and add the peer to it.
let start_slot = std::cmp::min(local_info.head_slot, remote_finalized_slot);
debug!(self.log, "Creating a new syncing head chain"; "head_root" => format!("{}",remote.head_root), "start_slot" => start_slot, "head_slot" => remote.head_slot, "peer_id" => format!("{:?}", peer_id));
self.chains.new_head_chain(
network,
start_slot,
remote.head_root,
remote.head_slot,
peer_id,
&self.log,
);
}
self.chains
.update_finalized(self.beacon_chain.clone(), network, &self.log);
}
}
pub fn blocks_by_range_response(
&mut self,
network: &mut SyncNetworkContext,
peer_id: PeerId,
request_id: RequestId,
beacon_block: Option<BeaconBlock<T::EthSpec>>,
) {
// Find the request. Most likely the first finalized chain (the syncing chain). If there
// are no finalized chains, then it will be a head chain. At most, there should only be
// `connected_peers` number of head chains, which should be relatively small and this
// lookup should not be very expensive. However, we could add an extra index that maps the
// request id to index of the vector to avoid O(N) searches and O(N) hash lookups.
// Note to future sync-rewriter/profiler: Michael approves of these O(N) searches.
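// A possible shape for that index (editor's sketch only, hypothetical and not part of this
// commit): keep a `HashMap<RequestId, usize>` alongside `chains`, insert into it whenever a
// batch request is issued, and consult it here, e.g.
//
//     request_to_chain: HashMap<RequestId, usize>, // request id -> position in the chain vector
//     ...
//     if let Some(&index) = self.request_to_chain.get(&request_id) {
//         // dispatch the response directly to the owning chain instead of scanning
//     }
//
// The `request_to_chain` field and name are illustrative assumptions, not existing code.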
let chain_ref = &self.beacon_chain;
let log_ref = &self.log;
match self.chains.finalized_request(|chain| {
chain.on_block_response(chain_ref, network, request_id, &beacon_block, log_ref)
}) {
Some((_, ProcessingResult::KeepChain)) => {} // blocks added to the chain
Some((index, ProcessingResult::RemoveChain)) => {
let chain = self.chains.remove_finalized_chain(index);
debug!(self.log, "Finalized chain removed"; "start_slot" => chain.start_slot.as_u64(), "end_slot" => chain.target_head_slot.as_u64());
// the chain is complete, re-status its peers
chain.status_peers(self.beacon_chain.clone(), network);
// update the state of the collection
self.chains
.update_finalized(self.beacon_chain.clone(), network, &self.log);
// set the state to a head sync, to inform the manager that we are awaiting a
// head chain.
self.chains.set_head_sync();
// if there are no more finalized chains, re-status all known peers awaiting a head
// sync
match self.chains.sync_state() {
SyncState::Idle | SyncState::Head => {
for peer_id in self.awaiting_head_peers.iter() {
network.status_peer(self.beacon_chain.clone(), peer_id.clone());
}
}
SyncState::Finalized => {} // Have more finalized chains to complete
}
}
None => {
// The request was not in any finalized chain, search head chains
match self.chains.head_request(|chain| {
chain.on_block_response(&chain_ref, network, request_id, &beacon_block, log_ref)
}) {
Some((index, ProcessingResult::RemoveChain)) => {
let chain = self.chains.remove_head_chain(index);
debug!(self.log, "Head chain completed"; "start_slot" => chain.start_slot.as_u64(), "end_slot" => chain.target_head_slot.as_u64());
// the chain is complete, re-status its peers and remove it
chain.status_peers(self.beacon_chain.clone(), network);
// update the state of the collection
self.chains
.update_finalized(self.beacon_chain.clone(), network, &self.log);
}
Some(_) => {}
None => {
// The request didn't exist in any `SyncingChain`. Could have been an old request. Log
// and ignore
debug!(self.log, "Range response without matching request"; "peer" => format!("{:?}", peer_id), "request_id" => request_id);
}
}
}
}
}
pub fn is_syncing(&self) -> bool {
match self.chains.sync_state() {
SyncState::Finalized => true,
SyncState::Head => true,
SyncState::Idle => false,
}
}
pub fn peer_disconnect(&mut self, network: &mut SyncNetworkContext, peer_id: &PeerId) {
// if the peer is in the awaiting head mapping, remove it
self.awaiting_head_peers.remove(peer_id);
// remove the peer from any peer pool
self.remove_peer(network, peer_id);
// update the state of the collection
self.chains
.update_finalized(self.beacon_chain.clone(), network, &self.log);
}
/// When a peer gets removed, both the head and finalized chains need to be searched to check
/// which pool the peer is in. The chain may also have one or more batches pending for this
/// peer. If so, we mark those batches as failed. A batch may then hit its maximum retries, in
/// which case we need to remove the chain and re-status all of its peers.
fn remove_peer(&mut self, network: &mut SyncNetworkContext, peer_id: &PeerId) {
match self.chains.head_finalized_request(|chain| {
if chain.peer_pool.remove(peer_id) {
// this chain contained the peer
let pending_batches_requests = chain
.pending_batches
.iter()
.filter(|(_, batch)| batch.current_peer == *peer_id)
.map(|(id, _)| id)
.cloned()
.collect::<Vec<_>>();
for request_id in pending_batches_requests {
if let Some(batch) = chain.pending_batches.remove(&request_id) {
if let ProcessingResult::RemoveChain = chain.failed_batch(network, batch) {
// a single batch failed, remove the chain
return Some(ProcessingResult::RemoveChain);
}
}
}
// peer removed from chain, no batch failed
Some(ProcessingResult::KeepChain)
} else {
None
}
}) {
Some((index, ProcessingResult::RemoveChain)) => {
// the chain needed to be removed
let chain = self.chains.remove_chain(index);
debug!(self.log, "Chain was removed due batch failing"; "start_slot" => chain.start_slot.as_u64(), "end_slot" => chain.target_head_slot.as_u64());
// the chain has been removed, re-status it's peers
chain.status_peers(self.beacon_chain.clone(), network);
// update the state of the collection
self.chains
.update_finalized(self.beacon_chain.clone(), network, &self.log);
}
_ => {} // chain didn't need to be removed, ignore
}
// remove any chains that no longer have any peers
}
// An RPC error occurred. If it relates to a pending batch, re-request it if possible; if there
// have been too many attempts, remove the chain.
pub fn inject_error(
&mut self,
network: &mut SyncNetworkContext,
peer_id: PeerId,
request_id: RequestId,
) {
// check that this request is pending
let log_ref = &self.log;
match self.chains.head_finalized_request(|chain| {
chain.inject_error(network, &peer_id, &request_id, log_ref)
}) {
Some((_, ProcessingResult::KeepChain)) => {} // error handled, the chain persists
Some((index, ProcessingResult::RemoveChain)) => {
let chain = self.chains.remove_chain(index);
debug!(self.log, "Chain was removed due to error"; "start_slot" => chain.start_slot.as_u64(), "end_slot" => chain.target_head_slot.as_u64());
// the chain has failed, re-status its peers
chain.status_peers(self.beacon_chain.clone(), network);
// update the state of the collection
self.chains
.update_finalized(self.beacon_chain.clone(), network, &self.log);
}
None => {} // the request didn't correspond to any known chain, ignore
}
}
}
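The routing decision in `add_peer` above reduces to a single slot comparison: if the peer's
finalized checkpoint starts after our current head, we join or start a finalized-range sync,
otherwise only a head sync is required. The following is a minimal standalone sketch of that
rule (an editor's illustration using plain `u64` slots and a hypothetical `PeerSyncTarget`
enum, not Lighthouse types):

/// Editor's illustration only: mirrors the routing rule in `RangeSync::add_peer`,
/// with plain `u64` slots standing in for `Slot`.
#[derive(Debug, PartialEq)]
enum PeerSyncTarget {
    /// The peer's finalized checkpoint is ahead of our head: run a finalized-range sync first.
    Finalized,
    /// The peer's finalized checkpoint is not ahead of our head: only its recent head blocks are missing.
    Head,
}

fn classify_peer(remote_finalized_slot: u64, local_head_slot: u64) -> PeerSyncTarget {
    if remote_finalized_slot > local_head_slot {
        PeerSyncTarget::Finalized
    } else {
        PeerSyncTarget::Head
    }
}

fn main() {
    // a peer finalized at slot 128 while our head is at slot 96 warrants a finalized-range sync
    assert_eq!(classify_peer(128, 96), PeerSyncTarget::Finalized);
    // a peer whose finalized slot is at or behind our head only needs a head sync
    assert_eq!(classify_peer(64, 96), PeerSyncTarget::Head);
}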

View File

@ -251,7 +251,39 @@ pub fn publish_beacon_block<T: BeaconChainTypes>(
"block_slot" => slot,
);
publish_beacon_block_to_network::<T>(network_chan, block)
publish_beacon_block_to_network::<T>(network_chan, block)?;
// Run the fork choice algorithm and enshrine a new canonical head, if
// found.
//
// The new head may or may not be the block we just received.
if let Err(e) = beacon_chain.fork_choice() {
error!(
log,
"Failed to find beacon chain head";
"error" => format!("{:?}", e)
);
} else {
// In the best case, validators should produce blocks that become the
// head.
//
// Potential reasons this may not be the case:
//
// - A quick re-org between block produce and publish.
// - Excessive time between block produce and publish.
// - A validator is using another beacon node to produce blocks and
// submitting them here.
if beacon_chain.head().beacon_block_root != block_root {
warn!(
log,
"Block from validator is not head";
"desc" => "potential re-org",
);
}
}
Ok(())
}
Ok(outcome) => {
warn!(
@ -279,7 +311,7 @@ pub fn publish_beacon_block<T: BeaconChainTypes>(
}
}
})
.and_then(|_| response_builder?.body_no_ssz(&())),
.and_then(|_| response_builder?.body_no_ssz(&()))
)
}

View File

@ -22,7 +22,7 @@ type Config = (ClientConfig, Eth2Config, Logger);
/// Gets the fully-initialized global client and eth2 configuration objects.
///
/// The top-level `clap` arguments should be provied as `cli_args`.
/// The top-level `clap` arguments should be provided as `cli_args`.
///
/// The output of this function depends primarily upon the given `cli_args`, however it's behaviour
/// may be influenced by other external services like the contents of the file system or the

View File

@ -3,7 +3,7 @@
//! https://github.com/eth2-clients/eth2-testnets/tree/master/nimbus/testnet1
//!
//! It is not accurate at the moment, we include extra files and we also don't support a few
//! others. We are unable to confirm to the repo until we have the following PR merged:
//! others. We are unable to conform to the repo until we have the following PR merged:
//!
//! https://github.com/sigp/lighthouse/pull/605