Updates syncing stability, fixes large RPC message codec, corrects beacon chain referencing

This commit is contained in:
Age Manning 2019-09-05 08:07:57 +10:00
parent 8256621230
commit a3877b6135
No known key found for this signature in database
GPG Key ID: 05EED64B79E06A93
7 changed files with 312 additions and 261 deletions

View File

@ -34,10 +34,10 @@ pub fn run<T: BeaconChainTypes>(client: &Client<T>, executor: TaskExecutor, exit
// Panics if libp2p is poisoned. // Panics if libp2p is poisoned.
let connected_peer_count = libp2p.lock().swarm.connected_peers(); let connected_peer_count = libp2p.lock().swarm.connected_peers();
debug!(log, "Libp2p connected peer status"; "peer_count" => connected_peer_count); debug!(log, "Connected peer status"; "peer_count" => connected_peer_count);
if connected_peer_count <= WARN_PEER_COUNT { if connected_peer_count <= WARN_PEER_COUNT {
warn!(log, "Low libp2p peer count"; "peer_count" => connected_peer_count); warn!(log, "Low peer count"; "peer_count" => connected_peer_count);
} }
Ok(()) Ok(())

View File

@ -152,45 +152,49 @@ impl Decoder for SSZOutboundCodec {
type Error = RPCError; type Error = RPCError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
match self.inner.decode(src).map_err(RPCError::from) { if src.is_empty() {
Ok(Some(packet)) => match self.protocol.message_name.as_str() { // the object sent could be empty. We return the empty object if this is the case
match self.protocol.message_name.as_str() {
"hello" => match self.protocol.version.as_str() { "hello" => match self.protocol.version.as_str() {
"1" => Ok(Some(RPCResponse::Hello(HelloMessage::from_ssz_bytes( "1" => Err(RPCError::Custom(
&packet, "Hello stream terminated unexpectedly".into(),
)?))), )), // cannot have an empty HELLO message. The stream has terminated unexpectedly
_ => unreachable!("Cannot negotiate an unknown version"), _ => unreachable!("Cannot negotiate an unknown version"),
}, },
"goodbye" => Err(RPCError::InvalidProtocol("GOODBYE doesn't have a response")), "goodbye" => Err(RPCError::InvalidProtocol("GOODBYE doesn't have a response")),
"beacon_blocks" => match self.protocol.version.as_str() { "beacon_blocks" => match self.protocol.version.as_str() {
"1" => Ok(Some(RPCResponse::BeaconBlocks(packet.to_vec()))), "1" => Ok(Some(RPCResponse::BeaconBlocks(Vec::new()))),
_ => unreachable!("Cannot negotiate an unknown version"), _ => unreachable!("Cannot negotiate an unknown version"),
}, },
"recent_beacon_blocks" => match self.protocol.version.as_str() { "recent_beacon_blocks" => match self.protocol.version.as_str() {
"1" => Ok(Some(RPCResponse::RecentBeaconBlocks(packet.to_vec()))), "1" => Ok(Some(RPCResponse::RecentBeaconBlocks(Vec::new()))),
_ => unreachable!("Cannot negotiate an unknown version"), _ => unreachable!("Cannot negotiate an unknown version"),
}, },
_ => unreachable!("Cannot negotiate an unknown protocol"), _ => unreachable!("Cannot negotiate an unknown protocol"),
}, }
Ok(None) => { } else {
// the object sent could be a empty. We return the empty object if this is the case match self.inner.decode(src).map_err(RPCError::from) {
match self.protocol.message_name.as_str() { Ok(Some(packet)) => match self.protocol.message_name.as_str() {
"hello" => match self.protocol.version.as_str() { "hello" => match self.protocol.version.as_str() {
"1" => Ok(None), // cannot have an empty HELLO message. The stream has terminated unexpectedly "1" => Ok(Some(RPCResponse::Hello(HelloMessage::from_ssz_bytes(
&packet,
)?))),
_ => unreachable!("Cannot negotiate an unknown version"), _ => unreachable!("Cannot negotiate an unknown version"),
}, },
"goodbye" => Err(RPCError::InvalidProtocol("GOODBYE doesn't have a response")), "goodbye" => Err(RPCError::InvalidProtocol("GOODBYE doesn't have a response")),
"beacon_blocks" => match self.protocol.version.as_str() { "beacon_blocks" => match self.protocol.version.as_str() {
"1" => Ok(Some(RPCResponse::BeaconBlocks(Vec::new()))), "1" => Ok(Some(RPCResponse::BeaconBlocks(packet.to_vec()))),
_ => unreachable!("Cannot negotiate an unknown version"), _ => unreachable!("Cannot negotiate an unknown version"),
}, },
"recent_beacon_blocks" => match self.protocol.version.as_str() { "recent_beacon_blocks" => match self.protocol.version.as_str() {
"1" => Ok(Some(RPCResponse::RecentBeaconBlocks(Vec::new()))), "1" => Ok(Some(RPCResponse::RecentBeaconBlocks(packet.to_vec()))),
_ => unreachable!("Cannot negotiate an unknown version"), _ => unreachable!("Cannot negotiate an unknown version"),
}, },
_ => unreachable!("Cannot negotiate an unknown protocol"), _ => unreachable!("Cannot negotiate an unknown protocol"),
} },
Ok(None) => Ok(None), // waiting for more bytes
Err(e) => Err(e),
} }
Err(e) => Err(e),
} }
} }
} }

