Improved syncing compilation issues

Age Manning 2019-08-24 01:09:29 +10:00
parent c259d6c006
commit b078385362
No known key found for this signature in database
GPG Key ID: 05EED64B79E06A93
4 changed files with 622 additions and 530 deletions

View File

@@ -22,8 +22,6 @@ pub struct MessageHandler<T: BeaconChainTypes> {
     _chain: Arc<BeaconChain<T>>,
     /// The syncing framework.
     sync: SimpleSync<T>,
-    /// The context required to send messages to, and process messages from peers.
-    network_context: NetworkContext,
    /// The `MessageHandler` logger.
     log: slog::Logger,
 }
@@ -52,15 +50,13 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
         trace!(log, "Service starting");

         let (handler_send, handler_recv) = mpsc::unbounded_channel();

         // Initialise sync and begin processing in thread
-        let sync = SimpleSync::new(beacon_chain.clone(), &log);
+        let sync = SimpleSync::new(beacon_chain.clone(), network_send, &log);

         // generate the Message handler
         let mut handler = MessageHandler {
             _chain: beacon_chain.clone(),
             sync,
-            network_context: NetworkContext::new(network_send, log.clone()),
             log: log.clone(),
         };
@@ -81,7 +77,7 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
         match message {
             // we have initiated a connection to a peer
             HandlerMessage::PeerDialed(peer_id) => {
-                self.sync.on_connect(peer_id, &mut self.network_context);
+                self.sync.on_connect(peer_id);
             }
             // A peer has disconnected
             HandlerMessage::PeerDisconnected(peer_id) => {
@@ -112,32 +108,24 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
     /// A new RPC request has been received from the network.
     fn handle_rpc_request(&mut self, peer_id: PeerId, request_id: RequestId, request: RPCRequest) {
         match request {
-            RPCRequest::Hello(hello_message) => self.sync.on_hello_request(
-                peer_id,
-                request_id,
-                hello_message,
-                &mut self.network_context,
-            ),
+            RPCRequest::Hello(hello_message) => {
+                self.sync
+                    .on_hello_request(peer_id, request_id, hello_message)
+            }
             RPCRequest::Goodbye(goodbye_reason) => {
                 debug!(
                     self.log, "PeerGoodbye";
                     "peer" => format!("{:?}", peer_id),
-                    "reason" => format!("{:?}", reason),
+                    "reason" => format!("{:?}", goodbye_reason),
                 );
-                self.sync.on_disconnect(peer_id),
-            },
+                self.sync.on_disconnect(peer_id);
+            }
-            RPCRequest::BeaconBlocks(request) => self.sync.on_beacon_blocks_request(
-                peer_id,
-                request_id,
-                request,
-                &mut self.network_context,
-            ),
-            RPCRequest::RecentBeaconBlocks(request) => self.sync.on_recent_beacon_blocks_request(
-                peer_id,
-                request_id,
-                request,
-                &mut self.network_context,
-            ),
+            RPCRequest::BeaconBlocks(request) => self
+                .sync
+                .on_beacon_blocks_request(peer_id, request_id, request),
+            RPCRequest::RecentBeaconBlocks(request) => self
+                .sync
+                .on_recent_beacon_blocks_request(peer_id, request_id, request),
         }
     }
@@ -163,20 +151,15 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
             RPCErrorResponse::Success(response) => {
                 match response {
                     RPCResponse::Hello(hello_message) => {
-                        self.sync.on_hello_response(
-                            peer_id,
-                            hello_message,
-                            &mut self.network_context,
-                        );
+                        self.sync.on_hello_response(peer_id, hello_message);
                     }
                     RPCResponse::BeaconBlocks(response) => {
-                        match self.decode_beacon_blocks(response) {
+                        match self.decode_beacon_blocks(&response) {
                             Ok(beacon_blocks) => {
                                 self.sync.on_beacon_blocks_response(
                                     peer_id,
                                     request_id,
                                     beacon_blocks,
-                                    &mut self.network_context,
                                 );
                             }
                             Err(e) => {
@@ -186,13 +169,12 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
                             }
                         }
                     }
                     RPCResponse::RecentBeaconBlocks(response) => {
-                        match self.decode_beacon_blocks(response) {
+                        match self.decode_beacon_blocks(&response) {
                             Ok(beacon_blocks) => {
                                 self.sync.on_recent_beacon_blocks_response(
-                                    request_id,
                                     peer_id,
+                                    request_id,
                                     beacon_blocks,
-                                    &mut self.network_context,
                                 );
                             }
                             Err(e) => {
@@ -217,19 +199,14 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
         match gossip_message {
             PubsubMessage::Block(message) => match self.decode_gossip_block(message) {
                 Ok(block) => {
-                    let _should_forward_on =
-                        self.sync
-                            .on_block_gossip(peer_id, block, &mut self.network_context);
+                    let _should_forward_on = self.sync.on_block_gossip(peer_id, block);
                 }
                 Err(e) => {
                     debug!(self.log, "Invalid gossiped beacon block"; "peer_id" => format!("{}", peer_id), "Error" => format!("{:?}", e));
                 }
             },
             PubsubMessage::Attestation(message) => match self.decode_gossip_attestation(message) {
-                Ok(attestation) => {
-                    self.sync
-                        .on_attestation_gossip(peer_id, attestation, &mut self.network_context)
-                }
+                Ok(attestation) => self.sync.on_attestation_gossip(peer_id, attestation),
                 Err(e) => {
                     debug!(self.log, "Invalid gossiped attestation"; "peer_id" => format!("{}", peer_id), "Error" => format!("{:?}", e));
                 }
@@ -331,56 +308,3 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
         Vec::from_ssz_bytes(&beacon_blocks)
     }
 }
-
-/// Wraps a Network Channel to employ various RPC/Sync related network functionality.
-pub struct NetworkContext {
-    /// The network channel to relay messages to the Network service.
-    network_send: mpsc::UnboundedSender<NetworkMessage>,
-    /// Logger for the `NetworkContext`.
-    log: slog::Logger,
-}
-
-impl NetworkContext {
-    pub fn new(network_send: mpsc::UnboundedSender<NetworkMessage>, log: slog::Logger) -> Self {
-        Self { network_send, log }
-    }
-
-    pub fn disconnect(&mut self, peer_id: PeerId, reason: GoodbyeReason) {
-        self.send_rpc_request(peer_id, RPCRequest::Goodbye(reason))
-        // TODO: disconnect peers.
-    }
-
-    pub fn send_rpc_request(&mut self, peer_id: PeerId, rpc_request: RPCRequest) {
-        // Note: There is currently no use of keeping track of requests. However the functionality
-        // is left here for future revisions.
-        self.send_rpc_event(peer_id, RPCEvent::Request(0, rpc_request));
-    }
-
-    //TODO: Handle Error responses
-    pub fn send_rpc_response(
-        &mut self,
-        peer_id: PeerId,
-        request_id: RequestId,
-        rpc_response: RPCErrorResponse,
-    ) {
-        self.send_rpc_event(
-            peer_id,
-            RPCEvent::Response(request_id, RPCErrorResponse::Success(rpc_response)),
-        );
-    }
-
-    fn send_rpc_event(&mut self, peer_id: PeerId, rpc_event: RPCEvent) {
-        self.send(peer_id, OutgoingMessage::RPC(rpc_event))
-    }
-
-    fn send(&mut self, peer_id: PeerId, outgoing_message: OutgoingMessage) {
-        self.network_send
-            .try_send(NetworkMessage::Send(peer_id, outgoing_message))
-            .unwrap_or_else(|_| {
-                warn!(
-                    self.log,
-                    "Could not send RPC message to the network service"
-                )
-            });
-    }
-}
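Taken together, this file's change is an ownership move: NetworkContext leaves MessageHandler, and the sync layer owns its own channel to the network service (see SimpleSync in the last file below), so the `&mut network_context` parameter disappears from every sync call. A minimal sketch of that pattern, using simplified stand-in types (std's mpsc and string messages, not the real lighthouse APIs):

    use std::sync::mpsc;

    // Stand-in for the real NetworkContext: owns the channel to the network service.
    struct NetworkContext {
        network_send: mpsc::Sender<String>,
    }

    // Stand-in for SimpleSync: after the refactor it owns the context itself.
    struct SimpleSync {
        network: NetworkContext,
    }

    impl SimpleSync {
        // previously: fn on_connect(&self, peer: &str, network: &mut NetworkContext)
        fn on_connect(&mut self, peer: &str) {
            let _ = self.network.network_send.send(format!("HELLO -> {}", peer));
        }
    }

    fn main() {
        let (tx, rx) = mpsc::channel();
        let mut sync = SimpleSync {
            network: NetworkContext { network_send: tx },
        };
        sync.on_connect("peer-1");
        assert_eq!(rx.recv().unwrap(), "HELLO -> peer-1");
    }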

View File

@@ -1,129 +1,164 @@
-const MAX_BLOCKS_PER_REQUEST: usize = 10;
+use super::simple_sync::{PeerSyncInfo, FUTURE_SLOT_TOLERANCE};
+use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome};
+use eth2_libp2p::rpc::methods::*;
+use eth2_libp2p::rpc::RequestId;
+use eth2_libp2p::PeerId;
+use slog::{debug, info, trace, warn, Logger};
+use std::collections::{HashMap, HashSet};
+use std::ops::{Add, Sub};
+use std::sync::Arc;
+use types::{BeaconBlock, EthSpec, Hash256, Slot};
+
+const MAX_BLOCKS_PER_REQUEST: u64 = 10;

 /// The number of slots that we can import blocks ahead of us, before going into full Sync mode.
-const SLOT_IMPORT_TOLERANCE: u64 = 10;
+const SLOT_IMPORT_TOLERANCE: usize = 10;
 const PARENT_FAIL_TOLERANCE: usize = 3;
-const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE*2;
+const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE * 2;

+#[derive(PartialEq)]
 enum BlockRequestsState {
     QueuedForward,
     QueuedBackward,
     Pending(RequestId),
     Complete,
+    Failed,
 }

-struct BlockRequests {
-    target_head_slot: Slot
-    target_head_root: Hash256,
-    downloaded_blocks: Vec<BeaconBlock>,
-    state: State,
+struct BlockRequests<T: EthSpec> {
+    target_head_slot: Slot,
+    target_head_root: Hash256,
+    downloaded_blocks: Vec<BeaconBlock<T>>,
+    state: BlockRequestsState,
 }

-struct ParentRequests {
-    downloaded_blocks: Vec<BeaconBlock>,
-    attempts: usize,
+struct ParentRequests<T: EthSpec> {
+    downloaded_blocks: Vec<BeaconBlock<T>>,
+    failed_attempts: usize,
     last_submitted_peer: PeerId, // to downvote the submitting peer.
     state: BlockRequestsState,
 }

-impl BlockRequests {
+impl<T: EthSpec> BlockRequests<T> {
     // gets the start slot for next batch
     // last block slot downloaded plus 1
     fn next_start_slot(&self) -> Option<Slot> {
         if !self.downloaded_blocks.is_empty() {
             match self.state {
                 BlockRequestsState::QueuedForward => {
-                    let last_element_index = self.downloaded_blocks.len() -1;
-                    Some(downloaded_blocks[last_element_index].slot.add(1))
+                    let last_element_index = self.downloaded_blocks.len() - 1;
+                    Some(self.downloaded_blocks[last_element_index].slot.add(1))
                 }
                 BlockRequestsState::QueuedBackward => {
                     let earliest_known_slot = self.downloaded_blocks[0].slot;
                     Some(earliest_known_slot.add(1).sub(MAX_BLOCKS_PER_REQUEST))
                 }
+                _ => {
+                    // pending/complete/failed
+                    None
+                }
             }
-        }
-        else {
+        } else {
             None
         }
     }
 }
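For illustration, the batch arithmetic in next_start_slot can be modelled with plain integers: a forward request resumes one slot past the newest downloaded block, while a backward request starts one full batch below the oldest. A self-contained sketch (hypothetical u64 slots in place of types::Slot):

    const MAX_BLOCKS_PER_REQUEST: u64 = 10;

    fn next_start_slot(downloaded: &[u64], forward: bool) -> Option<u64> {
        let first = *downloaded.first()?;
        let last = *downloaded.last()?;
        if forward {
            Some(last + 1) // resume one past the newest downloaded block
        } else {
            // walk backwards one full batch, ending just before the oldest block
            Some(first + 1 - MAX_BLOCKS_PER_REQUEST)
        }
    }

    fn main() {
        assert_eq!(next_start_slot(&[5, 6, 7], true), Some(8));
        assert_eq!(next_start_slot(&[20, 21], false), Some(11));
        assert_eq!(next_start_slot(&[], true), None); // nothing downloaded yet
    }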
+#[derive(PartialEq, Debug, Clone)]
 enum ManagerState {
     Syncing,
     Regular,
     Stalled,
 }

-enum ImportManagerOutcome {
+pub(crate) enum ImportManagerOutcome {
     Idle,
-    RequestBlocks{
+    RequestBlocks {
         peer_id: PeerId,
         request_id: RequestId,
         request: BeaconBlocksRequest,
     },
+    /// Updates information with peer via requesting another HELLO handshake.
+    Hello(PeerId),
     RecentRequest(PeerId, RecentBeaconBlocksRequest),
     DownvotePeer(PeerId),
 }

-pub struct ImportManager {
+pub struct ImportManager<T: BeaconChainTypes> {
     /// A reference to the underlying beacon chain.
     chain: Arc<BeaconChain<T>>,
-    state: MangerState,
-    import_queue: HashMap<PeerId, BlockRequests>,
-    parent_queue: Vec<ParentRequests>,
-    full_peers: Hashset<PeerId>,
+    state: ManagerState,
+    import_queue: HashMap<PeerId, BlockRequests<T::EthSpec>>,
+    parent_queue: Vec<ParentRequests<T::EthSpec>>,
+    full_peers: HashSet<PeerId>,
     current_req_id: usize,
     log: Logger,
 }

-impl ImportManager {
+impl<T: BeaconChainTypes> ImportManager<T> {
+    pub fn new(beacon_chain: Arc<BeaconChain<T>>, log: &slog::Logger) -> Self {
+        ImportManager {
+            chain: beacon_chain.clone(),
+            state: ManagerState::Regular,
+            import_queue: HashMap::new(),
+            parent_queue: Vec::new(),
+            full_peers: HashSet::new(),
+            current_req_id: 0,
+            log: log.clone(),
+        }
+    }
+
-    pub fn add_peer(&mut self, peer_id, remote: PeerSyncInfo) {
+    pub fn add_peer(&mut self, peer_id: PeerId, remote: PeerSyncInfo) {
         // TODO: Improve comments.
         // initially try to download blocks from our current head
         // then backwards search all the way back to our finalized epoch until we match on a chain
         // has to be done sequentially to find next slot to start the batch from
         let local = PeerSyncInfo::from(&self.chain);

         // If a peer is within SLOT_IMPORT_TOLERANCE from out head slot, ignore a batch sync
-        if remote.head_slot.sub(local.head_slot) < SLOT_IMPORT_TOLERANCE {
+        if remote.head_slot.sub(local.head_slot).as_usize() < SLOT_IMPORT_TOLERANCE {
             trace!(self.log, "Ignoring full sync with peer";
-                "peer" => peer_id,
+                "peer" => format!("{:?}", peer_id),
                 "peer_head_slot" => remote.head_slot,
                 "local_head_slot" => local.head_slot,
             );
             // remove the peer from the queue if it exists
             self.import_queue.remove(&peer_id);
             return;
         }

         if let Some(block_requests) = self.import_queue.get_mut(&peer_id) {
             // update the target head slot
-            if remote.head_slot > requested_block.target_head_slot {
+            if remote.head_slot > block_requests.target_head_slot {
                 block_requests.target_head_slot = remote.head_slot;
             }
         } else {
             let block_requests = BlockRequests {
                 target_head_slot: remote.head_slot, // this should be larger than the current head. It is checked in the SyncManager before add_peer is called
                 target_head_root: remote.head_root,
                 downloaded_blocks: Vec::new(),
-                state: RequestedBlockState::Queued
-            }
+                state: BlockRequestsState::QueuedForward,
+            };
             self.import_queue.insert(peer_id, block_requests);
         }
     }

-    pub fn beacon_blocks_response(peer_id: PeerId, request_id: RequestId, blocks: Vec<BeaconBlock>) {
+    pub fn beacon_blocks_response(
+        &mut self,
+        peer_id: PeerId,
+        request_id: RequestId,
+        mut blocks: Vec<BeaconBlock<T::EthSpec>>,
+    ) {
         // find the request
-        let block_requests = match self.import_queue.get_mut(&peer_id) {
-            Some(req) if req.state = RequestedBlockState::Pending(request_id) => req,
-            None => {
+        let block_requests = match self
+            .import_queue
+            .get_mut(&peer_id)
+            .filter(|r| r.state == BlockRequestsState::Pending(request_id))
+        {
+            Some(req) => req,
+            _ => {
                 // No pending request, invalid request_id or coding error
                 warn!(self.log, "BeaconBlocks response unknown"; "request_id" => request_id);
                 return;
@@ -142,100 +177,115 @@ impl ImportManager {
         if blocks.is_empty() {
             warn!(self.log, "BeaconBlocks response was empty"; "request_id" => request_id);
-            block_requests.state = RequestedBlockState::Failed;
+            block_requests.state = BlockRequestsState::Failed;
             return;
         }

         // Add the newly downloaded blocks to the current list of downloaded blocks. This also
         // determines if we are syncing forward or backward.
         let syncing_forwards = {
-            if block_requests.blocks.is_empty() {
-                block_requests.blocks.push(blocks);
+            if block_requests.downloaded_blocks.is_empty() {
+                block_requests.downloaded_blocks.append(&mut blocks);
                 true
-            }
-            else if block_requests.blocks[0].slot < blocks[0].slot { // syncing forwards
+            } else if block_requests.downloaded_blocks[0].slot < blocks[0].slot {
+                // syncing forwards
                 // verify the peer hasn't sent overlapping blocks - ensuring the strictly
                 // increasing blocks in a batch will be verified during the processing
-                if block_requests.next_slot() > blocks[0].slot {
-                    warn!(self.log, "BeaconBlocks response returned duplicate blocks", "request_id" => request_id, "response_initial_slot" => blocks[0].slot, "requested_initial_slot" => block_requests.next_slot());
-                    block_requests.state = RequestedBlockState::Failed;
+                if block_requests.next_start_slot() > Some(blocks[0].slot) {
+                    warn!(self.log, "BeaconBlocks response returned duplicate blocks"; "request_id" => request_id, "response_initial_slot" => blocks[0].slot, "requested_initial_slot" => block_requests.next_start_slot());
+                    block_requests.state = BlockRequestsState::Failed;
                     return;
                 }
-                block_requests.blocks.push(blocks);
+                block_requests.downloaded_blocks.append(&mut blocks);
                 true
-            }
-            else { false }
+            } else {
+                false
+            }
         };

         // Determine if more blocks need to be downloaded. There are a few cases:
         // - We have downloaded a batch from our head_slot, which has not reached the remotes head
         //   (target head). Therefore we need to download another sequential batch.
         // - The latest batch includes blocks that greater than or equal to the target_head slot,
         //   which means we have caught up to their head. We then check to see if the first
         //   block downloaded matches our head. If so, we are on the same chain and can process
         //   the blocks. If not we need to sync back further until we are on the same chain. So
         //   request more blocks.
         // - We are syncing backwards (from our head slot) and need to check if we are on the same
        //   chain. If so, process the blocks, if not, request more blocks all the way up to
         //   our last finalized slot.

         if syncing_forwards {
             // does the batch contain the target_head_slot
-            let last_element_index = block_requests.blocks.len()-1;
-            if block_requests[last_element_index].slot >= block_requests.target_slot {
+            let last_element_index = block_requests.downloaded_blocks.len() - 1;
+            if block_requests.downloaded_blocks[last_element_index].slot
+                >= block_requests.target_head_slot
+            {
                 // if the batch is on our chain, this is complete and we can then process.
                 // Otherwise start backwards syncing until we reach a common chain.
-                let earliest_slot = block_requests_blocks[0].slot
-                if block_requests.blocks[0] == self.chain.get_block_by_slot(earliest_slot) {
-                    block_requests.state = RequestedBlockState::Complete;
+                let earliest_slot = block_requests.downloaded_blocks[0].slot;
+                //TODO: Decide which is faster. Reading block from db and comparing or calculating
+                //the hash tree root and comparing.
+                if Some(block_requests.downloaded_blocks[0].canonical_root())
+                    == root_at_slot(self.chain, earliest_slot)
+                {
+                    block_requests.state = BlockRequestsState::Complete;
                     return;
                 }

                 // not on the same chain, request blocks backwards
-                // binary search, request half the distance between the earliest block and our
-                // finalized slot
-                let state = &beacon_chain.head().beacon_state;
-                let local_finalized_slot = state.finalized_checkpoint.epoch; //TODO: Convert to slot
+                let state = &self.chain.head().beacon_state;
+                let local_finalized_slot = state
+                    .finalized_checkpoint
+                    .epoch
+                    .start_slot(T::EthSpec::slots_per_epoch());

                 // check that the request hasn't failed by having no common chain
-                if local_finalized_slot >= block_requests.blocks[0] {
+                if local_finalized_slot >= block_requests.downloaded_blocks[0].slot {
                     warn!(self.log, "Peer returned an unknown chain."; "request_id" => request_id);
-                    block_requests.state = RequestedBlockState::Failed;
+                    block_requests.state = BlockRequestsState::Failed;
                     return;
                 }

                 // Start a backwards sync by requesting earlier blocks
                 // There can be duplication in downloaded blocks here if there are a large number
                 // of skip slots. In all cases we at least re-download the earliest known block.
                 // It is unlikely that a backwards sync in required, so we accept this duplication
                 // for now.
-                block_requests.state = RequestedBlockState::QueuedBackward;
-            }
-            else {
+                block_requests.state = BlockRequestsState::QueuedBackward;
+            } else {
                 // batch doesn't contain the head slot, request the next batch
-                block_requests.state = RequestedBlockState::QueuedForward;
+                block_requests.state = BlockRequestsState::QueuedForward;
             }
-        }
-        else {
+        } else {
             // syncing backwards
             // if the batch is on our chain, this is complete and we can then process.
             // Otherwise continue backwards
-            let earliest_slot = block_requests_blocks[0].slot
-            if block_requests.blocks[0] == self.chain.get_block_by_slot(earliest_slot) {
-                block_requests.state = RequestedBlockState::Complete;
+            let earliest_slot = block_requests.downloaded_blocks[0].slot;
+            if Some(block_requests.downloaded_blocks[0].canonical_root())
+                == root_at_slot(self.chain, earliest_slot)
+            {
+                block_requests.state = BlockRequestsState::Complete;
                 return;
             }
-            block_requests.state = RequestedBlockState::QueuedBackward;
+            block_requests.state = BlockRequestsState::QueuedBackward;
         }
     }
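A condensed model of the forward-sync decision above may help: once a batch reaches the peer's reported head, the request either completes (the batch anchors to our chain) or flips to a backward search; otherwise another forward batch is queued. A simplified sketch, with u64 slots and a bool standing in for the real chain lookup and state mutation:

    #[derive(Debug, PartialEq)]
    enum Next {
        Complete,       // batch reaches target head and matches our chain
        QueuedBackward, // reaches target head but on an unknown chain
        QueuedForward,  // target head not reached yet, fetch the next batch
    }

    fn forward_step(batch_last_slot: u64, target_head_slot: u64, anchors_to_our_chain: bool) -> Next {
        if batch_last_slot >= target_head_slot {
            if anchors_to_our_chain {
                Next::Complete
            } else {
                Next::QueuedBackward
            }
        } else {
            Next::QueuedForward
        }
    }

    fn main() {
        assert_eq!(forward_step(100, 90, true), Next::Complete);
        assert_eq!(forward_step(100, 90, false), Next::QueuedBackward);
        assert_eq!(forward_step(80, 90, true), Next::QueuedForward);
    }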
-    pub fn recent_blocks_response(peer_id: PeerId, request_id: RequestId, blocks: Vec<BeaconBlock>) {
+    pub fn recent_blocks_response(
+        &mut self,
+        peer_id: PeerId,
+        request_id: RequestId,
+        blocks: Vec<BeaconBlock<T::EthSpec>>,
+    ) {
         // find the request
-        let parent_request = match self.parent_queue.get_mut(&peer_id) {
-            Some(req) if req.state = RequestedBlockState::Pending(request_id) => req,
+        let parent_request = match self
+            .parent_queue
+            .iter_mut()
+            .find(|request| request.state == BlockRequestsState::Pending(request_id))
+        {
+            Some(req) => req,
             None => {
                 // No pending request, invalid request_id or coding error
                 warn!(self.log, "RecentBeaconBlocks response unknown"; "request_id" => request_id);
@@ -245,8 +295,8 @@ impl ImportManager {
         // if an empty response is given, the peer didn't have the requested block, try again
         if blocks.is_empty() {
-            parent_request.attempts += 1;
-            parent_request.state = RequestedBlockState::QueuedForward;
+            parent_request.failed_attempts += 1;
+            parent_request.state = BlockRequestsState::QueuedForward;
             parent_request.last_submitted_peer = peer_id;
             return;
         }
@@ -256,29 +306,27 @@ impl ImportManager {
         if blocks.len() != 1 {
             //TODO: Potentially downvote the peer
             debug!(self.log, "Peer sent more than 1 parent. Ignoring";
-                "peer_id" => peer_id,
+                "peer_id" => format!("{:?}", peer_id),
                 "no_parents" => blocks.len()
             );
             return;
         }

         // queue for processing
-        parent_request.state = RequestedBlockState::Complete;
+        parent_request.state = BlockRequestsState::Complete;
     }

     pub fn inject_error(peer_id: PeerId, id: RequestId) {
         //TODO: Remove block state from pending
     }

-    pub fn peer_disconnect(peer_id: PeerId) {
-        self.import_queue.remove(&peer_id);
-        self.full_peers.remove(&peer_id);
+    pub fn peer_disconnect(&mut self, peer_id: &PeerId) {
+        self.import_queue.remove(peer_id);
+        self.full_peers.remove(peer_id);
         self.update_state();
     }

-    pub fn add_full_peer(peer_id: PeerId) {
+    pub fn add_full_peer(&mut self, peer_id: PeerId) {
         debug!(
             self.log, "Fully synced peer added";
             "peer" => format!("{:?}", peer_id),
@@ -287,32 +335,36 @@ impl ImportManager {
         self.update_state();
     }

-    pub fn add_unknown_block(&mut self,block: BeaconBlock) {
+    pub fn add_unknown_block(&mut self, block: BeaconBlock<T::EthSpec>, peer_id: PeerId) {
         // if we are not in regular sync mode, ignore this block
-        if self.state == ManagerState::Regular {
+        if let ManagerState::Regular = self.state {
             return;
         }

         // make sure this block is not already being searched for
         // TODO: Potentially store a hashset of blocks for O(1) lookups
         for parent_req in self.parent_queue.iter() {
-            if let Some(_) = parent_req.downloaded_blocks.iter().find(|d_block| d_block == block) {
+            if let Some(_) = parent_req
+                .downloaded_blocks
+                .iter()
+                .find(|d_block| d_block == &&block)
+            {
                 // we are already searching for this block, ignore it
                 return;
             }
         }

         let req = ParentRequests {
             downloaded_blocks: vec![block],
             failed_attempts: 0,
-            state: RequestedBlockState::QueuedBackward
-        }
+            last_submitted_peer: peer_id,
+            state: BlockRequestsState::QueuedBackward,
+        };
         self.parent_queue.push(req);
     }

-    pub fn poll() -> ImportManagerOutcome {
+    pub fn poll(&mut self) -> ImportManagerOutcome {
         loop {
             // update the state of the manager
             self.update_state();
@@ -336,304 +388,340 @@ impl ImportManager {
             if let (re_run, outcome) = self.process_complete_parent_requests() {
                 if let Some(outcome) = outcome {
                     return outcome;
-                }
-                else if !re_run {
+                } else if !re_run {
                     break;
                 }
             }
         }

         return ImportManagerOutcome::Idle;
     }

     fn update_state(&mut self) {
-        let previous_state = self.state;
+        let previous_state = self.state.clone();
         self.state = {
             if !self.import_queue.is_empty() {
                 ManagerState::Syncing
-            }
-            else if !self.full_peers.is_empty() {
-                ManagerState::Regualar
-            }
-            else {
-                ManagerState::Stalled }
+            } else if !self.full_peers.is_empty() {
+                ManagerState::Regular
+            } else {
+                ManagerState::Stalled
+            }
         };
         if self.state != previous_state {
-            info!(self.log, "Syncing state updated",
-                "old_state" => format!("{:?}", previous_state)
-                "new_state" => format!("{:?}", self.state)
+            info!(self.log, "Syncing state updated";
+                "old_state" => format!("{:?}", previous_state),
+                "new_state" => format!("{:?}", self.state),
             );
         }
     }

     fn process_potential_block_requests(&mut self) -> Option<ImportManagerOutcome> {
         // check if an outbound request is required
         // Managing a fixed number of outbound requests is maintained at the RPC protocol libp2p
         // layer and not needed here.
         // If any in queued state we submit a request.

         // remove any failed batches
         self.import_queue.retain(|peer_id, block_request| {
-            if block_request.state == RequestedBlockState::Failed {
-                debug!(self.log, "Block import from peer failed",
-                    "peer_id" => peer_id,
-                    "downloaded_blocks" => block_request.downloaded.blocks.len()
+            if let BlockRequestsState::Failed = block_request.state {
+                debug!(self.log, "Block import from peer failed";
+                    "peer_id" => format!("{:?}", peer_id),
+                    "downloaded_blocks" => block_request.downloaded_blocks.len()
                 );
                 false
+            } else {
+                true
             }
-            else { true }
         });

-        for (peer_id, block_requests) in self.import_queue.iter_mut() {
-            if let Some(request) = requests.iter().find(|req| req.state == RequestedBlockState::QueuedForward || req.state == RequestedBlockState::QueuedBackward) {
-
-                let request.state = RequestedBlockState::Pending(self.current_req_id);
-                self.current_req_id +=1;
-
-                let req = BeaconBlocksRequest {
-                    head_block_root: request.target_root,
-                    start_slot: request.next_start_slot().unwrap_or_else(|| self.chain.head().slot),
-                    count: MAX_BLOCKS_PER_REQUEST,
-                    step: 0
-                }
-                return Some(ImportManagerOutCome::RequestBlocks{ peer_id, req });
-            }
+        // process queued block requests
+        for (peer_id, block_requests) in self.import_queue.iter_mut().find(|(_peer_id, req)| {
+            req.state == BlockRequestsState::QueuedForward
+                || req.state == BlockRequestsState::QueuedBackward
+        }) {
+            let request_id = self.current_req_id;
+            block_requests.state = BlockRequestsState::Pending(request_id);
+            self.current_req_id += 1;
+
+            let request = BeaconBlocksRequest {
+                head_block_root: block_requests.target_head_root,
+                start_slot: block_requests
+                    .next_start_slot()
+                    .unwrap_or_else(|| self.chain.best_slot())
+                    .as_u64(),
+                count: MAX_BLOCKS_PER_REQUEST,
+                step: 0,
+            };
+            return Some(ImportManagerOutcome::RequestBlocks {
+                peer_id: peer_id.clone(),
+                request,
+                request_id,
+            });
         }

         None
     }

     fn process_complete_batches(&mut self) -> Option<ImportManagerOutcome> {
-        let completed_batches = self.import_queue.iter().filter(|_peer, block_requests| block_requests.state == RequestedState::Complete).map(|peer, _| peer).collect::<Vec<PeerId>>();
+        let completed_batches = self
+            .import_queue
+            .iter()
+            .filter(|(_peer, block_requests)| block_requests.state == BlockRequestsState::Complete)
+            .map(|(peer, _)| peer)
+            .cloned()
+            .collect::<Vec<PeerId>>();
         for peer_id in completed_batches {
-            let block_requests = self.import_queue.remove(&peer_id).unwrap("key exists");
-            match self.process_blocks(block_requests.downloaded_blocks) {
+            let block_requests = self.import_queue.remove(&peer_id).expect("key exists");
+            match self.process_blocks(block_requests.downloaded_blocks.clone()) {
                 Ok(()) => {
                     //TODO: Verify it's impossible to have empty downloaded_blocks
-                    last_element = block_requests.downloaded_blocks.len() -1
+                    let last_element = block_requests.downloaded_blocks.len() - 1;
                     debug!(self.log, "Blocks processed successfully";
-                        "peer" => peer_id,
+                        "peer" => format!("{:?}", peer_id),
                         "start_slot" => block_requests.downloaded_blocks[0].slot,
                         "end_slot" => block_requests.downloaded_blocks[last_element].slot,
                         "no_blocks" => last_element + 1,
                     );
                     // Re-HELLO to ensure we are up to the latest head
                     return Some(ImportManagerOutcome::Hello(peer_id));
                 }
                 Err(e) => {
-                    last_element = block_requests.downloaded_blocks.len() -1
+                    let last_element = block_requests.downloaded_blocks.len() - 1;
                     warn!(self.log, "Block processing failed";
-                        "peer" => peer_id,
+                        "peer" => format!("{:?}", peer_id),
                         "start_slot" => block_requests.downloaded_blocks[0].slot,
                         "end_slot" => block_requests.downloaded_blocks[last_element].slot,
                         "no_blocks" => last_element + 1,
                         "error" => format!("{:?}", e),
                     );
                     return Some(ImportManagerOutcome::DownvotePeer(peer_id));
                 }
             }
         }
         None
     }

     fn process_parent_requests(&mut self) -> Option<ImportManagerOutcome> {
         // remove any failed requests
         self.parent_queue.retain(|parent_request| {
-            if parent_request.state == RequestedBlockState::Failed {
-                debug!(self.log, "Parent import failed",
-                    "block" => parent_request.downloaded_blocks[0].hash,
-                    "siblings found" => parent_request.len()
+            if parent_request.state == BlockRequestsState::Failed {
+                debug!(self.log, "Parent import failed";
+                    "block" => format!("{:?}",parent_request.downloaded_blocks[0].canonical_root()),
+                    "ancestors_found" => parent_request.downloaded_blocks.len()
                 );
                 false
+            } else {
+                true
             }
-            else { true }
         });

         // check to make sure there are peers to search for the parent from
         if self.full_peers.is_empty() {
-            return;
+            return None;
         }

         // check if parents need to be searched for
         for parent_request in self.parent_queue.iter_mut() {
             if parent_request.failed_attempts >= PARENT_FAIL_TOLERANCE {
-                parent_request.state == BlockRequestsState::Failed
+                parent_request.state == BlockRequestsState::Failed;
                 continue;
-            }
-            else if parent_request.state == BlockRequestsState::QueuedForward {
+            } else if parent_request.state == BlockRequestsState::QueuedForward {
                 parent_request.state = BlockRequestsState::Pending(self.current_req_id);
-                self.current_req_id +=1;
-                let parent_hash =
+                self.current_req_id += 1;
+                let last_element_index = parent_request.downloaded_blocks.len() - 1;
+                let parent_hash = parent_request.downloaded_blocks[last_element_index].parent_root;
                 let req = RecentBeaconBlocksRequest {
                     block_roots: vec![parent_hash],
                 };
                 // select a random fully synced peer to attempt to download the parent block
                 let peer_id = self.full_peers.iter().next().expect("List is not empty");
-                return Some(ImportManagerOutcome::RecentRequest(peer_id, req);
+                return Some(ImportManagerOutcome::RecentRequest(peer_id.clone(), req));
             }
         }

         None
     }

-    fn process_complete_parent_requests(&mut self) => (bool, Option<ImportManagerOutcome>) {
+    fn process_complete_parent_requests(&mut self) -> (bool, Option<ImportManagerOutcome>) {
         // flag to determine if there is more process to drive or if the manager can be switched to
         // an idle state
         let mut re_run = false;

-        // verify the last added block is the parent of the last requested block
-        let last_index = parent_requests.downloaded_blocks.len() -1;
-        let expected_hash = parent_requests.downloaded_blocks[last_index].parent ;
-        let block_hash = parent_requests.downloaded_blocks[0].tree_hash_root();
-        if block_hash != expected_hash {
-            //TODO: Potentially downvote the peer
-            debug!(self.log, "Peer sent invalid parent. Ignoring";
-                "peer_id" => peer_id,
-                "received_block" => block_hash,
-                "expected_parent" => expected_hash,
-            );
-            return;
-        }

         // Find any parent_requests ready to be processed
-        for completed_request in self.parent_queue.iter_mut().filter(|req| req.state == BlockRequestsState::Complete) {
+        for completed_request in self
+            .parent_queue
+            .iter_mut()
+            .filter(|req| req.state == BlockRequestsState::Complete)
+        {
+            // verify the last added block is the parent of the last requested block
+            let last_index = completed_request.downloaded_blocks.len() - 1;
+            let expected_hash = completed_request.downloaded_blocks[last_index].parent_root;
+            // Note: the length must be greater than 1 so this cannot panic.
+            let block_hash = completed_request.downloaded_blocks[last_index - 1].canonical_root();
+            if block_hash != expected_hash {
+                // remove the head block
+                let _ = completed_request.downloaded_blocks.pop();
+                completed_request.state = BlockRequestsState::QueuedForward;
+                //TODO: Potentially downvote the peer
+                let peer = completed_request.last_submitted_peer.clone();
+                debug!(self.log, "Peer sent invalid parent. Ignoring";
+                    "peer_id" => format!("{:?}",peer),
+                    "received_block" => format!("{}", block_hash),
+                    "expected_parent" => format!("{}", expected_hash),
+                );
+                return (true, Some(ImportManagerOutcome::DownvotePeer(peer)));
+            }

             // try and process the list of blocks up to the requested block
             while !completed_request.downloaded_blocks.is_empty() {
-                let block = completed_request.downloaded_blocks.pop();
-                match self.chain_process_block(block.clone()) {
-                    Ok(BlockProcessingOutcome::ParentUnknown { parent } => {
+                let block = completed_request
+                    .downloaded_blocks
+                    .pop()
+                    .expect("Block must exist exist");
+                match self.chain.process_block(block.clone()) {
+                    Ok(BlockProcessingOutcome::ParentUnknown { parent: _ }) => {
                         // need to keep looking for parents
                         completed_request.downloaded_blocks.push(block);
                         completed_request.state == BlockRequestsState::QueuedForward;
                         re_run = true;
                         break;
                     }
-                    Ok(BlockProcessingOutcome::Processed { _ } => { }
-                    Ok(outcome) => { // it's a future slot or an invalid block, remove it and try again
-                        completed_request.failed_attempts +=1;
+                    Ok(BlockProcessingOutcome::Processed { block_root: _ }) => {}
+                    Ok(outcome) => {
+                        // it's a future slot or an invalid block, remove it and try again
+                        completed_request.failed_attempts += 1;
                         trace!(
                             self.log, "Invalid parent block";
-                            "outcome" => format!("{:?}", outcome);
+                            "outcome" => format!("{:?}", outcome),
                             "peer" => format!("{:?}", completed_request.last_submitted_peer),
                         );
                         completed_request.state == BlockRequestsState::QueuedForward;
                         re_run = true;
-                        return (re_run, Some(ImportManagerOutcome::DownvotePeer(completed_request.last_submitted_peer)));
+                        return (
+                            re_run,
+                            Some(ImportManagerOutcome::DownvotePeer(
+                                completed_request.last_submitted_peer.clone(),
+                            )),
+                        );
                     }
                     Err(e) => {
-                        completed_request.failed_attempts +=1;
+                        completed_request.failed_attempts += 1;
                         warn!(
                             self.log, "Parent processing error";
-                            "error" => format!("{:?}", e);
+                            "error" => format!("{:?}", e)
                         );
                         completed_request.state == BlockRequestsState::QueuedForward;
                         re_run = true;
-                        return (re_run, Some(ImportManagerOutcome::DownvotePeer(completed_request.last_submitted_peer)));
+                        return (
+                            re_run,
+                            Some(ImportManagerOutcome::DownvotePeer(
+                                completed_request.last_submitted_peer.clone(),
+                            )),
+                        );
                     }
                 }
             }
         }

         // remove any full completed and processed parent chains
-        self.parent_queue.retain(|req| if req.state == BlockRequestsState::Complete { false } else { true });
+        self.parent_queue.retain(|req| {
+            if req.state == BlockRequestsState::Complete {
+                false
+            } else {
+                true
+            }
+        });
         (re_run, None)
     }
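The parent-lookup verification moved into the loop above is a hash-chain check: the parent_root recorded in the newest entry must match the root of the entry stored just before it. A self-contained analogue, with u64 roots standing in for Hash256 and a stored field standing in for canonical_root():

    struct Block {
        root: u64,        // stands in for canonical_root()
        parent_root: u64, // root this block claims as its parent
    }

    // adjacent entries must link: each entry's parent_root must equal the
    // root of the entry stored just before it
    fn links(chain: &[Block]) -> bool {
        chain.windows(2).all(|w| w[1].parent_root == w[0].root)
    }

    fn main() {
        let good = [
            Block { root: 1, parent_root: 0 },
            Block { root: 2, parent_root: 1 },
        ];
        let bad = [
            Block { root: 1, parent_root: 0 },
            Block { root: 3, parent_root: 9 },
        ];
        assert!(links(&good));
        assert!(!links(&bad));
    }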
-    fn process_blocks(&mut self, blocks: Vec<BeaconBlock<T::EthSpec>>) -> Result<(), String> {
+    fn process_blocks(
+        &mut self,
+        blocks: Vec<BeaconBlock<T::EthSpec>>,
+    ) -> Result<(), String> {
         for block in blocks {
             let processing_result = self.chain.process_block(block.clone());

             if let Ok(outcome) = processing_result {
                 match outcome {
                     BlockProcessingOutcome::Processed { block_root } => {
                         // The block was valid and we processed it successfully.
                         trace!(
                             self.log, "Imported block from network";
-                            "source" => source,
                             "slot" => block.slot,
                             "block_root" => format!("{}", block_root),
-                            "peer" => format!("{:?}", peer_id),
                         );
                     }
                     BlockProcessingOutcome::ParentUnknown { parent } => {
                         // blocks should be sequential and all parents should exist
                         trace!(
                             self.log, "ParentBlockUnknown";
-                            "source" => source,
                             "parent_root" => format!("{}", parent),
                             "baby_block_slot" => block.slot,
                         );
-                        return Err(format!("Block at slot {} has an unknown parent.", block.slot));
+                        return Err(format!(
+                            "Block at slot {} has an unknown parent.",
+                            block.slot
+                        ));
                     }
                     BlockProcessingOutcome::FutureSlot {
                         present_slot,
                         block_slot,
                     } => {
                         if present_slot + FUTURE_SLOT_TOLERANCE >= block_slot {
                             // The block is too far in the future, drop it.
                             trace!(
                                 self.log, "FutureBlock";
-                                "source" => source,
                                 "msg" => "block for future slot rejected, check your time",
                                 "present_slot" => present_slot,
                                 "block_slot" => block_slot,
                                 "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE,
-                                "peer" => format!("{:?}", peer_id),
                             );
-                            return Err(format!("Block at slot {} is too far in the future", block.slot));
+                            return Err(format!(
+                                "Block at slot {} is too far in the future",
+                                block.slot
+                            ));
                         } else {
                             // The block is in the future, but not too far.
                             trace!(
                                 self.log, "QueuedFutureBlock";
-                                "source" => source,
                                 "msg" => "queuing future block, check your time",
                                 "present_slot" => present_slot,
                                 "block_slot" => block_slot,
                                 "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE,
-                                "peer" => format!("{:?}", peer_id),
                             );
                         }
                     }
                     _ => {
                         trace!(
                             self.log, "InvalidBlock";
-                            "source" => source,
                             "msg" => "peer sent invalid block",
                             "outcome" => format!("{:?}", outcome),
-                            "peer" => format!("{:?}", peer_id),
                         );
                         return Err(format!("Invalid block at slot {}", block.slot));
                     }
                 }
             } else {
                 trace!(
                     self.log, "BlockProcessingFailure";
-                    "source" => source,
                     "msg" => "unexpected condition in processing block.",
                     "outcome" => format!("{:?}", processing_result)
                 );
-                return Err(format!("Unexpected block processing error: {:?}", processing_result));
+                return Err(format!(
+                    "Unexpected block processing error: {:?}",
+                    processing_result
+                ));
             }
         }
         Ok(())
     }
 }
+
+fn root_at_slot<T: BeaconChainTypes>(
+    chain: Arc<BeaconChain<T>>,
+    target_slot: Slot,
+) -> Option<Hash256> {
+    chain
+        .rev_iter_block_roots()
+        .find(|(_root, slot)| *slot == target_slot)
+        .map(|(root, _slot)| root)
+}
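The new free function root_at_slot scans the chain's reverse block-root iterator for a target slot. A self-contained analogue over a plain iterator of (root, slot) pairs, with hypothetical u64 values in place of Hash256 and Slot:

    fn root_at_slot(
        roots: impl Iterator<Item = (u64, u64)>, // (root, slot), newest first
        target_slot: u64,
    ) -> Option<u64> {
        roots
            .filter(|(_root, slot)| *slot == target_slot)
            .map(|(root, _slot)| root)
            .next()
    }

    fn main() {
        let rev_roots = vec![(0xc0, 3), (0xb0, 2), (0xa0, 1)];
        assert_eq!(root_at_slot(rev_roots.into_iter(), 2), Some(0xb0));
        let empty: Vec<(u64, u64)> = Vec::new();
        assert_eq!(root_at_slot(empty.into_iter(), 2), None);
    }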

View File

@@ -1,4 +1,4 @@
-mod import_queue;
+mod manager;
 /// Syncing for lighthouse.
 ///
 /// Stores the various syncing methods for the beacon chain.

View File

@@ -1,8 +1,9 @@
-use super::import_queue::{ImportQueue, PartialBeaconBlockCompletion};
-use crate::message_handler::NetworkContext;
+use super::manager::{ImportManager, ImportManagerOutcome};
+use crate::service::{NetworkMessage, OutgoingMessage};
 use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome};
 use eth2_libp2p::rpc::methods::*;
-use eth2_libp2p::rpc::{RPCRequest, RPCResponse, RequestId};
+use eth2_libp2p::rpc::{RPCEvent, RPCRequest, RPCResponse, RequestId};
 use eth2_libp2p::PeerId;
 use slog::{debug, error, info, o, trace, warn};
 use ssz::Encode;
@@ -10,14 +11,14 @@ use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::Duration;
 use store::Store;
+use tokio::sync::mpsc;
 use types::{
     Attestation, BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Epoch, EthSpec, Hash256, Slot,
 };

 /// If a block is more than `FUTURE_SLOT_TOLERANCE` slots ahead of our slot clock, we drop it.
 /// Otherwise we queue it.
-const FUTURE_SLOT_TOLERANCE: u64 = 1;
+pub(crate) const FUTURE_SLOT_TOLERANCE: u64 = 1;

 const SHOULD_FORWARD_GOSSIP_BLOCK: bool = true;
 const SHOULD_NOT_FORWARD_GOSSIP_BLOCK: bool = false;
@@ -25,16 +26,13 @@ const SHOULD_NOT_FORWARD_GOSSIP_BLOCK: bool = false;
 /// Keeps track of syncing information for known connected peers.
 #[derive(Clone, Copy, Debug)]
 pub struct PeerSyncInfo {
-    fork_version: [u8,4],
-    finalized_root: Hash256,
-    finalized_epoch: Epoch,
-    head_root: Hash256,
-    head_slot: Slot,
+    fork_version: [u8; 4],
+    pub finalized_root: Hash256,
+    pub finalized_epoch: Epoch,
+    pub head_root: Hash256,
+    pub head_slot: Slot,
 }

 impl From<HelloMessage> for PeerSyncInfo {
     fn from(hello: HelloMessage) -> PeerSyncInfo {
         PeerSyncInfo {
@@ -43,7 +41,6 @@ impl From<HelloMessage> for PeerSyncInfo {
             finalized_epoch: hello.finalized_epoch,
             head_root: hello.head_root,
             head_slot: hello.head_slot,
-            requested_slot_skip: None,
         }
     }
 }
@@ -66,18 +63,24 @@ pub enum SyncState {
 pub struct SimpleSync<T: BeaconChainTypes> {
     /// A reference to the underlying beacon chain.
     chain: Arc<BeaconChain<T>>,
-    manager: ImportManager,
+    manager: ImportManager<T>,
+    network: NetworkContext,
     log: slog::Logger,
 }

 impl<T: BeaconChainTypes> SimpleSync<T> {
     /// Instantiate a `SimpleSync` instance, with no peers and an empty queue.
-    pub fn new(beacon_chain: Arc<BeaconChain<T>>, log: &slog::Logger) -> Self {
+    pub fn new(
+        beacon_chain: Arc<BeaconChain<T>>,
+        network_send: mpsc::UnboundedSender<NetworkMessage>,
+        log: &slog::Logger,
+    ) -> Self {
         let sync_logger = log.new(o!("Service"=> "Sync"));

         SimpleSync {
             chain: beacon_chain.clone(),
-            manager: ImportManager::new(),
+            manager: ImportManager::new(beacon_chain, log),
+            network: NetworkContext::new(network_send, log.clone()),
             log: sync_logger,
         }
     }
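This constructor change is the wiring point for the whole commit: the message handler now hands the network sender straight to SimpleSync, which builds its own NetworkContext. A stripped-down sketch of that wiring, assuming the same tokio::sync::mpsc channel type imported above (the message enum is a stand-in, not the real NetworkMessage):

    use tokio::sync::mpsc;

    enum NetworkMessage {
        Send(String), // placeholder for (PeerId, OutgoingMessage)
    }

    struct SimpleSync {
        // after the refactor, the sync service owns the sender itself
        network_send: mpsc::UnboundedSender<NetworkMessage>,
    }

    impl SimpleSync {
        fn new(network_send: mpsc::UnboundedSender<NetworkMessage>) -> Self {
            SimpleSync { network_send }
        }
    }

    fn main() {
        // the handler creates the channel once and moves the sender in
        let (network_send, _network_recv) = mpsc::unbounded_channel();
        let _sync = SimpleSync::new(network_send);
    }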
@@ -92,8 +95,9 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
     /// Handle the connection of a new peer.
     ///
     /// Sends a `Hello` message to the peer.
-    pub fn on_connect(&self, peer_id: PeerId, network: &mut NetworkContext) {
-        network.send_rpc_request(peer_id, RPCRequest::Hello(hello_message(&self.chain)));
+    pub fn on_connect(&mut self, peer_id: PeerId) {
+        self.network
+            .send_rpc_request(peer_id, RPCRequest::Hello(hello_message(&self.chain)));
     }

     /// Handle a `Hello` request.
@@ -104,42 +108,31 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
         peer_id: PeerId,
         request_id: RequestId,
         hello: HelloMessage,
-        network: &mut NetworkContext,
     ) {
         trace!(self.log, "HelloRequest"; "peer" => format!("{:?}", peer_id));

         // Say hello back.
-        network.send_rpc_response(
+        self.network.send_rpc_response(
             peer_id.clone(),
             request_id,
             RPCResponse::Hello(hello_message(&self.chain)),
         );

-        self.process_hello(peer_id, hello, network);
+        self.process_hello(peer_id, hello);
     }

     /// Process a `Hello` response from a peer.
-    pub fn on_hello_response(
-        &mut self,
-        peer_id: PeerId,
-        hello: HelloMessage,
-        network: &mut NetworkContext,
-    ) {
+    pub fn on_hello_response(&mut self, peer_id: PeerId, hello: HelloMessage) {
         trace!(self.log, "HelloResponse"; "peer" => format!("{:?}", peer_id));

         // Process the hello message, without sending back another hello.
-        self.process_hello(peer_id, hello, network);
+        self.process_hello(peer_id, hello);
     }

     /// Process a `Hello` message, requesting new blocks if appropriate.
     ///
     /// Disconnects the peer if required.
-    fn process_hello(
-        &mut self,
-        peer_id: PeerId,
-        hello: HelloMessage,
-        network: &mut NetworkContext,
-    ) {
+    fn process_hello(&mut self, peer_id: PeerId, hello: HelloMessage) {
         let remote = PeerSyncInfo::from(hello);
         let local = PeerSyncInfo::from(&self.chain);
@@ -153,12 +146,13 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
                 "reason" => "network_id"
             );

-            network.disconnect(peer_id.clone(), GoodbyeReason::IrrelevantNetwork);
+            self.network
+                .disconnect(peer_id.clone(), GoodbyeReason::IrrelevantNetwork);
         } else if remote.finalized_epoch <= local.finalized_epoch
             && remote.finalized_root != Hash256::zero()
             && local.finalized_root != Hash256::zero()
-            && (self.root_at_slot(start_slot(remote.latest_finalized_epoch))
-                != Some(remote.latest_finalized_root))
+            && (self.root_at_slot(start_slot(remote.finalized_epoch))
+                != Some(remote.finalized_root))
         {
             // The remotes finalized epoch is less than or greater than ours, but the block root is
             // different to the one in our chain.
@@ -169,8 +163,9 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
                 "peer" => format!("{:?}", peer_id),
                 "reason" => "different finalized chain"
             );
-            network.disconnect(peer_id.clone(), GoodbyeReason::IrrelevantNetwork);
-        } else if remote.latest_finalized_epoch < local.latest_finalized_epoch {
+            self.network
+                .disconnect(peer_id.clone(), GoodbyeReason::IrrelevantNetwork);
+        } else if remote.finalized_epoch < local.finalized_epoch {
             // The node has a lower finalized epoch, their chain is not useful to us. There are two
             // cases where a node can have a lower finalized epoch:
             //
@@ -193,12 +188,12 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
         } else if self
             .chain
             .store
-            .exists::<BeaconBlock<T::EthSpec>>(&remote.best_root)
+            .exists::<BeaconBlock<T::EthSpec>>(&remote.head_root)
             .unwrap_or_else(|_| false)
         {
             // If the node's best-block is already known to us and they are close to our current
             // head, treat them as a fully sync'd peer.
-            self.import_manager.add_full_peer(peer_id);
+            self.manager.add_full_peer(peer_id);
             self.process_sync();
         } else {
             // The remote node has an equal or great finalized epoch and we don't know it's head.
@@ -208,29 +203,45 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
             debug!(
                 self.log, "UsefulPeer";
                 "peer" => format!("{:?}", peer_id),
-                "local_finalized_epoch" => local.latest_finalized_epoch,
-                "remote_latest_finalized_epoch" => remote.latest_finalized_epoch,
+                "local_finalized_epoch" => local.finalized_epoch,
+                "remote_latest_finalized_epoch" => remote.finalized_epoch,
             );

-            self.import_manager.add_peer(peer_id, remote);
+            self.manager.add_peer(peer_id, remote);
             self.process_sync();
         }
     }

-    self.proess_sync(&mut self) {
+    fn process_sync(&mut self) {
         loop {
-            match self.import_manager.poll() {
-                ImportManagerOutcome::RequestBlocks(peer_id, req) {
+            match self.manager.poll() {
+                ImportManagerOutcome::Hello(peer_id) => {
+                    trace!(
+                        self.log,
+                        "RPC Request";
+                        "method" => "HELLO",
+                        "peer" => format!("{:?}", peer_id)
+                    );
+                    self.network
+                        .send_rpc_request(peer_id, RPCRequest::Hello(hello_message(&self.chain)));
+                }
+                ImportManagerOutcome::RequestBlocks {
+                    peer_id,
+                    request_id,
+                    request,
+                } => {
                     trace!(
                         self.log,
                         "RPC Request";
                         "method" => "BeaconBlocks",
-                        "count" => req.count,
+                        "id" => request_id,
+                        "count" => request.count,
                         "peer" => format!("{:?}", peer_id)
                     );
-                    network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlocks(req));
-                },
-                ImportManagerOutcome::RecentRequest(peer_id, req) {
+                    self.network
+                        .send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlocks(request));
+                }
+                ImportManagerOutcome::RecentRequest(peer_id, req) => {
                     trace!(
                         self.log,
                         "RPC Request";
@ -238,18 +249,20 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
"count" => req.block_roots.len(), "count" => req.block_roots.len(),
"peer" => format!("{:?}", peer_id) "peer" => format!("{:?}", peer_id)
); );
network.send_rpc_request(peer_id.clone(), RPCRequest::RecentBeaconBlocks(req)); self.network
}, .send_rpc_request(peer_id.clone(), RPCRequest::RecentBeaconBlocks(req));
ImportManagerOutcome::DownvotePeer(peer_id) { }
ImportManagerOutcome::DownvotePeer(peer_id) => {
trace!( trace!(
self.log, self.log,
"Peer downvoted"; "Peer downvoted";
"peer" => format!("{:?}", peer_id) "peer" => format!("{:?}", peer_id)
); );
// TODO: Implement reputation // TODO: Implement reputation
network.disconnect(peer_id.clone(), GoodbyeReason::Fault); self.network
}, .disconnect(peer_id.clone(), GoodbyeReason::Fault);
SyncManagerState::Idle { }
ImportManagerOutcome::Idle => {
// nothing to do // nothing to do
return; return;
} }
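The loop above is a drain-until-idle pattern: each `poll()` yields at most one outcome, and the handler keeps polling until the manager reports `Idle` before returning to the event loop. A reduced model of that pattern, with a toy `Manager` and `Outcome` standing in for the real `ImportManagerOutcome` machinery, might look like:

    // Toy model of the poll-until-idle loop that drives `process_sync`.
    enum Outcome {
        SendRequest(String), // some action for the network layer
        Idle,                // no more work queued
    }

    struct Manager {
        queued: Vec<String>,
    }

    impl Manager {
        // Each poll either yields one action or reports Idle.
        fn poll(&mut self) -> Outcome {
            match self.queued.pop() {
                Some(req) => Outcome::SendRequest(req),
                None => Outcome::Idle,
            }
        }
    }

    fn process_sync(manager: &mut Manager) {
        // Drain every pending action before returning to the event loop.
        loop {
            match manager.poll() {
                Outcome::SendRequest(req) => println!("sending {req}"),
                Outcome::Idle => return,
            }
        }
    }

    fn main() {
        let mut m = Manager { queued: vec!["hello".into(), "beacon_blocks".into()] };
        process_sync(&mut m);
    }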
@@ -257,37 +270,26 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
             }
         }
     }

+    /*
     fn root_at_slot(&self, target_slot: Slot) -> Option<Hash256> {
         self.chain
             .rev_iter_block_roots()
             .find(|(_root, slot)| *slot == target_slot)
             .map(|(root, _slot)| root)
     }
+    */

-    /// Handle a `BeaconBlocks` request from the peer.
-    pub fn on_beacon_blocks_request(
+    /// Handle a `RecentBeaconBlocks` request from the peer.
+    pub fn on_recent_beacon_blocks_request(
         &mut self,
         peer_id: PeerId,
         request_id: RequestId,
-        req: BeaconBlocksRequest,
-        network: &mut NetworkContext,
+        request: RecentBeaconBlocksRequest,
     ) {
-        debug!(
-            self.log,
-            "BeaconBlocksRequest";
-            "peer" => format!("{:?}", peer_id),
-            "count" => req.count,
-            "start_slot" => req.start_slot,
-        );
-        let blocks = Vec<BeaconBlock<T::EthSpec>> = self
-            .chain.rev_iter_block_roots().filter(|(_root, slot) req.start_slot <= slot && req.start_slot + req.count >= slot).take_while(|(_root, slot) req.start_slot <= *slot)
-            .filter_map(|root, slot| {
+        let blocks: Vec<BeaconBlock<_>> = request
+            .block_roots
+            .iter()
+            .filter_map(|root| {
                 if let Ok(Some(block)) = self.chain.store.get::<BeaconBlock<T::EthSpec>>(root) {
-                    Some(block.body)
+                    Some(block)
                 } else {
                     debug!(
                         self.log,
@@ -301,10 +303,63 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
             })
             .collect();

-        roots.reverse();
-        roots.dedup_by_key(|brs| brs.block_root);
-
-        if roots.len() as u64 != req.count {
+        debug!(
+            self.log,
+            "BlockBodiesRequest";
+            "peer" => format!("{:?}", peer_id),
+            "requested" => request.block_roots.len(),
+            "returned" => blocks.len(),
+        );
+
+        self.network.send_rpc_response(
+            peer_id,
+            request_id,
+            RPCResponse::BeaconBlocks(blocks.as_ssz_bytes()),
+        )
+    }
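The new handler is a best-effort lookup: unknown roots are skipped rather than treated as errors, and the response carries whatever subset of blocks was found. A toy equivalent over a `HashMap` store (illustrative `Block` and store types, not the real on-disk store) is:

    use std::collections::HashMap;

    // Return only the blocks we actually have, in request order;
    // missing roots are silently dropped, mirroring the handler above.
    #[derive(Clone)]
    struct Block {
        slot: u64,
    }

    fn recent_blocks(store: &HashMap<[u8; 32], Block>, roots: &[[u8; 32]]) -> Vec<Block> {
        roots
            .iter()
            .filter_map(|root| store.get(root).cloned()) // unknown roots are skipped, not errors
            .collect()
    }

    fn main() {
        let mut store = HashMap::new();
        store.insert([7; 32], Block { slot: 7 });
        let got = recent_blocks(&store, &[[7; 32], [9; 32]]); // second root is unknown
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].slot, 7);
    }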
+    /// Handle a `BeaconBlocks` request from the peer.
+    pub fn on_beacon_blocks_request(
+        &mut self,
+        peer_id: PeerId,
+        request_id: RequestId,
+        req: BeaconBlocksRequest,
+    ) {
+        debug!(
+            self.log,
+            "BeaconBlocksRequest";
+            "peer" => format!("{:?}", peer_id),
+            "count" => req.count,
+            "start_slot" => req.start_slot,
+        );
+
+        let mut blocks: Vec<BeaconBlock<T::EthSpec>> = self
+            .chain
+            .rev_iter_block_roots()
+            .filter(|(_root, slot)| {
+                req.start_slot <= slot.as_u64() && req.start_slot + req.count >= slot.as_u64()
+            })
+            .take_while(|(_root, slot)| req.start_slot <= slot.as_u64())
+            .filter_map(|(root, _slot)| {
+                if let Ok(Some(block)) = self.chain.store.get::<BeaconBlock<T::EthSpec>>(&root) {
+                    Some(block)
+                } else {
+                    debug!(
+                        self.log,
+                        "Peer requested unknown block";
+                        "peer" => format!("{:?}", peer_id),
+                        "request_root" => format!("{:}", root),
+                    );
+                    None
+                }
+            })
+            .collect();
+
+        blocks.reverse();
+        blocks.dedup_by_key(|brs| brs.slot);
+
+        if blocks.len() as u64 != req.count {
             debug!(
                 self.log,
                 "BeaconBlocksRequest";
@@ -313,33 +368,33 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
                 "start_slot" => req.start_slot,
                 "current_slot" => self.chain.present_slot(),
                 "requested" => req.count,
-                "returned" => roots.len(),
+                "returned" => blocks.len(),
             );
         }

-        network.send_rpc_response(
+        self.network.send_rpc_response(
             peer_id,
             request_id,
             RPCResponse::BeaconBlocks(blocks.as_ssz_bytes()),
         )
     }
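The slot window here is worth tracing by hand. Modelling the reverse block-root iterator as a descending slice of plain `u64` slots, the filter keeps every slot in the inclusive window `[start_slot, start_slot + count]`. As committed, that inclusive upper bound admits up to `count + 1` slots, which the size-mismatch `debug!` above would then report. A self-contained sketch under those simplifying assumptions:

    // Model of the slot-window selection in `on_beacon_blocks_request`,
    // with plain u64 slots in place of the chain's (root, slot) iterator.
    fn window(descending_slots: &[u64], start_slot: u64, count: u64) -> Vec<u64> {
        let mut picked: Vec<u64> = descending_slots
            .iter()
            .copied()
            .filter(|slot| start_slot <= *slot && start_slot + count >= *slot)
            .take_while(|slot| start_slot <= *slot) // stop once we walk below the window
            .collect();
        picked.reverse(); // iteration runs head-to-genesis; respond oldest-first
        picked.dedup(); // mirrors dedup_by_key(|b| b.slot): skip slots repeat blocks
        picked
    }

    fn main() {
        let chain = [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]; // head at slot 9
        // Five slots come back for a count of four: the inclusive upper bound.
        assert_eq!(window(&chain, 3, 4), vec![3, 4, 5, 6, 7]);
    }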
     /// Handle a `BeaconBlocks` response from the peer.
     pub fn on_beacon_blocks_response(
         &mut self,
         peer_id: PeerId,
         request_id: RequestId,
-        res: Vec<BeaconBlock<T::EthSpec>>,
+        beacon_blocks: Vec<BeaconBlock<T::EthSpec>>,
     ) {
         debug!(
             self.log,
             "BeaconBlocksResponse";
             "peer" => format!("{:?}", peer_id),
-            "count" => res.block_bodies.len(),
+            "count" => beacon_blocks.len(),
         );

-        self.import_manager.beacon_blocks_response(peer_id, request_id, blocks);
+        self.manager
+            .beacon_blocks_response(peer_id, request_id, beacon_blocks);

         self.process_sync();
     }

@@ -349,16 +404,17 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
         &mut self,
         peer_id: PeerId,
         request_id: RequestId,
-        res: Vec<BeaconBlock<T::EthSpec>>,
+        beacon_blocks: Vec<BeaconBlock<T::EthSpec>>,
     ) {
         debug!(
             self.log,
             "BeaconBlocksResponse";
             "peer" => format!("{:?}", peer_id),
-            "count" => res.block_bodies.len(),
+            "count" => beacon_blocks.len(),
         );

-        self.import_manager.recent_blocks_response(peer_id, request_id, blocks);
+        self.manager
+            .recent_blocks_response(peer_id, request_id, beacon_blocks);

         self.process_sync();
     }
@@ -368,19 +424,13 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
     /// Attempts to apply the block to the beacon chain. May queue the block for later processing.
     ///
     /// Returns a `bool` which, if `true`, indicates we should forward the block to our peers.
-    pub fn on_block_gossip(
-        &mut self,
-        peer_id: PeerId,
-        block: BeaconBlock<T::EthSpec>,
-    ) -> bool {
-        if let Some(outcome) =
-            self.process_block(peer_id.clone(), block.clone(), network, &"gossip")
-        {
+    pub fn on_block_gossip(&mut self, peer_id: PeerId, block: BeaconBlock<T::EthSpec>) -> bool {
+        if let Ok(outcome) = self.chain.process_block(block.clone()) {
             match outcome {
                 BlockProcessingOutcome::Processed { .. } => SHOULD_FORWARD_GOSSIP_BLOCK,
-                BlockProcessingOutcome::ParentUnknown { parent } => {
+                BlockProcessingOutcome::ParentUnknown { parent: _ } => {
                     // Inform the sync manager to find parents for this block
-                    self.import_manager.add_unknown_block(block.clone());
+                    self.manager.add_unknown_block(block.clone(), peer_id);
                     SHOULD_FORWARD_GOSSIP_BLOCK
                 }
                 BlockProcessingOutcome::FutureSlot {
@@ -401,12 +451,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
     /// Process a gossip message declaring a new attestation.
     ///
     /// Not currently implemented.
-    pub fn on_attestation_gossip(
-        &mut self,
-        _peer_id: PeerId,
-        msg: Attestation<T::EthSpec>,
-        _network: &mut NetworkContext,
-    ) {
+    pub fn on_attestation_gossip(&mut self, _peer_id: PeerId, msg: Attestation<T::EthSpec>) {
         match self.chain.process_attestation(msg) {
             Ok(outcome) => info!(
                 self.log,
@@ -420,39 +465,74 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
         }
     }

-    /*
-    /// Returns `true` if `self.chain` has not yet processed this block.
-    pub fn chain_has_seen_block(&self, block_root: &Hash256) -> bool {
-        !self
-            .chain
-            .is_new_block_root(&block_root)
-            .unwrap_or_else(|_| {
-                error!(self.log, "Unable to determine if block is new.");
-                false
-            })
-    }
-    */
-
     /// Generates our current state in the form of a HELLO RPC message.
     pub fn generate_hello(&self) -> HelloMessage {
         hello_message(&self.chain)
     }
 }

 /// Build a `HelloMessage` representing the state of the given `beacon_chain`.
 fn hello_message<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>) -> HelloMessage {
-    let spec = &beacon_chain.spec;
     let state = &beacon_chain.head().beacon_state;

     HelloMessage {
-        network_id: spec.network_id,
-        //TODO: Correctly define the chain id
-        chain_id: spec.network_id as u64,
-        latest_finalized_root: state.finalized_checkpoint.root,
-        latest_finalized_epoch: state.finalized_checkpoint.epoch,
-        best_root: beacon_chain.head().beacon_block_root,
-        best_slot: state.slot,
+        fork_version: state.fork.current_version,
+        finalized_root: state.finalized_checkpoint.root,
+        finalized_epoch: state.finalized_checkpoint.epoch,
+        head_root: beacon_chain.head().beacon_block_root,
+        head_slot: state.slot,
     }
 }
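The reworked HELLO payload drops the network/chain-id pair in favour of fork and checkpoint data. A minimal mirror of the five fields, with plain arrays and integers standing in for the real `BeaconState` types and a hypothetical `compatible` check standing in for the handshake's fork comparison:

    // Illustrative mirror of the new HELLO payload; only the field names
    // follow the diff, everything else is an assumption.
    #[derive(Debug, PartialEq)]
    struct HelloMessage {
        fork_version: [u8; 4],
        finalized_root: [u8; 32],
        finalized_epoch: u64,
        head_root: [u8; 32],
        head_slot: u64,
    }

    // Peers on a different fork are irrelevant, much like the handshake's
    // IrrelevantNetwork goodbye above.
    fn compatible(local: &HelloMessage, remote: &HelloMessage) -> bool {
        local.fork_version == remote.fork_version
    }

    fn main() {
        let local = HelloMessage {
            fork_version: [0, 0, 0, 1],
            finalized_root: [0; 32],
            finalized_epoch: 3,
            head_root: [7; 32],
            head_slot: 129,
        };
        let remote = HelloMessage { fork_version: [0, 0, 0, 2], ..local };
        assert!(!compatible(&local, &remote));
    }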
+
+/// Wraps a Network Channel to employ various RPC/Sync related network functionality.
+pub struct NetworkContext {
+    /// The network channel to relay messages to the Network service.
+    network_send: mpsc::UnboundedSender<NetworkMessage>,
+    /// Logger for the `NetworkContext`.
+    log: slog::Logger,
+}
+
+impl NetworkContext {
+    pub fn new(network_send: mpsc::UnboundedSender<NetworkMessage>, log: slog::Logger) -> Self {
+        Self { network_send, log }
+    }
+
+    pub fn disconnect(&mut self, peer_id: PeerId, reason: GoodbyeReason) {
+        self.send_rpc_request(peer_id, RPCRequest::Goodbye(reason))
+        // TODO: disconnect peers.
+    }
+
+    pub fn send_rpc_request(&mut self, peer_id: PeerId, rpc_request: RPCRequest) {
+        // Note: There is currently no use of keeping track of requests. However the functionality
+        // is left here for future revisions.
+        self.send_rpc_event(peer_id, RPCEvent::Request(0, rpc_request));
+    }
+
+    //TODO: Handle Error responses
+    pub fn send_rpc_response(
+        &mut self,
+        peer_id: PeerId,
+        request_id: RequestId,
+        rpc_response: RPCResponse,
+    ) {
+        self.send_rpc_event(
+            peer_id,
+            RPCEvent::Response(request_id, RPCErrorResponse::Success(rpc_response)),
+        );
+    }
+
+    fn send_rpc_event(&mut self, peer_id: PeerId, rpc_event: RPCEvent) {
+        self.send(peer_id, OutgoingMessage::RPC(rpc_event))
+    }
+
+    fn send(&mut self, peer_id: PeerId, outgoing_message: OutgoingMessage) {
+        self.network_send
+            .try_send(NetworkMessage::Send(peer_id, outgoing_message))
+            .unwrap_or_else(|_| {
+                warn!(
+                    self.log,
+                    "Could not send RPC message to the network service"
+                )
+            });
+    }
+}
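`NetworkContext` now lives alongside the sync code and simply forwards typed messages down a channel to the network service. A dependency-free model of that send path, using `std::sync::mpsc` and string stand-ins for the libp2p and RPC types:

    use std::sync::mpsc;

    // Handlers push typed messages into a channel; the network service
    // drains it elsewhere. All types here are illustrative stand-ins.
    type PeerId = String;

    #[derive(Debug)]
    enum NetworkMessage {
        Send(PeerId, String), // the real code wraps an OutgoingMessage::RPC(event)
    }

    struct NetworkContext {
        network_send: mpsc::Sender<NetworkMessage>,
    }

    impl NetworkContext {
        fn send_rpc_request(&mut self, peer_id: PeerId, request: String) {
            // The real code logs a warning on failure; here we ignore errors.
            let _ = self.network_send.send(NetworkMessage::Send(peer_id, request));
        }
    }

    fn main() {
        let (tx, rx) = mpsc::channel();
        let mut ctx = NetworkContext { network_send: tx };
        ctx.send_rpc_request("peer-1".into(), "Hello".into());
        // The network-service end of the channel:
        if let Ok(NetworkMessage::Send(peer, req)) = rx.recv() {
            println!("dispatching {req} to {peer}");
        }
    }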