View File

@ -168,8 +168,10 @@ impl std::fmt::Display for RPCResponse {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self { match self {
RPCResponse::Hello(hello) => write!(f, "{}", hello), RPCResponse::Hello(hello) => write!(f, "{}", hello),
RPCResponse::BeaconBlocks(_) => write!(f, "<BeaconBlocks>"), RPCResponse::BeaconBlocks(data) => write!(f, "<BeaconBlocks>, len: {}", data.len()),
RPCResponse::RecentBeaconBlocks(_) => write!(f, "<RecentBeaconBlocks>"), RPCResponse::RecentBeaconBlocks(data) => {
write!(f, "<RecentBeaconBlocks>, len: {}", data.len())
}
} }
} }
} }

View File

@ -98,6 +98,11 @@ impl Service {
// attempt to connect to any specified boot-nodes // attempt to connect to any specified boot-nodes
for bootnode_enr in config.boot_nodes { for bootnode_enr in config.boot_nodes {
for multiaddr in bootnode_enr.multiaddr() { for multiaddr in bootnode_enr.multiaddr() {
// ignore udp multiaddr if it exists
let components = multiaddr.iter().collect::<Vec<_>>();
if let Protocol::Udp(_) = components[1] {
continue;
}
dial_addr(multiaddr); dial_addr(multiaddr);
} }
} }

View File

@ -17,8 +17,6 @@ use types::{Attestation, AttesterSlashing, BeaconBlock, ProposerSlashing, Volunt
/// Handles messages received from the network and client and organises syncing. /// Handles messages received from the network and client and organises syncing.
pub struct MessageHandler<T: BeaconChainTypes> { pub struct MessageHandler<T: BeaconChainTypes> {
/// Currently loaded and initialised beacon chain.
_chain: Arc<BeaconChain<T>>,
/// The syncing framework. /// The syncing framework.
sync: SimpleSync<T>, sync: SimpleSync<T>,
/// A channel to the network service to allow for gossip propagation. /// A channel to the network service to allow for gossip propagation.
@ -53,13 +51,12 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
let (handler_send, handler_recv) = mpsc::unbounded_channel(); let (handler_send, handler_recv) = mpsc::unbounded_channel();
// Initialise sync and begin processing in thread // Initialise sync and begin processing in thread
let sync = SimpleSync::new(beacon_chain.clone(), network_send.clone(), &log); let sync = SimpleSync::new(Arc::downgrade(&beacon_chain), network_send.clone(), &log);
// generate the Message handler // generate the Message handler
let mut handler = MessageHandler { let mut handler = MessageHandler {
_chain: beacon_chain.clone(),
sync,
network_send, network_send,
sync,
log: log.clone(), log: log.clone(),
}; };

View File

@ -62,13 +62,13 @@ use slog::{debug, info, trace, warn, Logger};
use smallvec::SmallVec; use smallvec::SmallVec;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::ops::{Add, Sub}; use std::ops::{Add, Sub};
use std::sync::{Arc, Weak}; use std::sync::Weak;
use types::{BeaconBlock, EthSpec, Hash256, Slot}; use types::{BeaconBlock, EthSpec, Hash256, Slot};
/// Blocks are downloaded in batches from peers. This constant specifies how many blocks per batch /// Blocks are downloaded in batches from peers. This constant specifies how many blocks per batch
/// is requested. Currently the value is small for testing. This will be incremented for /// is requested. Currently the value is small for testing. This will be incremented for
/// production. /// production.
const MAX_BLOCKS_PER_REQUEST: u64 = 100; const MAX_BLOCKS_PER_REQUEST: u64 = 50;
/// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync
/// from a peer. If a peer is within this tolerance (forwards or backwards), it is treated as a /// from a peer. If a peer is within this tolerance (forwards or backwards), it is treated as a
@ -224,10 +224,10 @@ impl<T: BeaconChainTypes> ImportManager<T> {
/// Generates a new `ImportManager` given a logger and an Arc reference to a beacon chain. The /// Generates a new `ImportManager` given a logger and an Arc reference to a beacon chain. The
/// import manager keeps a weak reference to the beacon chain, which allows the chain to be /// import manager keeps a weak reference to the beacon chain, which allows the chain to be
/// dropped during the syncing process. The syncing handles this termination gracefully. /// dropped during the syncing process. The syncing handles this termination gracefully.
pub fn new(beacon_chain: Arc<BeaconChain<T>>, log: &slog::Logger) -> Self { pub fn new(beacon_chain: Weak<BeaconChain<T>>, log: &slog::Logger) -> Self {
ImportManager { ImportManager {
event_queue: SmallVec::new(), event_queue: SmallVec::new(),
chain: Arc::downgrade(&beacon_chain), chain: beacon_chain,
state: ManagerState::Regular, state: ManagerState::Regular,
import_queue: HashMap::new(), import_queue: HashMap::new(),
parent_queue: SmallVec::new(), parent_queue: SmallVec::new(),
@ -359,7 +359,9 @@ impl<T: BeaconChainTypes> ImportManager<T> {
warn!(self.log, "Peer returned too many empty block batches"; warn!(self.log, "Peer returned too many empty block batches";
"peer" => format!("{:?}", peer_id)); "peer" => format!("{:?}", peer_id));
block_requests.state = BlockRequestsState::Failed; block_requests.state = BlockRequestsState::Failed;
} else if block_requests.current_start_slot >= block_requests.target_head_slot { } else if block_requests.current_start_slot + MAX_BLOCKS_PER_REQUEST
>= block_requests.target_head_slot
{
warn!(self.log, "Peer did not return blocks it claimed to possess"; warn!(self.log, "Peer did not return blocks it claimed to possess";
"peer" => format!("{:?}", peer_id)); "peer" => format!("{:?}", peer_id));
block_requests.state = BlockRequestsState::Failed; block_requests.state = BlockRequestsState::Failed;
@ -583,6 +585,11 @@ impl<T: BeaconChainTypes> ImportManager<T> {
re_run = re_run || self.process_complete_parent_requests(); re_run = re_run || self.process_complete_parent_requests();
} }
// exit early if the beacon chain is dropped
if let None = self.chain.upgrade() {
return ImportManagerOutcome::Idle;
}
// return any queued events // return any queued events
if !self.event_queue.is_empty() { if !self.event_queue.is_empty() {
let event = self.event_queue.remove(0); let event = self.event_queue.remove(0);
@ -681,56 +688,48 @@ impl<T: BeaconChainTypes> ImportManager<T> {
self.import_queue.retain(|peer_id, block_requests| { self.import_queue.retain(|peer_id, block_requests| {
if block_requests.state == BlockRequestsState::ReadyToProcess { if block_requests.state == BlockRequestsState::ReadyToProcess {
// check that the chain still exists let downloaded_blocks =
if let Some(chain) = chain_ref.upgrade() { std::mem::replace(&mut block_requests.downloaded_blocks, Vec::new());
let downloaded_blocks = let last_element = downloaded_blocks.len() - 1;
std::mem::replace(&mut block_requests.downloaded_blocks, Vec::new()); let start_slot = downloaded_blocks[0].slot;
let last_element = downloaded_blocks.len() - 1; let end_slot = downloaded_blocks[last_element].slot;
let start_slot = downloaded_blocks[0].slot;
let end_slot = downloaded_blocks[last_element].slot;
match process_blocks(chain, downloaded_blocks, log_ref) { match process_blocks(chain_ref.clone(), downloaded_blocks, log_ref) {
Ok(()) => { Ok(()) => {
debug!(log_ref, "Blocks processed successfully"; debug!(log_ref, "Blocks processed successfully";
"peer" => format!("{:?}", peer_id),
"start_slot" => start_slot,
"end_slot" => end_slot,
"no_blocks" => last_element + 1,
);
block_requests.blocks_processed += last_element + 1;
// check if the batch is complete, by verifying if we have reached the
// target head
if end_slot >= block_requests.target_head_slot {
// Completed, re-hello the peer to ensure we are up to the latest head
event_queue_ref.push(ImportManagerOutcome::Hello(peer_id.clone()));
// remove the request
false
} else {
// have not reached the end, queue another batch
block_requests.update_start_slot();
re_run = true;
// keep the batch
true
}
}
Err(e) => {
warn!(log_ref, "Block processing failed";
"peer" => format!("{:?}", peer_id), "peer" => format!("{:?}", peer_id),
"start_slot" => start_slot, "start_slot" => start_slot,
"end_slot" => end_slot, "end_slot" => end_slot,
"no_blocks" => last_element + 1, "no_blocks" => last_element + 1,
); "error" => format!("{:?}", e),
block_requests.blocks_processed += last_element + 1; );
event_queue_ref.push(ImportManagerOutcome::DownvotePeer(peer_id.clone()));
// check if the batch is complete, by verifying if we have reached the false
// target head
if end_slot >= block_requests.target_head_slot {
// Completed, re-hello the peer to ensure we are up to the latest head
event_queue_ref.push(ImportManagerOutcome::Hello(peer_id.clone()));
// remove the request
false
} else {
// have not reached the end, queue another batch
block_requests.update_start_slot();
re_run = true;
// keep the batch
true
}
}
Err(e) => {
warn!(log_ref, "Block processing failed";
"peer" => format!("{:?}", peer_id),
"start_slot" => start_slot,
"end_slot" => end_slot,
"no_blocks" => last_element + 1,
"error" => format!("{:?}", e),
);
event_queue_ref
.push(ImportManagerOutcome::DownvotePeer(peer_id.clone()));
false
}
} }
} else {
// chain no longer exists, empty the queue and return
event_queue_ref.clear();
return false;
} }
} else { } else {
// not ready to process // not ready to process
@ -894,42 +893,43 @@ impl<T: BeaconChainTypes> ImportManager<T> {
// Helper function to process blocks // Helper function to process blocks
fn process_blocks<T: BeaconChainTypes>( fn process_blocks<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>, weak_chain: Weak<BeaconChain<T>>,
blocks: Vec<BeaconBlock<T::EthSpec>>, blocks: Vec<BeaconBlock<T::EthSpec>>,
log: &Logger, log: &Logger,
) -> Result<(), String> { ) -> Result<(), String> {
for block in blocks { for block in blocks {
let processing_result = chain.process_block(block.clone()); if let Some(chain) = weak_chain.upgrade() {
let processing_result = chain.process_block(block.clone());
if let Ok(outcome) = processing_result { if let Ok(outcome) = processing_result {
match outcome { match outcome {
BlockProcessingOutcome::Processed { block_root } => { BlockProcessingOutcome::Processed { block_root } => {
// The block was valid and we processed it successfully. // The block was valid and we processed it successfully.
trace!( trace!(
log, "Imported block from network"; log, "Imported block from network";
"slot" => block.slot, "slot" => block.slot,
"block_root" => format!("{}", block_root), "block_root" => format!("{}", block_root),
); );
} }
BlockProcessingOutcome::ParentUnknown { parent } => { BlockProcessingOutcome::ParentUnknown { parent } => {
// blocks should be sequential and all parents should exist // blocks should be sequential and all parents should exist
trace!( trace!(
log, "Parent block is unknown"; log, "Parent block is unknown";
"parent_root" => format!("{}", parent), "parent_root" => format!("{}", parent),
"baby_block_slot" => block.slot, "baby_block_slot" => block.slot,
); );
return Err(format!( return Err(format!(
"Block at slot {} has an unknown parent.", "Block at slot {} has an unknown parent.",
block.slot block.slot
)); ));
} }
BlockProcessingOutcome::BlockIsAlreadyKnown => { BlockProcessingOutcome::BlockIsAlreadyKnown => {
// this block is already known to us, move to the next // this block is already known to us, move to the next
debug!( debug!(
log, "Imported a block that is already known"; log, "Imported a block that is already known";
"parent_root" => format!("{}", parent), "block_slot" => block.slot,
"baby_block_slot" => block.slot, );
); }
BlockProcessingOutcome::FutureSlot { BlockProcessingOutcome::FutureSlot {
present_slot, present_slot,
block_slot, block_slot,
@ -937,7 +937,7 @@ fn process_blocks<T: BeaconChainTypes>(
if present_slot + FUTURE_SLOT_TOLERANCE >= block_slot { if present_slot + FUTURE_SLOT_TOLERANCE >= block_slot {
// The block is too far in the future, drop it. // The block is too far in the future, drop it.
trace!( trace!(
self.log, "Block is ahead of our slot clock"; log, "Block is ahead of our slot clock";
"msg" => "block for future slot rejected, check your time", "msg" => "block for future slot rejected, check your time",
"present_slot" => present_slot, "present_slot" => present_slot,
"block_slot" => block_slot, "block_slot" => block_slot,
@ -950,7 +950,7 @@ fn process_blocks<T: BeaconChainTypes>(
} else { } else {
// The block is in the future, but not too far. // The block is in the future, but not too far.
trace!( trace!(
self.log, "Block is slightly ahead of our slot clock, ignoring."; log, "Block is slightly ahead of our slot clock, ignoring.";
"present_slot" => present_slot, "present_slot" => present_slot,
"block_slot" => block_slot, "block_slot" => block_slot,
"FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE,
@ -959,44 +959,41 @@ fn process_blocks<T: BeaconChainTypes>(
} }
BlockProcessingOutcome::WouldRevertFinalizedSlot { .. } => { BlockProcessingOutcome::WouldRevertFinalizedSlot { .. } => {
trace!( trace!(
self.log, "Finalized or earlier block processed"; log, "Finalized or earlier block processed";
"outcome" => format!("{:?}", outcome), "outcome" => format!("{:?}", outcome),
); );
// block reached our finalized slot or was earlier, move to the next block // block reached our finalized slot or was earlier, move to the next block
} }
BlockProcessingOutcome::GenesisBlock => { BlockProcessingOutcome::GenesisBlock => {
trace!( trace!(
self.log, "Genesis block was processed"; log, "Genesis block was processed";
"outcome" => format!("{:?}", outcome), "outcome" => format!("{:?}", outcome),
); );
} }
BlockProcessingOutcome::FinalizedSlot => { _ => {
trace!( warn!(
log, "Finalized or earlier block processed"; log, "Invalid block received";
"outcome" => format!("{:?}", outcome), "msg" => "peer sent invalid block",
); "outcome" => format!("{:?}", outcome),
// block reached our finalized slot or was earlier, move to the next block );
} return Err(format!("Invalid block at slot {}", block.slot));
_ => { }
warn!(
log, "Invalid block received";
"msg" => "peer sent invalid block",
"outcome" => format!("{:?}", outcome),
);
return Err(format!("Invalid block at slot {}", block.slot));
} }
} else {
warn!(
log, "BlockProcessingFailure";
"msg" => "unexpected condition in processing block.",
"outcome" => format!("{:?}", processing_result)
);
return Err(format!(
"Unexpected block processing error: {:?}",
processing_result
));
} }
} else { } else {
warn!( return Ok(()); // terminate early due to dropped beacon chain
log, "BlockProcessingFailure";
"msg" => "unexpected condition in processing block.",
"outcome" => format!("{:?}", processing_result)
);
return Err(format!(
"Unexpected block processing error: {:?}",
processing_result
));
} }
} }
Ok(()) Ok(())
} }

View File

@ -6,7 +6,7 @@ use eth2_libp2p::rpc::{RPCEvent, RPCRequest, RPCResponse, RequestId};
use eth2_libp2p::PeerId; use eth2_libp2p::PeerId;
use slog::{debug, info, o, trace, warn}; use slog::{debug, info, o, trace, warn};
use ssz::Encode; use ssz::Encode;
use std::sync::Arc; use std::sync::{Arc, Weak};
use store::Store; use store::Store;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use types::{Attestation, BeaconBlock, Epoch, EthSpec, Hash256, Slot}; use types::{Attestation, BeaconBlock, Epoch, EthSpec, Hash256, Slot};
@ -57,7 +57,7 @@ pub enum SyncState {
/// Simple Syncing protocol. /// Simple Syncing protocol.
pub struct SimpleSync<T: BeaconChainTypes> { pub struct SimpleSync<T: BeaconChainTypes> {
/// A reference to the underlying beacon chain. /// A reference to the underlying beacon chain.
chain: Arc<BeaconChain<T>>, chain: Weak<BeaconChain<T>>,
manager: ImportManager<T>, manager: ImportManager<T>,
network: NetworkContext, network: NetworkContext,
log: slog::Logger, log: slog::Logger,
@ -66,7 +66,7 @@ pub struct SimpleSync<T: BeaconChainTypes> {
impl<T: BeaconChainTypes> SimpleSync<T> { impl<T: BeaconChainTypes> SimpleSync<T> {
/// Instantiate a `SimpleSync` instance, with no peers and an empty queue. /// Instantiate a `SimpleSync` instance, with no peers and an empty queue.
pub fn new( pub fn new(
beacon_chain: Arc<BeaconChain<T>>, beacon_chain: Weak<BeaconChain<T>>,
network_send: mpsc::UnboundedSender<NetworkMessage>, network_send: mpsc::UnboundedSender<NetworkMessage>,
log: &slog::Logger, log: &slog::Logger,
) -> Self { ) -> Self {
@ -91,8 +91,10 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
/// ///
/// Sends a `Hello` message to the peer. /// Sends a `Hello` message to the peer.
pub fn on_connect(&mut self, peer_id: PeerId) { pub fn on_connect(&mut self, peer_id: PeerId) {
self.network if let Some(chain) = self.chain.upgrade() {
.send_rpc_request(None, peer_id, RPCRequest::Hello(hello_message(&self.chain))); self.network
.send_rpc_request(None, peer_id, RPCRequest::Hello(hello_message(&chain)));
}
} }
/// Handle a `Hello` request. /// Handle a `Hello` request.
@ -104,16 +106,19 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
request_id: RequestId, request_id: RequestId,
hello: HelloMessage, hello: HelloMessage,
) { ) {
trace!(self.log, "HelloRequest"; "peer" => format!("{:?}", peer_id)); // ignore hello responses if we are shutting down
if let Some(chain) = self.chain.upgrade() {
trace!(self.log, "HelloRequest"; "peer" => format!("{:?}", peer_id));
// Say hello back. // Say hello back.
self.network.send_rpc_response( self.network.send_rpc_response(
peer_id.clone(), peer_id.clone(),
request_id, request_id,
RPCResponse::Hello(hello_message(&self.chain)), RPCResponse::Hello(hello_message(&chain)),
); );
self.process_hello(peer_id, hello); self.process_hello(peer_id, hello);
}
} }
/// Process a `Hello` response from a peer. /// Process a `Hello` response from a peer.
@ -128,88 +133,107 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
/// ///
/// Disconnects the peer if required. /// Disconnects the peer if required.
fn process_hello(&mut self, peer_id: PeerId, hello: HelloMessage) { fn process_hello(&mut self, peer_id: PeerId, hello: HelloMessage) {
let remote = PeerSyncInfo::from(hello); // If we update the manager we may need to drive the sync. This flag lies out of scope of
let local = PeerSyncInfo::from(&self.chain); // the beacon chain so that the process sync command has no long-lived beacon chain
// references.
let start_slot = |epoch: Epoch| epoch.start_slot(T::EthSpec::slots_per_epoch()); let mut process_sync = false;
if local.fork_version != remote.fork_version {
// The node is on a different network/fork, disconnect them.
debug!(
self.log, "HandshakeFailure";
"peer" => format!("{:?}", peer_id),
"reason" => "network_id"
);
self.network
.disconnect(peer_id.clone(), GoodbyeReason::IrrelevantNetwork);
} else if remote.finalized_epoch <= local.finalized_epoch
&& remote.finalized_root != Hash256::zero()
&& local.finalized_root != Hash256::zero()
&& (self.root_at_slot(start_slot(remote.finalized_epoch))
!= Some(remote.finalized_root))
{ {
// The remotes finalized epoch is less than or greater than ours, but the block root is // scope of beacon chain reference
// different to the one in our chain. let chain = match self.chain.upgrade() {
// Some(chain) => chain,
// Therefore, the node is on a different chain and we should not communicate with them. None => {
debug!( info!(self.log, "Sync shutting down";
self.log, "HandshakeFailure"; "reason" => "Beacon chain dropped");
"peer" => format!("{:?}", peer_id), return;
"reason" => "different finalized chain" }
); };
self.network
.disconnect(peer_id.clone(), GoodbyeReason::IrrelevantNetwork);
} else if remote.finalized_epoch < local.finalized_epoch {
// The node has a lower finalized epoch, their chain is not useful to us. There are two
// cases where a node can have a lower finalized epoch:
//
// ## The node is on the same chain
//
// If a node is on the same chain but has a lower finalized epoch, their head must be
// lower than ours. Therefore, we have nothing to request from them.
//
// ## The node is on a fork
//
// If a node is on a fork that has a lower finalized epoch, switching to that fork would
// cause us to revert a finalized block. This is not permitted, therefore we have no
// interest in their blocks.
debug!(
self.log,
"NaivePeer";
"peer" => format!("{:?}", peer_id),
"reason" => "lower finalized epoch"
);
} else if self
.chain
.store
.exists::<BeaconBlock<T::EthSpec>>(&remote.head_root)
.unwrap_or_else(|_| false)
{
trace!(
self.log, "Out of date or potentially sync'd peer found";
"peer" => format!("{:?}", peer_id),
"remote_head_slot" => remote.head_slot,
"remote_latest_finalized_epoch" => remote.finalized_epoch,
);
// If the node's best-block is already known to us and they are close to our current let remote = PeerSyncInfo::from(hello);
// head, treat them as a fully sync'd peer. let local = PeerSyncInfo::from(&chain);
self.manager.add_peer(peer_id, remote);
self.process_sync();
} else {
// The remote node has an equal or great finalized epoch and we don't know it's head.
//
// Therefore, there are some blocks between the local finalized epoch and the remote
// head that are worth downloading.
debug!(
self.log, "UsefulPeer";
"peer" => format!("{:?}", peer_id),
"local_finalized_epoch" => local.finalized_epoch,
"remote_latest_finalized_epoch" => remote.finalized_epoch,
);
self.manager.add_peer(peer_id, remote); let start_slot = |epoch: Epoch| epoch.start_slot(T::EthSpec::slots_per_epoch());
if local.fork_version != remote.fork_version {
// The node is on a different network/fork, disconnect them.
debug!(
self.log, "HandshakeFailure";
"peer" => format!("{:?}", peer_id),
"reason" => "network_id"
);
self.network
.disconnect(peer_id.clone(), GoodbyeReason::IrrelevantNetwork);
} else if remote.finalized_epoch <= local.finalized_epoch
&& remote.finalized_root != Hash256::zero()
&& local.finalized_root != Hash256::zero()
&& (chain.root_at_slot(start_slot(remote.finalized_epoch))
!= Some(remote.finalized_root))
{
// The remotes finalized epoch is less than or greater than ours, but the block root is
// different to the one in our chain.
//
// Therefore, the node is on a different chain and we should not communicate with them.
debug!(
self.log, "HandshakeFailure";
"peer" => format!("{:?}", peer_id),
"reason" => "different finalized chain"
);
self.network
.disconnect(peer_id.clone(), GoodbyeReason::IrrelevantNetwork);
} else if remote.finalized_epoch < local.finalized_epoch {
// The node has a lower finalized epoch, their chain is not useful to us. There are two
// cases where a node can have a lower finalized epoch:
//
// ## The node is on the same chain
//
// If a node is on the same chain but has a lower finalized epoch, their head must be
// lower than ours. Therefore, we have nothing to request from them.
//
// ## The node is on a fork
//
// If a node is on a fork that has a lower finalized epoch, switching to that fork would
// cause us to revert a finalized block. This is not permitted, therefore we have no
// interest in their blocks.
debug!(
self.log,
"NaivePeer";
"peer" => format!("{:?}", peer_id),
"reason" => "lower finalized epoch"
);
} else if chain
.store
.exists::<BeaconBlock<T::EthSpec>>(&remote.head_root)
.unwrap_or_else(|_| false)
{
trace!(
self.log, "Peer with known chain found";
"peer" => format!("{:?}", peer_id),
"remote_head_slot" => remote.head_slot,
"remote_latest_finalized_epoch" => remote.finalized_epoch,
);
// If the node's best-block is already known to us and they are close to our current
// head, treat them as a fully sync'd peer.
self.manager.add_peer(peer_id, remote);
process_sync = true;
} else {
// The remote node has an equal or great finalized epoch and we don't know it's head.
//
// Therefore, there are some blocks between the local finalized epoch and the remote
// head that are worth downloading.
debug!(
self.log, "UsefulPeer";
"peer" => format!("{:?}", peer_id),
"local_finalized_epoch" => local.finalized_epoch,
"remote_latest_finalized_epoch" => remote.finalized_epoch,
);
self.manager.add_peer(peer_id, remote);
process_sync = true
}
} // end beacon chain reference scope
if process_sync {
self.process_sync(); self.process_sync();
} }
} }
@ -226,11 +250,13 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
"method" => "HELLO", "method" => "HELLO",
"peer" => format!("{:?}", peer_id) "peer" => format!("{:?}", peer_id)
); );
self.network.send_rpc_request( if let Some(chain) = self.chain.upgrade() {
None, self.network.send_rpc_request(
peer_id, None,
RPCRequest::Hello(hello_message(&self.chain)), peer_id,
); RPCRequest::Hello(hello_message(&chain)),
);
}
} }
ImportManagerOutcome::RequestBlocks { ImportManagerOutcome::RequestBlocks {
peer_id, peer_id,
@ -283,14 +309,6 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
} }
} }
//TODO: Move to beacon chain
fn root_at_slot(&self, target_slot: Slot) -> Option<Hash256> {
self.chain
.rev_iter_block_roots()
.find(|(_root, slot)| *slot == target_slot)
.map(|(root, _slot)| root)
}
/// Handle a `RecentBeaconBlocks` request from the peer. /// Handle a `RecentBeaconBlocks` request from the peer.
pub fn on_recent_beacon_blocks_request( pub fn on_recent_beacon_blocks_request(
&mut self, &mut self,
@ -298,11 +316,20 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
request_id: RequestId, request_id: RequestId,
request: RecentBeaconBlocksRequest, request: RecentBeaconBlocksRequest,
) { ) {
let chain = match self.chain.upgrade() {
Some(chain) => chain,
None => {
info!(self.log, "Sync shutting down";
"reason" => "Beacon chain dropped");
return;
}
};
let blocks: Vec<BeaconBlock<_>> = request let blocks: Vec<BeaconBlock<_>> = request
.block_roots .block_roots
.iter() .iter()
.filter_map(|root| { .filter_map(|root| {
if let Ok(Some(block)) = self.chain.store.get::<BeaconBlock<T::EthSpec>>(root) { if let Ok(Some(block)) = chain.store.get::<BeaconBlock<T::EthSpec>>(root) {
Some(block) Some(block)
} else { } else {
debug!( debug!(
@ -319,7 +346,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
debug!( debug!(
self.log, self.log,
"BlockBodiesRequest"; "RecentBeaconBlocksRequest";
"peer" => format!("{:?}", peer_id), "peer" => format!("{:?}", peer_id),
"requested" => request.block_roots.len(), "requested" => request.block_roots.len(),
"returned" => blocks.len(), "returned" => blocks.len(),
@ -339,6 +366,15 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
request_id: RequestId, request_id: RequestId,
req: BeaconBlocksRequest, req: BeaconBlocksRequest,
) { ) {
let chain = match self.chain.upgrade() {
Some(chain) => chain,
None => {
info!(self.log, "Sync shutting down";
"reason" => "Beacon chain dropped");
return;
}
};
debug!( debug!(
self.log, self.log,
"BeaconBlocksRequest"; "BeaconBlocksRequest";
@ -352,15 +388,14 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
// In the current implementation we read from the db then filter out out-of-range blocks. // In the current implementation we read from the db then filter out out-of-range blocks.
// Improving the db schema to prevent this would be ideal. // Improving the db schema to prevent this would be ideal.
let mut blocks: Vec<BeaconBlock<T::EthSpec>> = self let mut blocks: Vec<BeaconBlock<T::EthSpec>> = chain
.chain
.rev_iter_block_roots() .rev_iter_block_roots()
.filter(|(_root, slot)| { .filter(|(_root, slot)| {
req.start_slot <= slot.as_u64() && req.start_slot + req.count > slot.as_u64() req.start_slot <= slot.as_u64() && req.start_slot + req.count > slot.as_u64()
}) })
.take_while(|(_root, slot)| req.start_slot <= slot.as_u64()) .take_while(|(_root, slot)| req.start_slot <= slot.as_u64())
.filter_map(|(root, _slot)| { .filter_map(|(root, _slot)| {
if let Ok(Some(block)) = self.chain.store.get::<BeaconBlock<T::EthSpec>>(&root) { if let Ok(Some(block)) = chain.store.get::<BeaconBlock<T::EthSpec>>(&root) {
Some(block) Some(block)
} else { } else {
warn!( warn!(
@ -378,18 +413,16 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
blocks.reverse(); blocks.reverse();
blocks.dedup_by_key(|brs| brs.slot); blocks.dedup_by_key(|brs| brs.slot);
if blocks.len() as u64 != req.count { debug!(
debug!( self.log,
self.log, "BeaconBlocksRequest response";
"BeaconBlocksRequest response"; "peer" => format!("{:?}", peer_id),
"peer" => format!("{:?}", peer_id), "msg" => "Failed to return all requested hashes",
"msg" => "Failed to return all requested hashes", "start_slot" => req.start_slot,
"start_slot" => req.start_slot, "current_slot" => chain.slot().unwrap_or_else(|_| Slot::from(0_u64)).as_u64(),
"current_slot" => format!("{:?}", self.chain.slot()), "requested" => req.count,
"requested" => req.count, "returned" => blocks.len(),
"returned" => blocks.len(), );
);
}
self.network.send_rpc_response( self.network.send_rpc_response(
peer_id, peer_id,
@ -444,7 +477,16 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
/// ///
/// Returns a `bool` which, if `true`, indicates we should forward the block to our peers. /// Returns a `bool` which, if `true`, indicates we should forward the block to our peers.
pub fn on_block_gossip(&mut self, peer_id: PeerId, block: BeaconBlock<T::EthSpec>) -> bool { pub fn on_block_gossip(&mut self, peer_id: PeerId, block: BeaconBlock<T::EthSpec>) -> bool {
if let Ok(outcome) = self.chain.process_block(block.clone()) { let chain = match self.chain.upgrade() {
Some(chain) => chain,
None => {
info!(self.log, "Sync shutting down";
"reason" => "Beacon chain dropped");
return false;
}
};
if let Ok(outcome) = chain.process_block(block.clone()) {
match outcome { match outcome {
BlockProcessingOutcome::Processed { .. } => { BlockProcessingOutcome::Processed { .. } => {
trace!(self.log, "Gossipsub block processed"; trace!(self.log, "Gossipsub block processed";
@ -477,7 +519,16 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
/// ///
/// Not currently implemented. /// Not currently implemented.
pub fn on_attestation_gossip(&mut self, _peer_id: PeerId, msg: Attestation<T::EthSpec>) { pub fn on_attestation_gossip(&mut self, _peer_id: PeerId, msg: Attestation<T::EthSpec>) {
match self.chain.process_attestation(msg) { let chain = match self.chain.upgrade() {
Some(chain) => chain,
None => {
info!(self.log, "Sync shutting down";
"reason" => "Beacon chain dropped");
return;
}
};
match chain.process_attestation(msg) {
Ok(outcome) => info!( Ok(outcome) => info!(
self.log, self.log,
"Processed attestation"; "Processed attestation";
@ -489,11 +540,6 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
} }
} }
} }
/// Generates our current state in the form of a HELLO RPC message.
pub fn generate_hello(&self) -> HelloMessage {
hello_message(&self.chain)
}
} }
/// Build a `HelloMessage` representing the state of the given `beacon_chain`. /// Build a `HelloMessage` representing the state of the given `beacon_chain`.