diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index de7cc7e92..c8ad15e2e 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -260,6 +260,20 @@ impl BeaconChain { ReverseBlockRootIterator::new((head.beacon_block_root, head.beacon_block.slot), iter) } + pub fn forwards_iter_block_roots( + &self, + start_slot: Slot, + ) -> >::ForwardsBlockRootsIterator { + let local_head = self.head(); + T::Store::forwards_block_roots_iterator( + self.store.clone(), + start_slot, + local_head.beacon_state, + local_head.beacon_block_root, + &self.spec, + ) + } + /// Traverse backwards from `block_root` to find the block roots of its ancestors. /// /// ## Notes @@ -888,7 +902,7 @@ impl BeaconChain { // Only log a warning if our head is in a reasonable place to verify this attestation. // This avoids excess logging during syncing. if head_epoch + 1 >= attestation_epoch { - warn!( + debug!( self.log, "Dropped attestation for unknown block"; "block" => format!("{}", attestation.data.beacon_block_root) @@ -1334,23 +1348,6 @@ impl BeaconChain { metrics::stop_timer(fork_choice_register_timer); - let find_head_timer = - metrics::start_timer(&metrics::BLOCK_PROCESSING_FORK_CHOICE_FIND_HEAD); - - // Execute the fork choice algorithm, enthroning a new head if discovered. - // - // Note: in the future we may choose to run fork-choice less often, potentially based upon - // some heuristic around number of attestations seen for the block. - if let Err(e) = self.fork_choice() { - error!( - self.log, - "fork choice failed to find head"; - "error" => format!("{:?}", e) - ) - }; - - metrics::stop_timer(find_head_timer); - metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES); metrics::observe( &metrics::OPERATIONS_PER_BLOCK_ATTESTATION, diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index e5dd4af59..63c9e30ac 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -48,10 +48,6 @@ lazy_static! 
{ "beacon_block_processing_fork_choice_register_seconds", "Time spent registering the new block with fork choice (but not finding head)" ); - pub static ref BLOCK_PROCESSING_FORK_CHOICE_FIND_HEAD: Result = try_create_histogram( - "beacon_block_processing_fork_choice_find_head_seconds", - "Time spent finding the new head after processing a new block" - ); /* * Block Production diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 0ec92aa28..737d7f9c4 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -258,6 +258,8 @@ where .process_block(block) .expect("should not error during block processing"); + self.chain.fork_choice().expect("should find head"); + if let BlockProcessingOutcome::Processed { block_root } = outcome { head_block_root = Some(block_root); diff --git a/beacon_node/beacon_chain/tests/tests.rs b/beacon_node/beacon_chain/tests/tests.rs index a06c652e3..a278d3f5b 100644 --- a/beacon_node/beacon_chain/tests/tests.rs +++ b/beacon_node/beacon_chain/tests/tests.rs @@ -494,6 +494,11 @@ fn run_skip_slot_test(skip_slots: u64) { }) ); + harness_b + .chain + .fork_choice() + .expect("should run fork choice"); + assert_eq!( harness_b.chain.head().beacon_block.slot, Slot::new(skip_slots + 1) diff --git a/beacon_node/eth2-libp2p/Cargo.toml b/beacon_node/eth2-libp2p/Cargo.toml index 8883b5acf..16fa5df46 100644 --- a/beacon_node/eth2-libp2p/Cargo.toml +++ b/beacon_node/eth2-libp2p/Cargo.toml @@ -8,8 +8,8 @@ edition = "2018" hex = "0.3" # rust-libp2p is presently being sourced from a Sigma Prime fork of the # `libp2p/rust-libp2p` repository. -libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "f9ac7d542dc3e0b65c5cbbe4f45bfc3382ab2b4d" } -enr = { git = "https://github.com/SigP/rust-libp2p/", rev = "f9ac7d542dc3e0b65c5cbbe4f45bfc3382ab2b4d", features = ["serde"] } +libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "3f9b030e29c9b31f9fe6f2ed27be4a813e2b3701" } +enr = { git = "https://github.com/SigP/rust-libp2p/", rev = "3f9b030e29c9b31f9fe6f2ed27be4a813e2b3701", features = ["serde"] } types = { path = "../../eth2/types" } serde = "1.0.102" serde_derive = "1.0.102" diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index 0eb60387b..fbb129f5f 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -172,7 +172,7 @@ impl NetworkBehaviourEventProcess format!("{}", peer_id), + debug!(self.log, "Identified Peer"; "peer" => format!("{}", peer_id), "protocol_version" => info.protocol_version, "agent_version" => info.agent_version, "listening_ addresses" => format!("{:?}", info.listen_addrs), diff --git a/beacon_node/eth2-libp2p/src/discovery.rs b/beacon_node/eth2-libp2p/src/discovery.rs index fdbbca2cc..fc1653b7c 100644 --- a/beacon_node/eth2-libp2p/src/discovery.rs +++ b/beacon_node/eth2-libp2p/src/discovery.rs @@ -21,7 +21,7 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tokio::timer::Delay; /// Maximum seconds before searching for extra peers. -const MAX_TIME_BETWEEN_PEER_SEARCHES: u64 = 60; +const MAX_TIME_BETWEEN_PEER_SEARCHES: u64 = 120; /// Initial delay between peer searches. const INITIAL_SEARCH_DELAY: u64 = 5; /// Local ENR storage filename. 
@@ -172,18 +172,6 @@ impl Discovery { let random_node = NodeId::random(); debug!(self.log, "Searching for peers"); self.discovery.find_node(random_node); - - // update the time until next discovery - let delay = { - if self.past_discovery_delay < MAX_TIME_BETWEEN_PEER_SEARCHES { - self.past_discovery_delay *= 2; - self.past_discovery_delay - } else { - MAX_TIME_BETWEEN_PEER_SEARCHES - } - }; - self.peer_discovery_delay - .reset(Instant::now() + Duration::from_secs(delay)); } } @@ -252,6 +240,10 @@ where if self.connected_peers.len() < self.max_peers { self.find_peers(); } + // Set to maximum, and update to earlier, once we get our results back. + self.peer_discovery_delay.reset( + Instant::now() + Duration::from_secs(MAX_TIME_BETWEEN_PEER_SEARCHES), + ); } Ok(Async::NotReady) => break, Err(e) => { @@ -283,6 +275,17 @@ where } Discv5Event::FindNodeResult { closer_peers, .. } => { debug!(self.log, "Discovery query completed"; "peers_found" => closer_peers.len()); + // update the time to the next query + if self.past_discovery_delay < MAX_TIME_BETWEEN_PEER_SEARCHES { + self.past_discovery_delay *= 2; + } + let delay = std::cmp::max( + self.past_discovery_delay, + MAX_TIME_BETWEEN_PEER_SEARCHES, + ); + self.peer_discovery_delay + .reset(Instant::now() + Duration::from_secs(delay)); + if closer_peers.is_empty() { debug!(self.log, "Discovery random query found no peers"); } diff --git a/beacon_node/eth2-libp2p/src/rpc/handler.rs b/beacon_node/eth2-libp2p/src/rpc/handler.rs index 9b758f787..31a0ba3e1 100644 --- a/beacon_node/eth2-libp2p/src/rpc/handler.rs +++ b/beacon_node/eth2-libp2p/src/rpc/handler.rs @@ -8,11 +8,11 @@ use crate::rpc::protocol::{InboundFramed, OutboundFramed}; use core::marker::PhantomData; use fnv::FnvHashMap; use futures::prelude::*; -use libp2p::core::upgrade::{InboundUpgrade, OutboundUpgrade}; +use libp2p::core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeError}; use libp2p::swarm::protocols_handler::{ KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol, }; -use slog::{crit, debug, error, trace, warn}; +use slog::{crit, debug, error, trace}; use smallvec::SmallVec; use std::collections::hash_map::Entry; use std::time::{Duration, Instant}; @@ -25,6 +25,9 @@ use tokio::timer::{delay_queue, DelayQueue}; /// The time (in seconds) before a substream that is awaiting a response from the user times out. pub const RESPONSE_TIMEOUT: u64 = 10; +/// The number of times to retry an outbound upgrade in the case of IO errors. +const IO_ERROR_RETRIES: u8 = 3; + /// Inbound requests are given a sequential `RequestId` to keep track of. type InboundRequestId = RequestId; /// Outbound requests are associated with an id that is given by the application that sent the @@ -40,7 +43,7 @@ where listen_protocol: SubstreamProtocol, /// If `Some`, something bad happened and we should shut down the handler with an error. - pending_error: Option>, + pending_error: Option<(RequestId, ProtocolsHandlerUpgrErr)>, /// Queue of events to produce in `poll()`. events_out: SmallVec<[RPCEvent; 4]>, @@ -81,6 +84,10 @@ where /// After the given duration has elapsed, an inactive connection will shutdown. inactive_timeout: Duration, + /// Try to negotiate the outbound upgrade a few times if there is an IO error before reporting the request as failed. + /// This keeps track of the number of attempts. 
+ outbound_io_error_retries: u8, + /// Logger for handling RPC streams log: slog::Logger, @@ -150,6 +157,7 @@ where max_dial_negotiated: 8, keep_alive: KeepAlive::Yes, inactive_timeout, + outbound_io_error_retries: 0, log: log.clone(), _phantom: PhantomData, } @@ -339,13 +347,29 @@ where fn inject_dial_upgrade_error( &mut self, - _: Self::OutboundOpenInfo, + request: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr< >::Error, >, ) { + if let ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(RPCError::IoError(_))) = error { + self.outbound_io_error_retries += 1; + if self.outbound_io_error_retries < IO_ERROR_RETRIES { + self.send_request(request); + return; + } + } + self.outbound_io_error_retries = 0; + // add the error + let request_id = { + if let RPCEvent::Request(id, _) = request { + id + } else { + 0 + } + }; if self.pending_error.is_none() { - self.pending_error = Some(error); + self.pending_error = Some((request_id, error)); } } @@ -359,11 +383,43 @@ where ProtocolsHandlerEvent, Self::Error, > { - if let Some(err) = self.pending_error.take() { - // Returning an error here will result in dropping any peer that doesn't support any of - // the RPC protocols. For our immediate purposes we permit this and simply log that an - // upgrade was not supported. - warn!(self.log,"RPC Protocol was not supported"; "Error" => format!("{}", err)); + if let Some((request_id, err)) = self.pending_error.take() { + // Returning an error here will result in dropping the peer. + match err { + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply( + RPCError::InvalidProtocol(protocol_string), + )) => { + // Peer does not support the protocol. + // TODO: We currently will not drop the peer, for maximal compatibility with + // other clients testing their software. In the future, we will need to decide + // which protocols are a bare minimum to support before kicking the peer. + error!(self.log, "Peer doesn't support the RPC protocol"; "protocol" => protocol_string); + return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( + RPCEvent::Error(request_id, RPCError::InvalidProtocol(protocol_string)), + ))); + } + ProtocolsHandlerUpgrErr::Timeout | ProtocolsHandlerUpgrErr::Timer => { + // negotiation timeout, mark the request as failed + return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( + RPCEvent::Error( + request_id, + RPCError::Custom("Protocol negotiation timeout".into()), + ), + ))); + } + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)) => { + // IO/Decode/Custom Error, report to the application + return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( + RPCEvent::Error(request_id, err), + ))); + } + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)) => { + // Error during negotiation + return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( + RPCEvent::Error(request_id, RPCError::Custom(format!("{}", err))), + ))); + } + } } // return any events that need to be reported @@ -385,12 +441,19 @@ where } // purge expired outbound substreams - while let Async::Ready(Some(stream_id)) = self + if let Async::Ready(Some(stream_id)) = self .outbound_substreams_delay .poll() .map_err(|_| ProtocolsHandlerUpgrErr::Timer)? 
{ self.outbound_substreams.remove(stream_id.get_ref()); + // notify the user + return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( + RPCEvent::Error( + stream_id.get_ref().clone(), + RPCError::Custom("Stream timed out".into()), + ), + ))); } // drive inbound streams that need to be processed diff --git a/beacon_node/eth2-libp2p/tests/gossipsub_tests.rs b/beacon_node/eth2-libp2p/tests/gossipsub_tests.rs index affb7899b..646bd9c25 100644 --- a/beacon_node/eth2-libp2p/tests/gossipsub_tests.rs +++ b/beacon_node/eth2-libp2p/tests/gossipsub_tests.rs @@ -84,7 +84,7 @@ fn test_gossipsub_full_mesh_publish() { let log = common::build_log(Level::Info, false); let num_nodes = 20; - let mut nodes = common::build_full_mesh(log, num_nodes, None); + let mut nodes = common::build_full_mesh(log, num_nodes, Some(11320)); let mut publishing_node = nodes.pop().unwrap(); let pubsub_message = PubsubMessage::Block(vec![0; 4]); let publishing_topic: String = "/eth2/beacon_block/ssz".into(); diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index ac7961254..a8bbca172 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -231,33 +231,30 @@ fn network_service( } } + // poll the swarm + let mut peers_to_ban = Vec::new(); loop { - // poll the swarm - let mut locked_service = libp2p_service.lock(); - match locked_service.poll() { + match libp2p_service.lock().poll() { Ok(Async::Ready(Some(event))) => match event { Libp2pEvent::RPC(peer_id, rpc_event) => { trace!(log, "Received RPC"; "rpc" => format!("{}", rpc_event)); // if we received a Goodbye message, drop and ban the peer if let RPCEvent::Request(_, RPCRequest::Goodbye(_)) = rpc_event { - locked_service.disconnect_and_ban_peer( - peer_id.clone(), - std::time::Duration::from_secs(BAN_PEER_TIMEOUT), - ); + peers_to_ban.push(peer_id.clone()); }; message_handler_send .try_send(HandlerMessage::RPC(peer_id, rpc_event)) .map_err(|_| "Failed to send RPC to handler")?; } Libp2pEvent::PeerDialed(peer_id) => { - debug!(log, "Peer Dialed"; "PeerID" => format!("{:?}", peer_id)); + debug!(log, "Peer Dialed"; "peer_id" => format!("{:?}", peer_id)); message_handler_send .try_send(HandlerMessage::PeerDialed(peer_id)) .map_err(|_| "Failed to send PeerDialed to handler")?; } Libp2pEvent::PeerDisconnected(peer_id) => { - debug!(log, "Peer Disconnected"; "PeerID" => format!("{:?}", peer_id)); + debug!(log, "Peer Disconnected"; "peer_id" => format!("{:?}", peer_id)); message_handler_send .try_send(HandlerMessage::PeerDisconnected(peer_id)) .map_err(|_| "Failed to send PeerDisconnected to handler")?; @@ -280,6 +277,14 @@ fn network_service( } } + // ban and disconnect any peers that sent Goodbye requests + while let Some(peer_id) = peers_to_ban.pop() { + libp2p_service.lock().disconnect_and_ban_peer( + peer_id.clone(), + std::time::Duration::from_secs(BAN_PEER_TIMEOUT), + ); + } + Ok(Async::NotReady) }) } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index a46453296..0bea170c2 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -58,30 +58,23 @@ //! if an attestation references an unknown block) this manager can search for the block and //! subsequently search for parents if needed. 
-use super::message_processor::{ - status_message, NetworkContext, PeerSyncInfo, FUTURE_SLOT_TOLERANCE, -}; +use super::message_processor::PeerSyncInfo; +use super::network_context::SyncNetworkContext; +use super::range_sync::RangeSync; +use crate::service::NetworkMessage; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome}; use eth2_libp2p::rpc::methods::*; -use eth2_libp2p::rpc::{RPCRequest, RequestId}; +use eth2_libp2p::rpc::RequestId; use eth2_libp2p::PeerId; use fnv::FnvHashMap; use futures::prelude::*; -use slog::{crit, debug, info, trace, warn, Logger}; +use slog::{crit, debug, error, info, trace, warn, Logger}; use smallvec::SmallVec; -use std::collections::{HashMap, HashSet}; -use std::ops::{Add, Sub}; +use std::collections::HashSet; +use std::ops::Sub; use std::sync::Weak; use tokio::sync::{mpsc, oneshot}; -use types::{BeaconBlock, EthSpec, Hash256, Slot}; - -/// Blocks are downloaded in batches from peers. This constant specifies how many blocks per batch -/// is requested. There is a timeout for each batch request. If this value is too high, we will -/// downvote peers with poor bandwidth. This can be set arbitrarily high, in which case the -/// responder will fill the response up to the max request size, assuming they have the bandwidth -/// to do so. -//TODO: Make this dynamic based on peer's bandwidth -const BLOCKS_PER_REQUEST: u64 = 50; +use types::{BeaconBlock, EthSpec, Hash256}; /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync /// from a peer. If a peer is within this tolerance (forwards or backwards), it is treated as a @@ -93,9 +86,6 @@ const PARENT_FAIL_TOLERANCE: usize = 3; /// canonical chain to its head once the peer connects. A chain should not appear where it's depth /// is further back than the most recent head slot. const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE * 2; -/// The number of empty batches we tolerate before dropping the peer. This prevents endless -/// requests to peers who never return blocks. -const EMPTY_BATCH_TOLERANCE: usize = 100; #[derive(Debug)] /// A message than can be sent to the sync manager thread. @@ -131,51 +121,6 @@ pub enum SyncMessage { RPCError(PeerId, RequestId), } -#[derive(PartialEq)] -/// The current state of a block or batches lookup. -enum BlockRequestsState { - /// The object is queued to be downloaded from a peer but has not yet been requested. - Queued, - - /// The batch or parent has been requested with the `RequestId` and we are awaiting a response. - Pending(RequestId), - - /// The downloaded blocks are ready to be processed by the beacon chain. For a batch process - /// this means we have found a common chain. - ReadyToProcess, - - /// The batch is complete, simply drop without downvoting the peer. - Complete, - - /// A failure has occurred and we will drop and downvote the peer that caused the request. - Failed, -} - -/// `BlockRequests` keep track of the long-range (batch) sync process per peer. -struct BlockRequests { - /// The peer's head slot and the target of this batch download. - target_head_slot: Slot, - - /// The peer's head root, used to specify which chain of blocks we are downloading from. - target_head_root: Hash256, - - /// The blocks that we have currently downloaded from the peer that are yet to be processed. - downloaded_blocks: Vec>, - - /// The number of blocks successfully processed in this request. - blocks_processed: usize, - - /// The number of empty batches we have consecutively received. 
If a peer returns more than - /// EMPTY_BATCHES_TOLERANCE, they are dropped. - consecutive_empty_batches: usize, - - /// The current state of this batch request. - state: BlockRequestsState, - - /// The current `start_slot` of the batched block request. - current_start_slot: Slot, -} - /// Maintains a sequential list of parents to lookup and the lookup's current state. struct ParentRequests { /// The blocks that have currently been downloaded. @@ -189,26 +134,8 @@ struct ParentRequests { /// downvoted. last_submitted_peer: PeerId, - /// The current state of the parent lookup. - state: BlockRequestsState, -} - -impl BlockRequests { - /// Gets the next start slot for a batch and transitions the state to a Queued state. - fn update_start_slot(&mut self) { - // the last request may not have returned all the required blocks (hit the rpc size - // limit). If so, start from the last returned slot - if !self.downloaded_blocks.is_empty() - && self.downloaded_blocks[self.downloaded_blocks.len() - 1].slot - > self.current_start_slot - { - self.current_start_slot = self.downloaded_blocks[self.downloaded_blocks.len() - 1].slot - + Slot::from(BLOCKS_PER_REQUEST); - } else { - self.current_start_slot += Slot::from(BLOCKS_PER_REQUEST); - } - self.state = BlockRequestsState::Queued; - } + /// The request ID of this lookup is in progress. + pending: Option, } #[derive(PartialEq, Debug, Clone)] @@ -242,11 +169,10 @@ pub struct SyncManager { input_channel: mpsc::UnboundedReceiver>, /// A network context to contact the network service. - network: NetworkContext, + network: SyncNetworkContext, - /// A collection of `BlockRequest` per peer that is currently being downloaded. Used in the - /// long-range (batch) sync process. - import_queue: HashMap>, + /// The object handling long-range batch load-balanced syncing. + range_sync: RangeSync, /// A collection of parent block lookups. parent_queue: SmallVec<[ParentRequests; 3]>, @@ -257,11 +183,6 @@ pub struct SyncManager { /// The collection of known, connected, fully-sync'd peers. full_peers: HashSet, - /// The current request id. This is used to keep track of responses to various outbound - /// requests. This is an internal accounting mechanism, request id's are never sent to any - /// peers. - current_req_id: usize, - /// The logger for the import manager. log: Logger, } @@ -272,7 +193,7 @@ pub struct SyncManager { pub fn spawn( executor: &tokio::runtime::TaskExecutor, beacon_chain: Weak>, - network: NetworkContext, + network_send: mpsc::UnboundedSender, log: slog::Logger, ) -> ( mpsc::UnboundedSender>, @@ -285,15 +206,14 @@ pub fn spawn( // create an instance of the SyncManager let sync_manager = SyncManager { - chain: beacon_chain, + chain: beacon_chain.clone(), state: ManagerState::Stalled, input_channel: sync_recv, - network, - import_queue: HashMap::new(), + network: SyncNetworkContext::new(network_send, log.clone()), + range_sync: RangeSync::new(beacon_chain, log.clone()), parent_queue: SmallVec::new(), single_block_lookups: FnvHashMap::default(), full_peers: HashSet::new(), - current_req_id: 0, log: log.clone(), }; @@ -316,7 +236,7 @@ impl SyncManager { /// A peer has connected which has blocks that are unknown to us. /// /// This function handles the logic associated with the connection of a new peer. If the peer - /// is sufficiently ahead of our current head, a long-range (batch) sync is started and + /// is sufficiently ahead of our current head, a range-sync (batch) sync is started and /// batches of blocks are queued to download from the peer. 
Batched blocks begin at our latest /// finalized head. /// @@ -336,7 +256,7 @@ impl SyncManager { let local = PeerSyncInfo::from(&chain); - // If a peer is within SLOT_IMPORT_TOLERANCE from our head slot, ignore a batch sync, + // If a peer is within SLOT_IMPORT_TOLERANCE from our head slot, ignore a batch/range sync, // consider it a fully-sync'd peer. if remote.head_slot.sub(local.head_slot).as_usize() < SLOT_IMPORT_TOLERANCE { trace!(self.log, "Ignoring full sync with peer"; @@ -344,9 +264,9 @@ impl SyncManager { "peer_head_slot" => remote.head_slot, "local_head_slot" => local.head_slot, ); - // remove the peer from the queue if it exists - self.import_queue.remove(&peer_id); self.add_full_peer(peer_id); + // notify the range sync that a peer has been added + self.range_sync.fully_synced_peer_found(); return; } @@ -363,107 +283,9 @@ impl SyncManager { return; } - // Check if we are already downloading blocks from this peer, if so update, if not set up - // a new request structure - if let Some(block_requests) = self.import_queue.get_mut(&peer_id) { - // update the target head slot - if remote.head_slot > block_requests.target_head_slot { - block_requests.target_head_slot = remote.head_slot; - } - } else { - // not already downloading blocks from this peer - let block_requests = BlockRequests { - target_head_slot: remote.head_slot, // this should be larger than the current head. It is checked before add_peer is called - target_head_root: remote.head_root, - consecutive_empty_batches: 0, - downloaded_blocks: Vec::new(), - blocks_processed: 0, - state: BlockRequestsState::Queued, - current_start_slot: local - .finalized_epoch - .start_slot(T::EthSpec::slots_per_epoch()), - }; - self.import_queue.insert(peer_id, block_requests); - } - } - - /// A `BlocksByRange` request has received a response. This function process the response. - fn blocks_by_range_response( - &mut self, - peer_id: PeerId, - request_id: RequestId, - block: Option>, - ) { - // find the request associated with this response - let block_requests = match self - .import_queue - .get_mut(&peer_id) - .filter(|r| r.state == BlockRequestsState::Pending(request_id)) - { - Some(req) => req, - _ => { - // No pending request, invalid request_id or coding error - warn!(self.log, "BlocksByRange response unknown"; "request_id" => request_id); - return; - } - }; - - // add the downloaded block - if let Some(downloaded_block) = block { - // add the block to the request - block_requests.downloaded_blocks.push(downloaded_block); - return; - } - // the batch has finished processing, or terminated early - - // TODO: The following requirement may need to be relaxed as a node could fork and prune - // their old head, given to us during a STATUS. - // If we are syncing up to a target head block, at least the target head block should be - // returned. 
- let blocks = &block_requests.downloaded_blocks; - if blocks.is_empty() { - debug!(self.log, "BlocksByRange response was empty"; "request_id" => request_id); - block_requests.consecutive_empty_batches += 1; - if block_requests.consecutive_empty_batches >= EMPTY_BATCH_TOLERANCE { - warn!(self.log, "Peer returned too many empty block batches"; - "peer" => format!("{:?}", peer_id)); - block_requests.state = BlockRequestsState::Failed; - } else if block_requests.current_start_slot + BLOCKS_PER_REQUEST - >= block_requests.target_head_slot - { - warn!(self.log, "Peer did not return blocks it claimed to possess"; - "peer" => format!("{:?}", peer_id)); - // This could be due to a re-org causing the peer to prune their head. In this - // instance, we try to process what is currently downloaded, if there are blocks - // downloaded. - block_requests.state = BlockRequestsState::Complete; - } else { - // this batch was empty, request the next batch - block_requests.update_start_slot(); - } - return; - } - - block_requests.consecutive_empty_batches = 0; - - // verify the range of received blocks - // Note that the order of blocks is verified in block processing - let last_sent_slot = blocks[blocks.len() - 1].slot; - if block_requests.current_start_slot > blocks[0].slot - || block_requests.current_start_slot.add(BLOCKS_PER_REQUEST) < last_sent_slot - { - warn!(self.log, "BlocksByRange response returned out of range blocks"; - "request_id" => request_id, - "response_initial_slot" => blocks[0].slot, - "requested_initial_slot" => block_requests.current_start_slot); - downvote_peer(&mut self.network, &self.log, peer_id); - // consider this sync failed - block_requests.state = BlockRequestsState::Failed; - return; - } - - // Process this batch - block_requests.state = BlockRequestsState::ReadyToProcess; + // Add the peer to our RangeSync + self.range_sync.add_peer(&mut self.network, peer_id, remote); + self.update_state(); } /// The response to a `BlocksByRoot` request. 
@@ -492,12 +314,12 @@ impl SyncManager { // this should be a response to a parent request search // find the request - let parent_request = match self + let mut parent_request = match self .parent_queue - .iter_mut() - .find(|request| request.state == BlockRequestsState::Pending(request_id)) + .iter() + .position(|request| request.pending == Some(request_id)) { - Some(req) => req, + Some(pos) => self.parent_queue.remove(pos), None => { if block.is_some() { // No pending request, invalid request_id or coding error @@ -507,19 +329,19 @@ impl SyncManager { return; } }; + match block { Some(block) => { // add the block to response parent_request.downloaded_blocks.push(block); - // queue for processing - parent_request.state = BlockRequestsState::ReadyToProcess; + self.process_parent_request(parent_request); } None => { // if an empty response is given, the peer didn't have the requested block, try again parent_request.failed_attempts += 1; - parent_request.state = BlockRequestsState::Queued; parent_request.last_submitted_peer = peer_id; + self.request_parent(parent_request); } } } @@ -536,7 +358,7 @@ impl SyncManager { // verify the hash is correct and try and process the block if expected_block_hash != block.canonical_root() { // the peer that sent this, sent us the wrong block - downvote_peer(&mut self.network, &self.log, peer_id); + self.network.downvote_peer(peer_id); return; } @@ -547,6 +369,20 @@ impl SyncManager { match outcome { BlockProcessingOutcome::Processed { block_root } => { info!(self.log, "Processed block"; "block" => format!("{}", block_root)); + + match chain.fork_choice() { + Ok(()) => trace!( + self.log, + "Fork choice success"; + "location" => "single block" + ), + Err(e) => error!( + self.log, + "Fork choice failed"; + "error" => format!("{:?}", e), + "location" => "single block" + ), + } } BlockProcessingOutcome::ParentUnknown { .. } => { // We don't know of the blocks parent, begin a parent lookup search @@ -557,7 +393,7 @@ impl SyncManager { } _ => { warn!(self.log, "Single block lookup failed"; "outcome" => format!("{:?}", outcome)); - downvote_peer(&mut self.network, &self.log, peer_id); + self.network.downvote_peer(peer_id); } } } @@ -571,12 +407,12 @@ impl SyncManager { /// A block has been sent to us that has an unknown parent. This begins a parent lookup search /// to find the parent or chain of parents that match our current chain. fn add_unknown_block(&mut self, peer_id: PeerId, block: BeaconBlock) { - // if we are not in regular sync mode, ignore this block + // If we are not in regular sync mode, ignore this block if self.state != ManagerState::Regular { return; } - // make sure this block is not already being searched for + // Make sure this block is not already being searched for // TODO: Potentially store a hashset of blocks for O(1) lookups for parent_req in self.parent_queue.iter() { if parent_req @@ -589,26 +425,31 @@ impl SyncManager { } } - let req = ParentRequests { + let parent_request = ParentRequests { downloaded_blocks: vec![block], failed_attempts: 0, last_submitted_peer: peer_id, - state: BlockRequestsState::Queued, + pending: None, }; - self.parent_queue.push(req); + self.request_parent(parent_request) } /// A request to search for a block hash has been received. This function begins a BlocksByRoot /// request to find the requested block. 
fn search_for_block(&mut self, peer_id: PeerId, block_hash: Hash256) { - let request_id = self.current_req_id; - self.single_block_lookups.insert(request_id, block_hash); - self.current_req_id += 1; + // If we are not in regular sync mode, ignore this block + if self.state != ManagerState::Regular { + return; + } + let request = BlocksByRootRequest { block_roots: vec![block_hash], }; - blocks_by_root_request(&mut self.network, &self.log, peer_id, request_id, request); + + if let Ok(request_id) = self.network.blocks_by_root_request(peer_id, request) { + self.single_block_lookups.insert(request_id, block_hash); + } } fn inject_error(&mut self, peer_id: PeerId, request_id: RequestId) { @@ -616,40 +457,25 @@ impl SyncManager { // remove any single block lookups self.single_block_lookups.remove(&request_id); - // find the request associated with this response - if let Some(block_requests) = self - .import_queue - .get_mut(&peer_id) - .filter(|r| r.state == BlockRequestsState::Pending(request_id)) - { - // TODO: Potentially implement a tolerance. For now, we try to process what have been - // downloaded - if !block_requests.downloaded_blocks.is_empty() { - block_requests.current_start_slot = block_requests - .downloaded_blocks - .last() - .expect("is not empty") - .slot; - block_requests.state = BlockRequestsState::ReadyToProcess; - } else { - block_requests.state = BlockRequestsState::Failed; - } - }; + // notify the range sync + self.range_sync + .inject_error(&mut self.network, peer_id.clone(), request_id); // increment the failure of a parent lookup if the request matches a parent search - if let Some(parent_req) = self + if let Some(pos) = self .parent_queue - .iter_mut() - .find(|request| request.state == BlockRequestsState::Pending(request_id)) + .iter() + .position(|request| request.pending == Some(request_id)) { - parent_req.failed_attempts += 1; - parent_req.state = BlockRequestsState::Queued; - parent_req.last_submitted_peer = peer_id; + let mut parent_request = self.parent_queue.remove(pos); + parent_request.failed_attempts += 1; + parent_request.last_submitted_peer = peer_id; + self.request_parent(parent_request); } } fn peer_disconnect(&mut self, peer_id: &PeerId) { - self.import_queue.remove(peer_id); + self.range_sync.peer_disconnect(&mut self.network, peer_id); self.full_peers.remove(peer_id); self.update_state(); } @@ -666,10 +492,11 @@ impl SyncManager { // These functions are called in the main poll function to transition the state of the sync // manager + /// Updates the syncing state of the `SyncManager`. fn update_state(&mut self) { let previous_state = self.state.clone(); self.state = { - if !self.import_queue.is_empty() { + if self.range_sync.is_syncing() { ManagerState::Syncing } else if !self.full_peers.is_empty() { ManagerState::Regular @@ -685,468 +512,144 @@ impl SyncManager { } } - fn process_potential_block_requests(&mut self) { - // check if an outbound request is required + fn process_parent_request(&mut self, mut parent_request: ParentRequests) { + // verify the last added block is the parent of the last requested block - // Managing a fixed number of outbound requests is maintained at the RPC protocol libp2p - // layer and not needed here. Therefore we create many outbound requests and let the RPC - // handle the number of simultaneous requests. - // Request all queued objects. 
- - // remove any failed batches - let debug_log = &self.log; - let full_peer_ref = &mut self.full_peers; - self.import_queue.retain(|peer_id, block_request| { - match block_request.state { - BlockRequestsState::Failed => { - debug!(debug_log, "Block import from peer failed"; - "peer_id" => format!("{:?}", peer_id), - "downloaded_blocks" => block_request.blocks_processed - ); - full_peer_ref.remove(peer_id); - false - } - BlockRequestsState::Complete => { - debug!(debug_log, "Block import from peer completed"; - "peer_id" => format!("{:?}", peer_id), - ); - false - } - _ => true, // keep all other states - } - }); - - // process queued block requests - for (peer_id, block_requests) in self.import_queue.iter_mut() { - if block_requests.state == BlockRequestsState::Queued { - let request_id = self.current_req_id; - block_requests.state = BlockRequestsState::Pending(request_id); - self.current_req_id += 1; - - let request = BlocksByRangeRequest { - head_block_root: block_requests.target_head_root, - start_slot: block_requests.current_start_slot.as_u64(), - count: BLOCKS_PER_REQUEST, - step: 0, - }; - blocks_by_range_request( - &mut self.network, - &self.log, - peer_id.clone(), - request_id, - request, - ); - } + if parent_request.downloaded_blocks.len() < 2 { + crit!( + self.log, + "There must be at least two blocks in a parent request lookup at all times" + ); + panic!("There must be at least two blocks in parent request lookup at all time"); + // fail loudly } - } + let previous_index = parent_request.downloaded_blocks.len() - 2; + let expected_hash = parent_request.downloaded_blocks[previous_index].parent_root; - fn process_complete_batches(&mut self) -> bool { - // This function can queue extra blocks and the main poll loop will need to be re-executed - // to process these. This flag indicates that the main poll loop has to continue. - let mut re_run_poll = false; + // Note: the length must be greater than 2 so this cannot panic. 
+ let block_hash = parent_request + .downloaded_blocks + .last() + .expect("Complete batch cannot be empty") + .canonical_root(); + if block_hash != expected_hash { + // The sent block is not the correct block, remove the head block and downvote + // the peer + let _ = parent_request.downloaded_blocks.pop(); + let peer = parent_request.last_submitted_peer.clone(); - // create reference variables to be moved into subsequent closure - let chain_ref = self.chain.clone(); - let log_ref = &self.log; - let network_ref = &mut self.network; + debug!(self.log, "Peer sent invalid parent."; + "peer_id" => format!("{:?}",peer), + "received_block" => format!("{}", block_hash), + "expected_parent" => format!("{}", expected_hash), + ); - self.import_queue.retain(|peer_id, block_requests| { - if block_requests.state == BlockRequestsState::ReadyToProcess { - let downloaded_blocks = - std::mem::replace(&mut block_requests.downloaded_blocks, Vec::new()); - let end_slot = downloaded_blocks - .last() - .expect("Batches to be processed should not be empty") - .slot; - let total_blocks = downloaded_blocks.len(); - let start_slot = downloaded_blocks[0].slot; - - match process_blocks(chain_ref.clone(), downloaded_blocks, log_ref) { - Ok(()) => { - debug!(log_ref, "Blocks processed successfully"; - "peer" => format!("{:?}", peer_id), - "start_slot" => start_slot, - "end_slot" => end_slot, - "no_blocks" => total_blocks, - ); - block_requests.blocks_processed += total_blocks; - - // check if the batch is complete, by verifying if we have reached the - // target head - if end_slot >= block_requests.target_head_slot { - // Completed, re-status the peer to ensure we are up to the latest head - status_peer(network_ref, log_ref, chain_ref.clone(), peer_id.clone()); - // remove the request - false - } else { - // have not reached the end, queue another batch - block_requests.update_start_slot(); - re_run_poll = true; - // keep the batch - true - } - } - Err(e) => { - warn!(log_ref, "Block processing failed"; - "peer" => format!("{:?}", peer_id), - "start_slot" => start_slot, - "end_slot" => end_slot, - "no_blocks" => total_blocks, - "error" => format!("{:?}", e), - ); - downvote_peer(network_ref, log_ref, peer_id.clone()); - false - } - } - } else { - // not ready to process - true - } - }); - - re_run_poll - } - - fn process_parent_requests(&mut self) { - // check to make sure there are peers to search for the parent from - if self.full_peers.is_empty() { - return; - } - - // remove any failed requests - let debug_log = &self.log; - self.parent_queue.retain(|parent_request| { - if parent_request.state == BlockRequestsState::Failed { - debug!(debug_log, "Parent import failed"; - "block" => format!("{:?}",parent_request.downloaded_blocks[0].canonical_root()), - "ancestors_found" => parent_request.downloaded_blocks.len() - ); - false - } else { - true - } - }); - - // check if parents need to be searched for - for parent_request in self.parent_queue.iter_mut() { - if parent_request.failed_attempts >= PARENT_FAIL_TOLERANCE { - parent_request.state = BlockRequestsState::Failed; - continue; - } else if parent_request.state == BlockRequestsState::Queued { - // check the depth isn't too large - if parent_request.downloaded_blocks.len() >= PARENT_DEPTH_TOLERANCE { - parent_request.state = BlockRequestsState::Failed; - continue; - } - - let request_id = self.current_req_id; - parent_request.state = BlockRequestsState::Pending(request_id); - self.current_req_id += 1; - let last_element_index = parent_request.downloaded_blocks.len() 
- 1; - let parent_hash = parent_request.downloaded_blocks[last_element_index].parent_root; - let request = BlocksByRootRequest { - block_roots: vec![parent_hash], - }; - - // select a random fully synced peer to attempt to download the parent block - let peer_id = self.full_peers.iter().next().expect("List is not empty"); - - blocks_by_root_request( - &mut self.network, - &self.log, - peer_id.clone(), - request_id, - request, - ); - } - } - } - - fn process_complete_parent_requests(&mut self) -> bool { - // returned value indicating whether the manager can be switched to idle or not - let mut re_run_poll = false; - - // Find any parent_requests ready to be processed - for completed_request in self - .parent_queue - .iter_mut() - .filter(|req| req.state == BlockRequestsState::ReadyToProcess) - { - // verify the last added block is the parent of the last requested block - - if completed_request.downloaded_blocks.len() < 2 { - crit!( - self.log, - "There must be at least two blocks in a parent request lookup at all times" - ); - panic!("There must be at least two blocks in parent request lookup at all time"); - // fail loudly - } - let previous_index = completed_request.downloaded_blocks.len() - 2; - let expected_hash = completed_request.downloaded_blocks[previous_index].parent_root; - // Note: the length must be greater than 2 so this cannot panic. - let block_hash = completed_request - .downloaded_blocks - .last() - .expect("Complete batch cannot be empty") - .canonical_root(); - if block_hash != expected_hash { - // remove the head block - let _ = completed_request.downloaded_blocks.pop(); - completed_request.state = BlockRequestsState::Queued; - let peer = completed_request.last_submitted_peer.clone(); - debug!(self.log, "Peer sent invalid parent."; - "peer_id" => format!("{:?}",peer), - "received_block" => format!("{}", block_hash), - "expected_parent" => format!("{}", expected_hash), - ); - re_run_poll = true; - downvote_peer(&mut self.network, &self.log, peer); - } + self.request_parent(parent_request); + self.network.downvote_peer(peer); + } else { + let mut successes = 0; // try and process the list of blocks up to the requested block - while !completed_request.downloaded_blocks.is_empty() { - let block = completed_request - .downloaded_blocks - .pop() - .expect("Block must exist exist"); - + while let Some(block) = parent_request.downloaded_blocks.pop() { // check if the chain exists if let Some(chain) = self.chain.upgrade() { match chain.process_block(block.clone()) { Ok(BlockProcessingOutcome::ParentUnknown { .. }) => { // need to keep looking for parents - completed_request.downloaded_blocks.push(block); - completed_request.state = BlockRequestsState::Queued; - re_run_poll = true; + parent_request.downloaded_blocks.push(block); + self.request_parent(parent_request); break; } - Ok(BlockProcessingOutcome::Processed { .. }) - | Ok(BlockProcessingOutcome::BlockIsAlreadyKnown { .. }) => {} + Ok(BlockProcessingOutcome::Processed { .. }) => successes += 1, + Ok(BlockProcessingOutcome::BlockIsAlreadyKnown { .. 
}) => {} Ok(outcome) => { // it's a future slot or an invalid block, remove it and try again - completed_request.failed_attempts += 1; - trace!( + parent_request.failed_attempts += 1; + debug!( self.log, "Invalid parent block"; "outcome" => format!("{:?}", outcome), - "peer" => format!("{:?}", completed_request.last_submitted_peer), + "peer" => format!("{:?}", parent_request.last_submitted_peer), ); - completed_request.state = BlockRequestsState::Queued; - re_run_poll = true; - downvote_peer( - &mut self.network, - &self.log, - completed_request.last_submitted_peer.clone(), - ); - return re_run_poll; + self.network + .downvote_peer(parent_request.last_submitted_peer.clone()); + self.request_parent(parent_request); + break; } Err(e) => { - completed_request.failed_attempts += 1; + parent_request.failed_attempts += 1; warn!( self.log, "Parent processing error"; "error" => format!("{:?}", e) ); - completed_request.state = BlockRequestsState::Queued; - re_run_poll = true; - downvote_peer( - &mut self.network, - &self.log, - completed_request.last_submitted_peer.clone(), - ); - return re_run_poll; + self.network + .downvote_peer(parent_request.last_submitted_peer.clone()); + self.request_parent(parent_request); + break; } } } else { - // chain doesn't exist - clear the event queue and return - return false; + break; + } + } + + if successes > 0 { + if let Some(chain) = self.chain.upgrade() { + match chain.fork_choice() { + Ok(()) => trace!( + self.log, + "Fork choice success"; + "block_imports" => successes, + "location" => "parent request" + ), + Err(e) => error!( + self.log, + "Fork choice failed"; + "error" => format!("{:?}", e), + "location" => "parent request" + ), + }; } } } - - // remove any fully processed parent chains - self.parent_queue - .retain(|req| req.state != BlockRequestsState::ReadyToProcess); - re_run_poll } -} -/* Network Context Helper Functions */ + fn request_parent(&mut self, mut parent_request: ParentRequests) { + // check to make sure there are peers to search for the parent from + if self.full_peers.is_empty() { + return; + } -fn status_peer( - network: &mut NetworkContext, - log: &slog::Logger, - chain: Weak>, - peer_id: PeerId, -) { - trace!( - log, - "Sending Status Request"; - "method" => "STATUS", - "peer" => format!("{:?}", peer_id) - ); - if let Some(chain) = chain.upgrade() { - network.send_rpc_request(None, peer_id, RPCRequest::Status(status_message(&chain))); - } -} + // check to make sure this request hasn't failed + if parent_request.failed_attempts >= PARENT_FAIL_TOLERANCE + || parent_request.downloaded_blocks.len() >= PARENT_DEPTH_TOLERANCE + { + debug!(self.log, "Parent import failed"; + "block" => format!("{:?}",parent_request.downloaded_blocks[0].canonical_root()), + "ancestors_found" => parent_request.downloaded_blocks.len() + ); + return; // drop the request + } -fn blocks_by_range_request( - network: &mut NetworkContext, - log: &slog::Logger, - peer_id: PeerId, - request_id: RequestId, - request: BlocksByRangeRequest, -) { - trace!( - log, - "Sending BlocksByRange Request"; - "method" => "BlocksByRange", - "id" => request_id, - "count" => request.count, - "peer" => format!("{:?}", peer_id) - ); - network.send_rpc_request( - Some(request_id), - peer_id.clone(), - RPCRequest::BlocksByRange(request), - ); -} + let parent_hash = parent_request + .downloaded_blocks + .last() + .expect("The parent queue should never be empty") + .parent_root; + let request = BlocksByRootRequest { + block_roots: vec![parent_hash], + }; + // select a random fully synced 
peer to attempt to download the parent block + let peer_id = self.full_peers.iter().next().expect("List is not empty"); -fn blocks_by_root_request( - network: &mut NetworkContext, - log: &slog::Logger, - peer_id: PeerId, - request_id: RequestId, - request: BlocksByRootRequest, -) { - trace!( - log, - "Sending BlocksByRoot Request"; - "method" => "BlocksByRoot", - "count" => request.block_roots.len(), - "peer" => format!("{:?}", peer_id) - ); - network.send_rpc_request( - Some(request_id), - peer_id.clone(), - RPCRequest::BlocksByRoot(request), - ); -} - -fn downvote_peer(network: &mut NetworkContext, log: &slog::Logger, peer_id: PeerId) { - trace!( - log, - "Peer downvoted"; - "peer" => format!("{:?}", peer_id) - ); - // TODO: Implement reputation - network.disconnect(peer_id.clone(), GoodbyeReason::Fault); -} - -// Helper function to process blocks which only consumes the chain and blocks to process -fn process_blocks( - weak_chain: Weak>, - blocks: Vec>, - log: &Logger, -) -> Result<(), String> { - for block in blocks { - if let Some(chain) = weak_chain.upgrade() { - let processing_result = chain.process_block(block.clone()); - - if let Ok(outcome) = processing_result { - match outcome { - BlockProcessingOutcome::Processed { block_root } => { - // The block was valid and we processed it successfully. - trace!( - log, "Imported block from network"; - "slot" => block.slot, - "block_root" => format!("{}", block_root), - ); - } - BlockProcessingOutcome::ParentUnknown { parent } => { - // blocks should be sequential and all parents should exist - trace!( - log, "Parent block is unknown"; - "parent_root" => format!("{}", parent), - "baby_block_slot" => block.slot, - ); - return Err(format!( - "Block at slot {} has an unknown parent.", - block.slot - )); - } - BlockProcessingOutcome::BlockIsAlreadyKnown => { - // this block is already known to us, move to the next - debug!( - log, "Imported a block that is already known"; - "block_slot" => block.slot, - ); - } - BlockProcessingOutcome::FutureSlot { - present_slot, - block_slot, - } => { - if present_slot + FUTURE_SLOT_TOLERANCE >= block_slot { - // The block is too far in the future, drop it. - trace!( - log, "Block is ahead of our slot clock"; - "msg" => "block for future slot rejected, check your time", - "present_slot" => present_slot, - "block_slot" => block_slot, - "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, - ); - return Err(format!( - "Block at slot {} is too far in the future", - block.slot - )); - } else { - // The block is in the future, but not too far. - trace!( - log, "Block is slightly ahead of our slot clock, ignoring."; - "present_slot" => present_slot, - "block_slot" => block_slot, - "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, - ); - } - } - BlockProcessingOutcome::WouldRevertFinalizedSlot { .. 
} => { - trace!( - log, "Finalized or earlier block processed"; - "outcome" => format!("{:?}", outcome), - ); - // block reached our finalized slot or was earlier, move to the next block - } - BlockProcessingOutcome::GenesisBlock => { - trace!( - log, "Genesis block was processed"; - "outcome" => format!("{:?}", outcome), - ); - } - _ => { - warn!( - log, "Invalid block received"; - "msg" => "peer sent invalid block", - "outcome" => format!("{:?}", outcome), - ); - return Err(format!("Invalid block at slot {}", block.slot)); - } - } - } else { - warn!( - log, "BlockProcessingFailure"; - "msg" => "unexpected condition in processing block.", - "outcome" => format!("{:?}", processing_result) - ); - return Err(format!( - "Unexpected block processing error: {:?}", - processing_result - )); - } - } else { - return Ok(()); // terminate early due to dropped beacon chain + if let Ok(request_id) = self + .network + .blocks_by_root_request(peer_id.clone(), request) + { + // if the request was successful add the queue back into self + parent_request.pending = Some(request_id); + self.parent_queue.push(parent_request); } } - - Ok(()) } impl Future for SyncManager { @@ -1166,7 +669,8 @@ impl Future for SyncManager { request_id, beacon_block, } => { - self.blocks_by_range_response( + self.range_sync.blocks_by_range_response( + &mut self.network, peer_id, request_id, beacon_block.map(|b| *b), @@ -1202,39 +706,6 @@ impl Future for SyncManager { } } - loop { - //TODO: Optimize the lookups. Potentially keep state of whether each of these functions - //need to be called. - let mut re_run = false; - - // only process batch requests if there are any - if !self.import_queue.is_empty() { - // process potential block requests - self.process_potential_block_requests(); - - // process any complete long-range batches - re_run = re_run || self.process_complete_batches(); - } - - // only process parent objects if we are in regular sync - if !self.parent_queue.is_empty() { - // process any parent block lookup-requests - self.process_parent_requests(); - - // process any complete parent lookups - re_run = re_run || self.process_complete_parent_requests(); - } - - // Shutdown the thread if the chain has termined - if self.chain.upgrade().is_none() { - return Ok(Async::Ready(())); - } - - if !re_run { - break; - } - } - // update the state of the manager self.update_state(); diff --git a/beacon_node/network/src/sync/message_processor.rs b/beacon_node/network/src/sync/message_processor.rs index c683beff3..f0c4ced58 100644 --- a/beacon_node/network/src/sync/message_processor.rs +++ b/beacon_node/network/src/sync/message_processor.rs @@ -6,7 +6,7 @@ use beacon_chain::{ use eth2_libp2p::rpc::methods::*; use eth2_libp2p::rpc::{RPCEvent, RPCRequest, RPCResponse, RequestId}; use eth2_libp2p::PeerId; -use slog::{debug, info, o, trace, warn}; +use slog::{debug, error, o, trace, warn}; use ssz::Encode; use std::sync::Arc; use store::Store; @@ -60,8 +60,8 @@ pub struct MessageProcessor { sync_send: mpsc::UnboundedSender>, /// A oneshot channel for destroying the sync thread. _sync_exit: oneshot::Sender<()>, - /// A nextwork context to return and handle RPC requests. - network: NetworkContext, + /// A network context to return and handle RPC requests. + network: HandlerNetworkContext, /// The `RPCHandler` logger. 
log: slog::Logger, } @@ -75,13 +75,12 @@ impl MessageProcessor { log: &slog::Logger, ) -> Self { let sync_logger = log.new(o!("service"=> "sync")); - let sync_network_context = NetworkContext::new(network_send.clone(), sync_logger.clone()); // spawn the sync thread let (sync_send, _sync_exit) = super::manager::spawn( executor, Arc::downgrade(&beacon_chain), - sync_network_context, + network_send.clone(), sync_logger, ); @@ -89,7 +88,7 @@ impl MessageProcessor { chain: beacon_chain, sync_send, _sync_exit, - network: NetworkContext::new(network_send, log.clone()), + network: HandlerNetworkContext::new(network_send, log.clone()), log: log.clone(), } } @@ -120,11 +119,8 @@ impl MessageProcessor { /// /// Sends a `Status` message to the peer. pub fn on_connect(&mut self, peer_id: PeerId) { - self.network.send_rpc_request( - None, - peer_id, - RPCRequest::Status(status_message(&self.chain)), - ); + self.network + .send_rpc_request(peer_id, RPCRequest::Status(status_message(&self.chain))); } /// Handle a `Status` request. @@ -316,51 +312,47 @@ impl MessageProcessor { "start_slot" => req.start_slot, ); - //TODO: Optimize this - // Currently for skipped slots, the blocks returned could be less than the requested range. - // In the current implementation we read from the db then filter out out-of-range blocks. - // Improving the db schema to prevent this would be ideal. - - //TODO: This really needs to be read forward for infinite streams - // We should be reading the first block from the db, sending, then reading the next... we - // need a forwards iterator!! - - let mut blocks: Vec> = self + let mut block_roots = self .chain - .rev_iter_block_roots() - .filter(|(_root, slot)| { - req.start_slot <= slot.as_u64() && req.start_slot + req.count > slot.as_u64() - }) - .take_while(|(_root, slot)| req.start_slot <= slot.as_u64()) - .filter_map(|(root, _slot)| { - if let Ok(Some(block)) = self.chain.store.get::>(&root) { - Some(block) - } else { - warn!( - self.log, - "Block in the chain is not in the store"; - "request_root" => format!("{:}", root), + .forwards_iter_block_roots(Slot::from(req.start_slot)) + .take_while(|(_root, slot)| slot.as_u64() < req.start_slot + req.count) + .map(|(root, _slot)| root) + .collect::>(); + + block_roots.dedup(); + + let mut blocks_sent = 0; + for root in block_roots { + if let Ok(Some(block)) = self.chain.store.get::>(&root) { + // Due to skip slots, blocks could be out of the range, we ensure they are in the + // range before sending + if block.slot >= req.start_slot && block.slot < req.start_slot + req.count { + blocks_sent += 1; + self.network.send_rpc_response( + peer_id.clone(), + request_id, + RPCResponse::BlocksByRange(block.as_ssz_bytes()), ); - None } - }) - .filter(|block| block.slot >= req.start_slot) - .collect(); + } else { + error!( + self.log, + "Block in the chain is not in the store"; + "request_root" => format!("{:}", root), + ); + } + } - blocks.reverse(); - blocks.dedup_by_key(|brs| brs.slot); - - if blocks.len() < (req.count as usize) { - debug!( + if blocks_sent < (req.count as usize) { + trace!( self.log, - "Sending BlocksByRange Response"; + "BlocksByRange Response Sent"; "peer" => format!("{:?}", peer_id), "msg" => "Failed to return all requested blocks", "start_slot" => req.start_slot, "current_slot" => self.chain.slot().unwrap_or_else(|_| Slot::from(0_u64)).as_u64(), "requested" => req.count, - "returned" => blocks.len(), - ); + "returned" => blocks_sent); } else { trace!( self.log, @@ -369,17 +361,9 @@ impl MessageProcessor { "start_slot" => 
req.start_slot, "current_slot" => self.chain.slot().unwrap_or_else(|_| Slot::from(0_u64)).as_u64(), "requested" => req.count, - "returned" => blocks.len(), - ); + "returned" => blocks_sent); } - for block in blocks { - self.network.send_rpc_response( - peer_id.clone(), - request_id, - RPCResponse::BlocksByRange(block.as_ssz_bytes()), - ); - } // send the stream terminator self.network.send_rpc_error_response( peer_id, @@ -442,6 +426,27 @@ impl MessageProcessor { BlockProcessingOutcome::Processed { .. } => { trace!(self.log, "Gossipsub block processed"; "peer_id" => format!("{:?}",peer_id)); + + // TODO: It would be better if we can run this _after_ we publish the block to + // reduce block propagation latency. + // + // The `MessageHandler` would be the place to put this, however it doesn't seem + // to have a reference to the `BeaconChain`. I will leave this for future + // works. + match self.chain.fork_choice() { + Ok(()) => trace!( + self.log, + "Fork choice success"; + "location" => "block gossip" + ), + Err(e) => error!( + self.log, + "Fork choice failed"; + "error" => format!("{:?}", e), + "location" => "block gossip" + ), + } + SHOULD_FORWARD_GOSSIP_BLOCK } BlockProcessingOutcome::ParentUnknown { .. } => { @@ -494,7 +499,7 @@ impl MessageProcessor { match self.chain.process_attestation(msg.clone()) { Ok(outcome) => match outcome { AttestationProcessingOutcome::Processed => { - info!( + debug!( self.log, "Processed attestation"; "source" => "gossip", @@ -545,15 +550,17 @@ pub(crate) fn status_message(beacon_chain: &BeaconChain) } } -/// Wraps a Network Channel to employ various RPC/Sync related network functionality. -pub struct NetworkContext { +/// Wraps a Network Channel to employ various RPC related network functionality for the message +/// handler. The handler doesn't manage it's own request Id's and can therefore only send +/// responses or requests with 0 request Ids. +pub struct HandlerNetworkContext { /// The network channel to relay messages to the Network service. network_send: mpsc::UnboundedSender, /// Logger for the `NetworkContext`. log: slog::Logger, } -impl NetworkContext { +impl HandlerNetworkContext { pub fn new(network_send: mpsc::UnboundedSender, log: slog::Logger) -> Self { Self { network_send, log } } @@ -565,7 +572,7 @@ impl NetworkContext { "reason" => format!("{:?}", reason), "peer_id" => format!("{:?}", peer_id), ); - self.send_rpc_request(None, peer_id.clone(), RPCRequest::Goodbye(reason)); + self.send_rpc_request(peer_id.clone(), RPCRequest::Goodbye(reason)); self.network_send .try_send(NetworkMessage::Disconnect { peer_id }) .unwrap_or_else(|_| { @@ -576,14 +583,10 @@ impl NetworkContext { }); } - pub fn send_rpc_request( - &mut self, - request_id: Option, - peer_id: PeerId, - rpc_request: RPCRequest, - ) { - // use 0 as the default request id, when an ID is not required. - let request_id = request_id.unwrap_or_else(|| 0); + pub fn send_rpc_request(&mut self, peer_id: PeerId, rpc_request: RPCRequest) { + // the message handler cannot send requests with ids. Id's are managed by the sync + // manager. + let request_id = 0; self.send_rpc_event(peer_id, RPCEvent::Request(request_id, rpc_request)); } diff --git a/beacon_node/network/src/sync/mod.rs b/beacon_node/network/src/sync/mod.rs index b8f575075..372511f6a 100644 --- a/beacon_node/network/src/sync/mod.rs +++ b/beacon_node/network/src/sync/mod.rs @@ -3,6 +3,8 @@ //! Stores the various syncing methods for the beacon chain. 
mod manager; mod message_processor; +mod network_context; +mod range_sync; pub use message_processor::MessageProcessor; diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs new file mode 100644 index 000000000..5a491eb2f --- /dev/null +++ b/beacon_node/network/src/sync/network_context.rs @@ -0,0 +1,133 @@ +//! Provides network functionality for the Syncing thread. This fundamentally wraps a network +//! channel and stores a global RPC ID to perform requests. + +use super::message_processor::status_message; +use crate::service::NetworkMessage; +use beacon_chain::{BeaconChain, BeaconChainTypes}; +use eth2_libp2p::rpc::methods::*; +use eth2_libp2p::rpc::{RPCEvent, RPCRequest, RequestId}; +use eth2_libp2p::PeerId; +use slog::{debug, trace, warn}; +use std::sync::Weak; +use tokio::sync::mpsc; + +/// Wraps a Network channel to employ various RPC related network functionality for the Sync manager. This includes management of a global RPC request Id. + +pub struct SyncNetworkContext { + /// The network channel to relay messages to the Network service. + network_send: mpsc::UnboundedSender, + + request_id: RequestId, + /// Logger for the `SyncNetworkContext`. + log: slog::Logger, +} + +impl SyncNetworkContext { + pub fn new(network_send: mpsc::UnboundedSender, log: slog::Logger) -> Self { + Self { + network_send, + request_id: 0, + log, + } + } + + pub fn status_peer( + &mut self, + chain: Weak>, + peer_id: PeerId, + ) { + trace!( + self.log, + "Sending Status Request"; + "method" => "STATUS", + "peer" => format!("{:?}", peer_id) + ); + if let Some(chain) = chain.upgrade() { + let _ = self.send_rpc_request(peer_id, RPCRequest::Status(status_message(&chain))); + } + } + + pub fn blocks_by_range_request( + &mut self, + peer_id: PeerId, + request: BlocksByRangeRequest, + ) -> Result { + trace!( + self.log, + "Sending BlocksByRange Request"; + "method" => "BlocksByRange", + "count" => request.count, + "peer" => format!("{:?}", peer_id) + ); + self.send_rpc_request(peer_id.clone(), RPCRequest::BlocksByRange(request)) + } + + pub fn blocks_by_root_request( + &mut self, + peer_id: PeerId, + request: BlocksByRootRequest, + ) -> Result { + trace!( + self.log, + "Sending BlocksByRoot Request"; + "method" => "BlocksByRoot", + "count" => request.block_roots.len(), + "peer" => format!("{:?}", peer_id) + ); + self.send_rpc_request(peer_id.clone(), RPCRequest::BlocksByRoot(request)) + } + + pub fn downvote_peer(&mut self, peer_id: PeerId) { + debug!( + self.log, + "Peer downvoted"; + "peer" => format!("{:?}", peer_id) + ); + // TODO: Implement reputation + self.disconnect(peer_id.clone(), GoodbyeReason::Fault); + } + + fn disconnect(&mut self, peer_id: PeerId, reason: GoodbyeReason) { + warn!( + &self.log, + "Disconnecting peer (RPC)"; + "reason" => format!("{:?}", reason), + "peer_id" => format!("{:?}", peer_id), + ); + + // ignore the error if the channel send fails + let _ = self.send_rpc_request(peer_id.clone(), RPCRequest::Goodbye(reason)); + self.network_send + .try_send(NetworkMessage::Disconnect { peer_id }) + .unwrap_or_else(|_| { + warn!( + self.log, + "Could not send a Disconnect to the network service" + ) + }); + } + + pub fn send_rpc_request( + &mut self, + peer_id: PeerId, + rpc_request: RPCRequest, + ) -> Result { + let request_id = self.request_id; + self.request_id += 1; + self.send_rpc_event(peer_id, RPCEvent::Request(request_id, rpc_request))?; + Ok(request_id) + } + + fn send_rpc_event(&mut self, peer_id: PeerId, rpc_event: RPCEvent) 
-> Result<(), &'static str> { + self.network_send + .try_send(NetworkMessage::RPC(peer_id, rpc_event)) + .map_err(|_| { + // This is likely to happen when shutting down. Suppress this warning to trace for now + trace!( + self.log, + "Could not send RPC message to the network service" + ); + "Network channel send Failed" + }) + } +} diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs new file mode 100644 index 000000000..333fcd723 --- /dev/null +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -0,0 +1,649 @@ +use crate::sync::message_processor::FUTURE_SLOT_TOLERANCE; +use crate::sync::network_context::SyncNetworkContext; +use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome}; +use eth2_libp2p::rpc::methods::*; +use eth2_libp2p::rpc::RequestId; +use eth2_libp2p::PeerId; +use fnv::FnvHashMap; +use slog::{crit, debug, error, trace, warn, Logger}; +use std::cmp::Ordering; +use std::collections::HashSet; +use std::ops::Sub; +use std::sync::Weak; +use types::{BeaconBlock, EthSpec, Hash256, Slot}; + +/// Blocks are downloaded in batches from peers. This constant specifies how many blocks per batch +/// is requested. There is a timeout for each batch request. If this value is too high, we will +/// downvote peers with poor bandwidth. This can be set arbitrarily high, in which case the +/// responder will fill the response up to the max request size, assuming they have the bandwidth +/// to do so. +//TODO: Make this dynamic based on peer's bandwidth +const BLOCKS_PER_BATCH: u64 = 50; + +/// The number of times to retry a batch before the chain is considered failed and removed. +const MAX_BATCH_RETRIES: u8 = 5; + +#[derive(PartialEq)] +pub struct Batch { + /// The ID of the batch, batches are ID's sequentially. + id: u64, + /// The requested start slot of the batch, inclusive. + start_slot: Slot, + /// The requested end slot of batch, exclusive. + end_slot: Slot, + /// The hash of the chain root to requested from the peer. + head_root: Hash256, + /// The peer that was originally assigned to the batch. + _original_peer: PeerId, + /// The peer that is currently assigned to the batch. + pub current_peer: PeerId, + /// The number of retries this batch has undergone. + retries: u8, + /// The blocks that have been downloaded. + downloaded_blocks: Vec>, +} + +impl Ord for Batch { + fn cmp(&self, other: &Self) -> Ordering { + self.id.cmp(&other.id) + } +} + +impl PartialOrd for Batch { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +pub enum ProcessingResult { + KeepChain, + RemoveChain, +} + +impl Eq for Batch {} + +impl Batch { + fn new(id: u64, start_slot: Slot, end_slot: Slot, head_root: Hash256, peer_id: PeerId) -> Self { + Batch { + id, + start_slot, + end_slot, + head_root, + _original_peer: peer_id.clone(), + current_peer: peer_id, + retries: 0, + downloaded_blocks: Vec::new(), + } + } + + fn to_blocks_by_range_request(&self) -> BlocksByRangeRequest { + BlocksByRangeRequest { + head_block_root: self.head_root, + start_slot: self.start_slot.into(), + count: std::cmp::min(BLOCKS_PER_BATCH, self.end_slot.sub(self.start_slot).into()), + step: 1, + } + } +} + +pub struct SyncingChain { + /// The original start slot when this chain was initialised. + pub start_slot: Slot, + + /// The target head slot. + pub target_head_slot: Slot, + + /// The target head root. + pub target_head_root: Hash256, + + /// The batches that are currently awaiting a response from a peer. 
An RPC request for these + /// have been sent. + pub pending_batches: FnvHashMap>, + + /// The batches that have been downloaded and are awaiting processing and/or validation. + completed_batches: Vec>, + + /// The peers that agree on the `target_head_slot` and `target_head_root` as a canonical chain + /// and thus available to download this chain from. + pub peer_pool: HashSet, + + /// The next batch_id that needs to be downloaded. + to_be_downloaded_id: u64, + + /// The next batch id that needs to be processed. + to_be_processed_id: u64, + + /// The last batch id that was processed. + last_processed_id: u64, + + /// The current state of the chain. + pub state: ChainSyncingState, +} + +#[derive(PartialEq)] +pub enum ChainSyncingState { + /// The chain is not being synced. + Stopped, + /// The chain is undergoing syncing. + Syncing, + /// The chain is temporarily paused whilst an error is rectified. + Paused, +} + +impl SyncingChain { + pub fn new( + start_slot: Slot, + target_head_slot: Slot, + target_head_root: Hash256, + peer_id: PeerId, + ) -> Self { + let mut peer_pool = HashSet::new(); + peer_pool.insert(peer_id); + + SyncingChain { + start_slot, + target_head_slot, + target_head_root, + pending_batches: FnvHashMap::default(), + completed_batches: Vec::new(), + peer_pool, + to_be_downloaded_id: 1, + to_be_processed_id: 1, + last_processed_id: 0, + state: ChainSyncingState::Stopped, + } + } + + pub fn on_block_response( + &mut self, + chain: &Weak>, + network: &mut SyncNetworkContext, + request_id: RequestId, + beacon_block: &Option>, + log: &slog::Logger, + ) -> Option { + if let Some(block) = beacon_block { + let batch = self.pending_batches.get_mut(&request_id)?; + // This is not a stream termination, simply add the block to the request + batch.downloaded_blocks.push(block.clone()); + return Some(ProcessingResult::KeepChain); + } else { + // A stream termination has been sent. This batch has ended. Process a completed batch. + let batch = self.pending_batches.remove(&request_id)?; + Some(self.process_completed_batch(chain.clone(), network, batch, log)) + } + } + + fn process_completed_batch( + &mut self, + chain: Weak>, + network: &mut SyncNetworkContext, + batch: Batch, + log: &slog::Logger, + ) -> ProcessingResult { + // An entire batch of blocks has been received. This functions checks to see if it can be processed, + // remove any batches waiting to be verified and if this chain is syncing, request new + // blocks for the peer. 
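A rough, hedged sketch of how the pieces above fit together (simplified stand-in types, not the actual SyncNetworkContext or SyncingChain API): the sync side hands out incrementing request ids (the handler-side context always uses id 0), and a syncing chain keys its pending batches by that id so that in-stream blocks and the stream terminator are routed back to the right batch.

use std::collections::HashMap;

// Simplified stand-in for the sync-side context plus per-chain pending-batch bookkeeping.
#[derive(Default)]
struct SyncContext {
    next_request_id: u64,
    pending_batches: HashMap<u64, Vec<u64>>, // request_id -> downloaded block slots
}

impl SyncContext {
    fn send_range_request(&mut self) -> u64 {
        // monotonically increasing request id, as in SyncNetworkContext above
        let id = self.next_request_id;
        self.next_request_id += 1;
        self.pending_batches.insert(id, Vec::new());
        id
    }

    fn on_response(&mut self, request_id: u64, block_slot: Option<u64>) -> Option<Vec<u64>> {
        match block_slot {
            // a block in the stream: keep accumulating into the pending batch
            Some(slot) => {
                self.pending_batches.get_mut(&request_id)?.push(slot);
                None
            }
            // stream termination: the batch is complete and ready for processing
            None => self.pending_batches.remove(&request_id),
        }
    }
}

fn main() {
    let mut ctx = SyncContext::default();
    let id = ctx.send_range_request();
    assert!(ctx.on_response(id, Some(64)).is_none());
    assert_eq!(ctx.on_response(id, None), Some(vec![64]));
}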
+ debug!(log, "Completed batch received"; "id"=>batch.id, "blocks"=>batch.downloaded_blocks.len(), "awaiting_batches" => self.completed_batches.len()); + + // The peer that completed this batch, may be re-requested if this batch doesn't complete + // the chain and there is no error in processing + let current_peer = batch.current_peer.clone(); + + // verify the range of received blocks + // Note that the order of blocks is verified in block processing + if let Some(last_slot) = batch.downloaded_blocks.last().map(|b| b.slot) { + // the batch is non-empty + if batch.start_slot > batch.downloaded_blocks[0].slot || batch.end_slot < last_slot { + warn!(log, "BlocksByRange response returned out of range blocks"; + "response_initial_slot" => batch.downloaded_blocks[0].slot, + "requested_initial_slot" => batch.start_slot); + network.downvote_peer(batch.current_peer); + self.to_be_processed_id = batch.id; // reset the id back to here, when incrementing, it will check against completed batches + return ProcessingResult::KeepChain; + } + } + + // Add this completed batch to the list of completed batches. This list will then need to + // be checked if any batches can be processed and verified for errors or invalid responses + // from peers. The logic is simpler to create this ordered batch list and to then process + // the list. + + let insert_index = self + .completed_batches + .binary_search(&batch) + .unwrap_or_else(|index| index); + self.completed_batches.insert(insert_index, batch); + + // We have a list of completed batches. It is not sufficient to process batch successfully + // to consider the batch correct. This is because batches could be erroneously empty, or + // incomplete. Therefore, a batch is considered valid, only if the next sequential batch is + // processed successfully. Therefore the `completed_batches` will store batches that have + // already be processed but not verified and therefore have Id's less than + // `self.to_be_processed_id`. + + //TODO: Run the processing of blocks in a separate thread. Build a queue of completed + //blocks here, manage the queue and process them in another thread as they become + //available. + + if self.state != ChainSyncingState::Paused { + // pre-emptively request more blocks from peers whilst we process current blocks, + self.send_range_request(network, current_peer); + + // Try and process batches sequentially in the ordered list. 
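The ordered insert above relies on Batch's id-based Ord implementation; a tiny sketch of the same idiom with plain u64 ids:

// binary_search returns either the found index or the insertion point, so the Vec
// stays sorted without a full re-sort.
fn insert_sorted(completed: &mut Vec<u64>, id: u64) {
    let idx = completed.binary_search(&id).unwrap_or_else(|i| i);
    completed.insert(idx, id);
}

fn main() {
    let mut completed = vec![1, 2, 5];
    insert_sorted(&mut completed, 4);
    assert_eq!(completed, vec![1, 2, 4, 5]);
}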
+ let current_process_id = self.to_be_processed_id; + for batch in self + .completed_batches + .iter() + .filter(|batch| batch.id >= current_process_id) + { + if batch.id == self.to_be_processed_id { + if batch.downloaded_blocks.is_empty() { + // the batch was empty, progress to the next block + self.to_be_processed_id += 1; + continue; + } else { + let mut successes = 0; + debug!(log, "Processing batch"; "batch_id" => batch.id); + match process_batch(chain.clone(), batch, &mut successes, log) { + Ok(_) => { + // batch was successfully processed + self.last_processed_id = self.to_be_processed_id; + self.to_be_processed_id += 1; + + if let Some(chain) = chain.upgrade() { + match chain.fork_choice() { + Ok(()) => trace!( + log, + "Fork choice success"; + "location" => "batch import success" + ), + Err(e) => error!( + log, + "Fork choice failed"; + "error" => format!("{:?}", e), + "location" => "batch import success" + ), + } + } + } + Err(e) => { + warn!(log, "Block processing error"; "error"=> format!("{:?}", e)); + + if successes > 0 { + if let Some(chain) = chain.upgrade() { + match chain.fork_choice() { + Ok(()) => trace!( + log, + "Fork choice success"; + "block_imports" => successes, + "location" => "batch import error" + ), + Err(e) => error!( + log, + "Fork choice failed"; + "error" => format!("{:?}", e), + "location" => "batch import error" + ), + } + } + } + + // batch processing failed + // this could be because this batch is invalid, or a previous invalidated batch + // is invalid. We need to find out which and downvote the peer that has sent us + // an invalid batch. + + // firstly remove any validated batches + return self.handle_invalid_batch(chain, network); + } + } + } + } else { + // there are no more batches to be processed, end + break; + } + } + // remove any validated batches + let last_processed_id = self.last_processed_id; + self.completed_batches + .retain(|batch| batch.id >= last_processed_id); + + // check if the chain has completed syncing, if not, request another batch from this peer + if self.start_slot + self.last_processed_id * BLOCKS_PER_BATCH >= self.target_head_slot + { + // chain is completed + ProcessingResult::RemoveChain + } else { + // chain is not completed + ProcessingResult::KeepChain + } + } else { + ProcessingResult::KeepChain + } + } + + fn handle_invalid_batch( + &mut self, + _chain: Weak>, + network: &mut SyncNetworkContext, + ) -> ProcessingResult { + // The current batch could not be processed, indicating either the current or previous + // batches are invalid + + // The previous batch could be + // incomplete due to the block sizes being too large to fit in a single RPC + // request or there could be consecutive empty batches which are not supposed to be there + + // Address these two cases individually. + // Firstly, check if the past batch is invalid. + // + + //TODO: Implement this logic + // Currently just fail the chain, and drop all associated peers + for peer_id in self.peer_pool.iter() { + network.downvote_peer(peer_id.clone()); + } + ProcessingResult::RemoveChain + } + + pub fn stop_syncing(&mut self) { + self.state = ChainSyncingState::Stopped; + } + + // Either a new chain, or an old one with a peer list + pub fn start_syncing( + &mut self, + network: &mut SyncNetworkContext, + local_finalized_slot: Slot, + log: &slog::Logger, + ) { + // A local finalized slot is provided as other chains may have made + // progress whilst this chain was Stopped or paused. 
If so, update the `processed_batch_id` to + // accommodate potentially downloaded batches from other chains. Also prune any old batches + // awaiting processing + + // Only important if the local head is more than a batch worth of blocks ahead of + // what this chain believes is downloaded + let batches_ahead = local_finalized_slot + .as_u64() + .saturating_sub(self.start_slot.as_u64() + self.last_processed_id * BLOCKS_PER_BATCH) + / BLOCKS_PER_BATCH; + + if batches_ahead != 0 { + // there are `batches_ahead` whole batches that have been downloaded by another + // chain. Set the current processed_batch_id to this value. + debug!(log, "Updating chains processed batches"; "old_completed_slot" => self.start_slot + self.last_processed_id*BLOCKS_PER_BATCH, "new_completed_slot" => self.start_slot + (self.last_processed_id + batches_ahead)*BLOCKS_PER_BATCH); + self.last_processed_id += batches_ahead; + + if self.start_slot + self.last_processed_id * BLOCKS_PER_BATCH + > self.target_head_slot.as_u64() + { + crit!( + log, + "Current head slot is above the target head"; + "target_head_slot" => self.target_head_slot.as_u64(), + "new_start" => self.start_slot + self.last_processed_id * BLOCKS_PER_BATCH, + ); + return; + } + + // update the `to_be_downloaded_id` + if self.to_be_downloaded_id < self.last_processed_id { + self.to_be_downloaded_id = self.last_processed_id; + } + + let last_processed_id = self.last_processed_id; + self.completed_batches + .retain(|batch| batch.id >= last_processed_id.saturating_sub(1)); + } + + // Now begin requesting blocks from the peer pool. Ignore any peers with currently + // pending requests + let pending_peers = self + .pending_batches + .values() + .map(|batch| batch.current_peer.clone()) + .collect::>(); + + let peers = self + .peer_pool + .iter() + .filter(|peer| !pending_peers.contains(peer)) + .cloned() + .collect::>(); + + for peer_id in peers { + // send a blocks by range request to the peer + self.send_range_request(network, peer_id); + } + + self.state = ChainSyncingState::Syncing; + } + + // A peer has been added, start batch requests for this peer + // this should only be called for a syncing chain + pub fn peer_added( + &mut self, + network: &mut SyncNetworkContext, + peer_id: PeerId, + log: &slog::Logger, + ) { + // do not request blocks if the chain is not syncing + if let ChainSyncingState::Stopped = self.state { + debug!(log, "Peer added to a non-syncing chain"; "peer_id" => format!("{:?}", peer_id)); + return; + } + + // find the next batch and request it from the peer + self.send_range_request(network, peer_id); + } + + // Re-STATUS all the peers in this chain + pub fn status_peers(&self, chain: Weak>, network: &mut SyncNetworkContext) { + for peer_id in self.peer_pool.iter() { + network.status_peer(chain.clone(), peer_id.clone()); + } + } + + fn send_range_request(&mut self, network: &mut SyncNetworkContext, peer_id: PeerId) { + // find the next pending batch and request it from the peer + if let Some(batch) = self.get_next_batch(peer_id) { + // send the batch + self.send_batch(network, batch); + } + } + + fn send_batch(&mut self, network: &mut SyncNetworkContext, batch: Batch) { + let request = batch.to_blocks_by_range_request(); + if let Ok(request_id) = network.blocks_by_range_request(batch.current_peer.clone(), request) + { + // add the batch to pending list + self.pending_batches.insert(request_id, batch); + } + } + + fn get_next_batch(&mut self, peer_id: PeerId) -> Option> { + let batch_start_slot = + self.start_slot + 
self.to_be_downloaded_id.saturating_sub(1) * BLOCKS_PER_BATCH; + if batch_start_slot > self.target_head_slot { + return None; + } + let batch_end_slot = std::cmp::min( + batch_start_slot + BLOCKS_PER_BATCH, + self.target_head_slot.saturating_add(1u64), + ); + + let batch_id = self.to_be_downloaded_id; + // find the next batch id. The largest of the next sequential idea, of the next uncompleted + // id + let max_completed_id = + self.completed_batches + .iter() + .fold(0, |max, batch| if batch.id > max { batch.id } else { max }); + self.to_be_downloaded_id = + std::cmp::max(self.to_be_downloaded_id + 1, max_completed_id + 1); + + Some(Batch::new( + batch_id, + batch_start_slot, + batch_end_slot, + self.target_head_root, + peer_id, + )) + } + + // Checks if the request_id is associated with this chain. If so, attempts to re-request the + // batch. If the batch has exceeded the number of retries, returns Some(true), indicating + // the chain should be dropped. + pub fn inject_error( + &mut self, + network: &mut SyncNetworkContext, + peer_id: &PeerId, + request_id: &RequestId, + log: &slog::Logger, + ) -> Option { + if let Some(batch) = self.pending_batches.remove(&request_id) { + warn!(log, "Batch failed. RPC Error"; "id" => batch.id, "retries" => batch.retries, "peer" => format!("{:?}", peer_id)); + + Some(self.failed_batch(network, batch)) + } else { + None + } + } + + pub fn failed_batch( + &mut self, + network: &mut SyncNetworkContext, + mut batch: Batch, + ) -> ProcessingResult { + batch.retries += 1; + + if batch.retries > MAX_BATCH_RETRIES { + // chain is unrecoverable, remove it + ProcessingResult::RemoveChain + } else { + // try to re-process the request using a different peer, if possible + let current_peer = &batch.current_peer; + let new_peer = self + .peer_pool + .iter() + .find(|peer| *peer != current_peer) + .unwrap_or_else(|| current_peer); + + batch.current_peer = new_peer.clone(); + self.send_batch(network, batch); + ProcessingResult::KeepChain + } + } +} + +// Helper function to process block batches which only consumes the chain and blocks to process +fn process_batch( + chain: Weak>, + batch: &Batch, + successes: &mut usize, + log: &Logger, +) -> Result<(), String> { + for block in &batch.downloaded_blocks { + if let Some(chain) = chain.upgrade() { + let processing_result = chain.process_block(block.clone()); + + if let Ok(outcome) = processing_result { + match outcome { + BlockProcessingOutcome::Processed { block_root } => { + // The block was valid and we processed it successfully. + trace!( + log, "Imported block from network"; + "slot" => block.slot, + "block_root" => format!("{}", block_root), + ); + + *successes += 1 + } + BlockProcessingOutcome::ParentUnknown { parent } => { + // blocks should be sequential and all parents should exist + trace!( + log, "Parent block is unknown"; + "parent_root" => format!("{}", parent), + "baby_block_slot" => block.slot, + ); + return Err(format!( + "Block at slot {} has an unknown parent.", + block.slot + )); + } + BlockProcessingOutcome::BlockIsAlreadyKnown => { + // this block is already known to us, move to the next + debug!( + log, "Imported a block that is already known"; + "block_slot" => block.slot, + ); + } + BlockProcessingOutcome::FutureSlot { + present_slot, + block_slot, + } => { + if present_slot + FUTURE_SLOT_TOLERANCE >= block_slot { + // The block is too far in the future, drop it. 
+ trace!( + log, "Block is ahead of our slot clock"; + "msg" => "block for future slot rejected, check your time", + "present_slot" => present_slot, + "block_slot" => block_slot, + "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, + ); + return Err(format!( + "Block at slot {} is too far in the future", + block.slot + )); + } else { + // The block is in the future, but not too far. + trace!( + log, "Block is slightly ahead of our slot clock, ignoring."; + "present_slot" => present_slot, + "block_slot" => block_slot, + "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, + ); + } + } + BlockProcessingOutcome::WouldRevertFinalizedSlot { .. } => { + trace!( + log, "Finalized or earlier block processed"; + "outcome" => format!("{:?}", outcome), + ); + // block reached our finalized slot or was earlier, move to the next block + } + BlockProcessingOutcome::GenesisBlock => { + trace!( + log, "Genesis block was processed"; + "outcome" => format!("{:?}", outcome), + ); + } + _ => { + warn!( + log, "Invalid block received"; + "msg" => "peer sent invalid block", + "outcome" => format!("{:?}", outcome), + ); + return Err(format!("Invalid block at slot {}", block.slot)); + } + } + } else { + warn!( + log, "BlockProcessingFailure"; + "msg" => "unexpected condition in processing block.", + "outcome" => format!("{:?}", processing_result) + ); + return Err(format!( + "Unexpected block processing error: {:?}", + processing_result + )); + } + } else { + return Ok(()); // terminate early due to dropped beacon chain + } + } + + Ok(()) +} diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs new file mode 100644 index 000000000..9af96b8e4 --- /dev/null +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -0,0 +1,284 @@ +use super::chain::{ChainSyncingState, ProcessingResult, SyncingChain}; +use crate::sync::message_processor::PeerSyncInfo; +use crate::sync::network_context::SyncNetworkContext; +use beacon_chain::{BeaconChain, BeaconChainTypes}; +use eth2_libp2p::PeerId; +use slog::{debug, warn}; +use std::sync::Weak; +use types::EthSpec; +use types::{Hash256, Slot}; + +pub enum SyncState { + Finalized, + Head, + Idle, +} +pub struct ChainCollection { + finalized_chains: Vec>, + head_chains: Vec>, + sync_state: SyncState, +} + +impl ChainCollection { + pub fn new() -> Self { + ChainCollection { + sync_state: SyncState::Idle, + finalized_chains: Vec::new(), + head_chains: Vec::new(), + } + } + + pub fn sync_state(&self) -> &SyncState { + &self.sync_state + } + + // if a finalized chain just completed, we assume we waiting for head syncing, unless a fully + // sync peer joins. 
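A hedged simplification of how the collection-level sync state is derived: roughly, the state is Finalized while any finalized chain is syncing, Head while only head chains remain, and Idle otherwise. The real update_finalized below also accounts for peer pools and which chain is actively syncing; the sketch uses simplified, hypothetical types.

#[derive(Debug, PartialEq)]
enum SyncState { Finalized, Head, Idle }

fn derive_state(finalized_chains: usize, head_chains: usize) -> SyncState {
    if finalized_chains > 0 {
        SyncState::Finalized
    } else if head_chains > 0 {
        SyncState::Head
    } else {
        SyncState::Idle
    }
}

fn main() {
    assert_eq!(derive_state(1, 3), SyncState::Finalized);
    assert_eq!(derive_state(0, 2), SyncState::Head);
    assert_eq!(derive_state(0, 0), SyncState::Idle);
}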
+ pub fn fully_synced_peer_found(&mut self) { + if let SyncState::Head = self.sync_state { + if self.head_chains.is_empty() { + self.sync_state = SyncState::Idle; + } + } + } + + // after a finalized chain completes, the state should be waiting for a head chain + pub fn set_head_sync(&mut self) { + if let SyncState::Idle = self.sync_state { + self.sync_state = SyncState::Head; + } + } + + fn finalized_syncing_index(&self) -> Option { + self.finalized_chains + .iter() + .enumerate() + .find_map(|(index, chain)| { + if chain.state == ChainSyncingState::Syncing { + Some(index) + } else { + None + } + }) + } + + pub fn purge_finalized(&mut self, local_finalized_slot: Slot) { + self.finalized_chains + .retain(|chain| chain.target_head_slot > local_finalized_slot); + } + + pub fn purge_head(&mut self, head_slot: Slot) { + self.head_chains + .retain(|chain| chain.target_head_slot > head_slot); + } + + fn get_chain<'a>( + chain: &'a mut [SyncingChain], + target_head_root: Hash256, + target_head_slot: Slot, + ) -> Option<&'a mut SyncingChain> { + chain.iter_mut().find(|iter_chain| { + iter_chain.target_head_root == target_head_root + && iter_chain.target_head_slot == target_head_slot + }) + } + + /// Finds any finalized chain if it exists. + pub fn get_finalized_mut( + &mut self, + target_head_root: Hash256, + target_head_slot: Slot, + ) -> Option<&mut SyncingChain> { + ChainCollection::get_chain( + self.finalized_chains.as_mut(), + target_head_root, + target_head_slot, + ) + } + + /// Finds any finalized chain if it exists. + pub fn get_head_mut( + &mut self, + target_head_root: Hash256, + target_head_slot: Slot, + ) -> Option<&mut SyncingChain> { + ChainCollection::get_chain( + self.head_chains.as_mut(), + target_head_root, + target_head_slot, + ) + } + + /// Checks if a new finalized state should become the syncing chain. Updates the state of the + /// collection. + pub fn update_finalized( + &mut self, + beacon_chain: Weak>, + network: &mut SyncNetworkContext, + log: &slog::Logger, + ) { + let local_info = match beacon_chain.upgrade() { + Some(chain) => PeerSyncInfo::from(&chain), + None => { + warn!(log, "Beacon chain dropped. Chains not updated"); + return; + } + }; + + let local_slot = local_info + .finalized_epoch + .start_slot(T::EthSpec::slots_per_epoch()); + + // Remove any outdated finalized chains + self.purge_finalized(local_slot); + self.finalized_chains + .retain(|chain| !chain.peer_pool.is_empty()); + + // Remove any outdated head chains + self.purge_head(local_info.head_slot); + self.finalized_chains + .retain(|chain| !chain.peer_pool.is_empty()); + + // Check if any chains become the new syncing chain + if let Some(index) = self.finalized_syncing_index() { + // There is a current finalized chain syncing + let syncing_chain_peer_count = self.finalized_chains[index].peer_pool.len(); + + // search for a chain with more peers + if let Some((new_index, chain)) = + self.finalized_chains + .iter_mut() + .enumerate() + .find(|(iter_index, chain)| { + *iter_index != index && chain.peer_pool.len() > syncing_chain_peer_count + }) + { + // A chain has more peers. 
Swap the syncing chain + debug!(log, "Switching finalized chains to sync"; "new_target_root" => format!("{}", chain.target_head_root), "new_end_slot" => chain.target_head_slot, "new_start_slot"=> chain.start_slot); + + // Stop the current chain from syncing + self.finalized_chains[index].stop_syncing(); + // Start the new chain + self.finalized_chains[new_index].start_syncing(network, local_slot, log); + self.sync_state = SyncState::Finalized; + } + } else if let Some(chain) = self + .finalized_chains + .iter_mut() + .max_by_key(|chain| chain.peer_pool.len()) + { + // There is no currently syncing finalization chain, starting the one with the most peers + debug!(log, "New finalized chain started syncing"; "new_target_root" => format!("{}", chain.target_head_root), "new_end_slot" => chain.target_head_slot, "new_start_slot"=> chain.start_slot); + chain.start_syncing(network, local_slot, log); + self.sync_state = SyncState::Finalized; + } else { + // There are no finalized chains, update the state + if self.head_chains.is_empty() { + self.sync_state = SyncState::Idle; + } else { + self.sync_state = SyncState::Head; + } + } + } + + /// Add a new finalized chain to the collection + pub fn new_finalized_chain( + &mut self, + local_finalized_slot: Slot, + target_head: Hash256, + target_slot: Slot, + peer_id: PeerId, + ) { + self.finalized_chains.push(SyncingChain::new( + local_finalized_slot, + target_slot, + target_head, + peer_id, + )); + } + + /// Add a new finalized chain to the collection + pub fn new_head_chain( + &mut self, + network: &mut SyncNetworkContext, + remote_finalized_slot: Slot, + target_head: Hash256, + target_slot: Slot, + peer_id: PeerId, + log: &slog::Logger, + ) { + // remove the peer from any other head chains + + self.head_chains.iter_mut().for_each(|chain| { + chain.peer_pool.remove(&peer_id); + }); + self.head_chains.retain(|chain| !chain.peer_pool.is_empty()); + + let mut new_head_chain = + SyncingChain::new(remote_finalized_slot, target_slot, target_head, peer_id); + // All head chains can sync simultaneously + new_head_chain.start_syncing(network, remote_finalized_slot, log); + self.head_chains.push(new_head_chain); + } + + pub fn is_finalizing_sync(&self) -> bool { + !self.finalized_chains.is_empty() + } + + fn request_function<'a, F, I>(chain: I, mut func: F) -> Option<(usize, ProcessingResult)> + where + I: Iterator>, + F: FnMut(&'a mut SyncingChain) -> Option, + { + chain + .enumerate() + .find_map(|(index, chain)| Some((index, func(chain)?))) + } + + pub fn finalized_request(&mut self, func: F) -> Option<(usize, ProcessingResult)> + where + F: FnMut(&mut SyncingChain) -> Option, + { + ChainCollection::request_function(self.finalized_chains.iter_mut(), func) + } + + pub fn head_request(&mut self, func: F) -> Option<(usize, ProcessingResult)> + where + F: FnMut(&mut SyncingChain) -> Option, + { + ChainCollection::request_function(self.head_chains.iter_mut(), func) + } + + #[allow(dead_code)] + pub fn head_finalized_request(&mut self, func: F) -> Option<(usize, ProcessingResult)> + where + F: FnMut(&mut SyncingChain) -> Option, + { + ChainCollection::request_function( + self.finalized_chains + .iter_mut() + .chain(self.head_chains.iter_mut()), + func, + ) + } + + pub fn remove_finalized_chain(&mut self, index: usize) -> SyncingChain { + self.finalized_chains.swap_remove(index) + } + + pub fn remove_head_chain(&mut self, index: usize) -> SyncingChain { + self.head_chains.swap_remove(index) + } + + /// Removes a chain from either finalized or head chains based on the 
index. Using a request + /// iterates of finalized chains before head chains. Thus an index that is greater than the + /// finalized chain length, indicates a head chain. + pub fn remove_chain(&mut self, index: usize) -> SyncingChain { + if index >= self.finalized_chains.len() { + let index = index - self.finalized_chains.len(); + self.head_chains.swap_remove(index) + } else { + self.finalized_chains.swap_remove(index) + } + } +} diff --git a/beacon_node/network/src/sync/range_sync/mod.rs b/beacon_node/network/src/sync/range_sync/mod.rs new file mode 100644 index 000000000..534f4d345 --- /dev/null +++ b/beacon_node/network/src/sync/range_sync/mod.rs @@ -0,0 +1,8 @@ +//! This provides the logic for syncing a chain when the local node is far behind it's current +//! peers. + +mod chain; +mod chain_collection; +mod range; + +pub use range::RangeSync; diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs new file mode 100644 index 000000000..d2db90932 --- /dev/null +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -0,0 +1,314 @@ +use super::chain::ProcessingResult; +use super::chain_collection::{ChainCollection, SyncState}; +use crate::sync::message_processor::PeerSyncInfo; +use crate::sync::network_context::SyncNetworkContext; +use beacon_chain::{BeaconChain, BeaconChainTypes}; +use eth2_libp2p::rpc::RequestId; +use eth2_libp2p::PeerId; +use slog::{debug, trace, warn}; +use std::collections::HashSet; +use std::sync::Weak; +use types::{BeaconBlock, EthSpec}; + +//TODO: The code becomes cleaner if finalized_chains and head_chains were merged into a single +// object. This will prevent code duplication. Rather than keeping the current syncing +// finalized chain in index 0, it should be stored in this object under an option. Then lookups can +// occur over the single object containing both finalized and head chains, which would then +// behave similarly. + +pub struct RangeSync { + /// The beacon chain for processing + beacon_chain: Weak>, + chains: ChainCollection, + /// Known peers to the RangeSync, that need to be re-status'd once finalized chains are + /// completed. + awaiting_head_peers: HashSet, + log: slog::Logger, +} + +impl RangeSync { + pub fn new(beacon_chain: Weak>, log: slog::Logger) -> Self { + RangeSync { + beacon_chain, + chains: ChainCollection::new(), + awaiting_head_peers: HashSet::new(), + log, + } + } + + // Notify the collection that a fully synced peer was found. This allows updating the state + // if we were awaiting a head state. + pub fn fully_synced_peer_found(&mut self) { + self.chains.fully_synced_peer_found() + } + + pub fn add_peer( + &mut self, + network: &mut SyncNetworkContext, + peer_id: PeerId, + remote: PeerSyncInfo, + ) { + // evaluate which chain to sync from + + // determine if we need to run a sync to the nearest finalized state or simply sync to + // its current head + let local_info = match self.beacon_chain.upgrade() { + Some(chain) => PeerSyncInfo::from(&chain), + None => { + warn!(self.log, + "Beacon chain dropped. 
Peer not considered for sync"; + "peer_id" => format!("{:?}", peer_id)); + return; + } + }; + + // convenience variables + let remote_finalized_slot = remote + .finalized_epoch + .start_slot(T::EthSpec::slots_per_epoch()); + let local_finalized_slot = local_info + .finalized_epoch + .start_slot(T::EthSpec::slots_per_epoch()); + + // firstly, remove any out-of-date chains + self.chains.purge_finalized(local_finalized_slot); + self.chains.purge_head(local_info.head_slot); + + // remove peer from any chains + self.remove_peer(network, &peer_id); + + if remote_finalized_slot > local_info.head_slot { + debug!(self.log, "Finalization sync peer joined"; "peer_id" => format!("{:?}", peer_id)); + // Finalized chain search + + // Note: We keep current head chains. These can continue syncing whilst we complete + // this new finalized chain. + + // If a finalized chain already exists that matches, add this peer to the chain's peer + // pool. + if let Some(chain) = self + .chains + .get_finalized_mut(remote.finalized_root, remote_finalized_slot) + { + debug!(self.log, "Finalized chain exists, adding peer"; "peer_id" => format!("{:?}", peer_id), "target_root" => format!("{}", chain.target_head_root), "end_slot" => chain.target_head_slot, "start_slot"=> chain.start_slot); + + // add the peer to the chain's peer pool + chain.peer_pool.insert(peer_id.clone()); + chain.peer_added(network, peer_id, &self.log); + + // check if the new peer's addition will favour a new syncing chain. + self.chains + .update_finalized(self.beacon_chain.clone(), network, &self.log); + } else { + // there is no finalized chain that matches this peer's last finalized target + // create a new finalized chain + debug!(self.log, "New finalized chain added to sync"; "peer_id" => format!("{:?}", peer_id), "start_slot" => local_finalized_slot.as_u64(), "end_slot" => remote_finalized_slot.as_u64(), "finalized_root" => format!("{}", remote.finalized_root)); + + self.chains.new_finalized_chain( + local_finalized_slot, + remote.finalized_root, + remote_finalized_slot, + peer_id, + ); + self.chains + .update_finalized(self.beacon_chain.clone(), network, &self.log); + } + } else { + if self.chains.is_finalizing_sync() { + // If there are finalized chains to sync, finish these first, before syncing head + // chains. This allows us to re-sync all known peers + trace!(self.log, "Waiting for finalized sync to complete"; "peer_id" => format!("{:?}", peer_id)); + return; + } + + // The new peer has the same finalized (earlier filters should prevent a peer with an + // earlier finalized chain from reaching here). 
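Loosely, the peer-classification rule in add_peer above reduces to comparing the remote's finalized slot with our head slot: a remote finalized ahead of our head triggers (or joins) a finalized-chain sync, otherwise the peer is used for head sync. A simplified sketch with hypothetical names:

#[derive(Debug, PartialEq)]
enum SyncTarget { Finalized, Head }

fn classify_peer(local_head_slot: u64, remote_finalized_slot: u64) -> SyncTarget {
    if remote_finalized_slot > local_head_slot {
        SyncTarget::Finalized
    } else {
        SyncTarget::Head
    }
}

fn main() {
    assert_eq!(classify_peer(100, 160), SyncTarget::Finalized);
    assert_eq!(classify_peer(100, 90), SyncTarget::Head);
}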
+ debug!(self.log, "New peer added for recent head sync"; "peer_id" => format!("{:?}", peer_id)); + + // search if there is a matching head chain, then add the peer to the chain + if let Some(chain) = self.chains.get_head_mut(remote.head_root, remote.head_slot) { + debug!(self.log, "Adding peer to the existing head chain peer pool"; "head_root" => format!("{}",remote.head_root), "head_slot" => remote.head_slot, "peer_id" => format!("{:?}", peer_id)); + + // add the peer to the head's pool + chain.peer_pool.insert(peer_id.clone()); + chain.peer_added(network, peer_id.clone(), &self.log); + } else { + // There are no other head chains that match this peer's status, create a new one, and + let start_slot = std::cmp::min(local_info.head_slot, remote_finalized_slot); + debug!(self.log, "Creating a new syncing head chain"; "head_root" => format!("{}",remote.head_root), "start_slot" => start_slot, "head_slot" => remote.head_slot, "peer_id" => format!("{:?}", peer_id)); + self.chains.new_head_chain( + network, + start_slot, + remote.head_root, + remote.head_slot, + peer_id, + &self.log, + ); + } + self.chains + .update_finalized(self.beacon_chain.clone(), network, &self.log); + } + } + + pub fn blocks_by_range_response( + &mut self, + network: &mut SyncNetworkContext, + peer_id: PeerId, + request_id: RequestId, + beacon_block: Option>, + ) { + // Find the request. Most likely the first finalized chain (the syncing chain). If there + // are no finalized chains, then it will be a head chain. At most, there should only be + // `connected_peers` number of head chains, which should be relatively small and this + // lookup should not be very expensive. However, we could add an extra index that maps the + // request id to index of the vector to avoid O(N) searches and O(N) hash lookups. + // Note to future sync-rewriter/profiler: Michael approves of these O(N) searches. + + let chain_ref = &self.beacon_chain; + let log_ref = &self.log; + match self.chains.finalized_request(|chain| { + chain.on_block_response(chain_ref, network, request_id, &beacon_block, log_ref) + }) { + Some((_, ProcessingResult::KeepChain)) => {} // blocks added to the chain + Some((index, ProcessingResult::RemoveChain)) => { + let chain = self.chains.remove_finalized_chain(index); + debug!(self.log, "Finalized chain removed"; "start_slot" => chain.start_slot.as_u64(), "end_slot" => chain.target_head_slot.as_u64()); + // the chain is complete, re-status it's peers + chain.status_peers(self.beacon_chain.clone(), network); + + // update the state of the collection + self.chains + .update_finalized(self.beacon_chain.clone(), network, &self.log); + + // set the state to a head sync, to inform the manager that we are awaiting a + // head chain. 
+ self.chains.set_head_sync(); + + // if there are no more finalized chains, re-status all known peers awaiting a head + // sync + match self.chains.sync_state() { + SyncState::Idle | SyncState::Head => { + for peer_id in self.awaiting_head_peers.iter() { + network.status_peer(self.beacon_chain.clone(), peer_id.clone()); + } + } + SyncState::Finalized => {} // Have more finalized chains to complete + } + } + None => { + // The request was not in any finalized chain, search head chains + match self.chains.head_request(|chain| { + chain.on_block_response(&chain_ref, network, request_id, &beacon_block, log_ref) + }) { + Some((index, ProcessingResult::RemoveChain)) => { + let chain = self.chains.remove_head_chain(index); + debug!(self.log, "Head chain completed"; "start_slot" => chain.start_slot.as_u64(), "end_slot" => chain.target_head_slot.as_u64()); + // the chain is complete, re-status its peers and remove it + chain.status_peers(self.beacon_chain.clone(), network); + + // update the state of the collection + self.chains + .update_finalized(self.beacon_chain.clone(), network, &self.log); + } + Some(_) => {} + None => { + // The request didn't exist in any `SyncingChain`. Could have been an old request. Log + // and ignore + debug!(self.log, "Range response without matching request"; "peer" => format!("{:?}", peer_id), "request_id" => request_id); + } + } + } + } + } + + pub fn is_syncing(&self) -> bool { + match self.chains.sync_state() { + SyncState::Finalized => true, + SyncState::Head => true, + SyncState::Idle => false, + } + } + + pub fn peer_disconnect(&mut self, network: &mut SyncNetworkContext, peer_id: &PeerId) { + // if the peer is in the awaiting head mapping, remove it + self.awaiting_head_peers.remove(&peer_id); + + // remove the peer from any peer pool + self.remove_peer(network, peer_id); + + // update the state of the collection + self.chains + .update_finalized(self.beacon_chain.clone(), network, &self.log); + } + + /// When a peer gets removed, both the head and finalized chains need to be searched to check which pool the peer is in. The chain may also have a batch or batches awaiting + /// a response from this peer. If so, we mark the batch as failed. The batch may then hit its maximum + /// retries. In this case, we need to remove the chain and re-status all the peers.
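The retry rule referred to in the comment above (mirroring MAX_BATCH_RETRIES and failed_batch in chain.rs) can be sketched as follows; the names and types here are illustrative only, not the actual API.

const MAX_BATCH_RETRIES: u8 = 5;

#[derive(Debug, PartialEq)]
enum Outcome<'a> { Retry { on_peer: &'a str }, DropChain }

fn on_batch_failure<'a>(retries: u8, current_peer: &'a str, peer_pool: &[&'a str]) -> Outcome<'a> {
    if retries + 1 > MAX_BATCH_RETRIES {
        // retry budget exhausted: drop the whole chain and re-status its peers
        Outcome::DropChain
    } else {
        // prefer any other peer in the pool; fall back to the same peer
        let peer = peer_pool
            .iter()
            .copied()
            .find(|p| *p != current_peer)
            .unwrap_or(current_peer);
        Outcome::Retry { on_peer: peer }
    }
}

fn main() {
    assert_eq!(
        on_batch_failure(0, "peer_a", &["peer_a", "peer_b"]),
        Outcome::Retry { on_peer: "peer_b" }
    );
    assert_eq!(on_batch_failure(5, "peer_a", &["peer_a"]), Outcome::DropChain);
}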
+ fn remove_peer(&mut self, network: &mut SyncNetworkContext, peer_id: &PeerId) { + match self.chains.head_finalized_request(|chain| { + if chain.peer_pool.remove(&peer_id) { + // this chain contained the peer + let pending_batches_requests = chain + .pending_batches + .iter() + .filter(|(_, batch)| batch.current_peer == *peer_id) + .map(|(id, _)| id) + .cloned() + .collect::>(); + for request_id in pending_batches_requests { + if let Some(batch) = chain.pending_batches.remove(&request_id) { + if let ProcessingResult::RemoveChain = chain.failed_batch(network, batch) { + // a single batch failed, remove the chain + return Some(ProcessingResult::RemoveChain); + } + } + } + // peer removed from chain, no batch failed + Some(ProcessingResult::KeepChain) + } else { + None + } + }) { + Some((index, ProcessingResult::RemoveChain)) => { + // the chain needed to be removed + let chain = self.chains.remove_chain(index); + debug!(self.log, "Chain was removed due batch failing"; "start_slot" => chain.start_slot.as_u64(), "end_slot" => chain.target_head_slot.as_u64()); + // the chain has been removed, re-status it's peers + chain.status_peers(self.beacon_chain.clone(), network); + // update the state of the collection + self.chains + .update_finalized(self.beacon_chain.clone(), network, &self.log); + } + _ => {} // chain didn't need to be removed, ignore + } + + // remove any chains that no longer have any peers + } + + // An RPC Error occurred, if it's a pending batch, re-request it if possible, if there have + // been too many attempts, remove the chain + pub fn inject_error( + &mut self, + network: &mut SyncNetworkContext, + peer_id: PeerId, + request_id: RequestId, + ) { + // check that this request is pending + let log_ref = &self.log; + match self.chains.head_finalized_request(|chain| { + chain.inject_error(network, &peer_id, &request_id, log_ref) + }) { + Some((_, ProcessingResult::KeepChain)) => {} // error handled chain persists + Some((index, ProcessingResult::RemoveChain)) => { + let chain = self.chains.remove_chain(index); + debug!(self.log, "Chain was removed due to error"; "start_slot" => chain.start_slot.as_u64(), "end_slot" => chain.target_head_slot.as_u64()); + // the chain has failed, re-status it's peers + chain.status_peers(self.beacon_chain.clone(), network); + // update the state of the collection + self.chains + .update_finalized(self.beacon_chain.clone(), network, &self.log); + } + None => {} // request wasn't in the finalized chains, check the head chains + } + } +} diff --git a/beacon_node/rest_api/src/validator.rs b/beacon_node/rest_api/src/validator.rs index b3da45936..5b3c703cc 100644 --- a/beacon_node/rest_api/src/validator.rs +++ b/beacon_node/rest_api/src/validator.rs @@ -251,7 +251,39 @@ pub fn publish_beacon_block( "block_slot" => slot, ); - publish_beacon_block_to_network::(network_chan, block) + publish_beacon_block_to_network::(network_chan, block)?; + + // Run the fork choice algorithm and enshrine a new canonical head, if + // found. + // + // The new head may or may not be the block we just received. + if let Err(e) = beacon_chain.fork_choice() { + error!( + log, + "Failed to find beacon chain head"; + "error" => format!("{:?}", e) + ); + } else { + // In the best case, validators should produce blocks that become the + // head. + // + // Potential reasons this may not be the case: + // + // - A quick re-org between block produce and publish. + // - Excessive time between block produce and publish. 
+ // - A validator is using another beacon node to produce blocks and + // submitting them here. + if beacon_chain.head().beacon_block_root != block_root { + warn!( + log, + "Block from validator is not head"; + "desc" => "potential re-org", + ); + + } + } + + Ok(()) } Ok(outcome) => { warn!( @@ -278,8 +310,8 @@ pub fn publish_beacon_block( ))) } } - }) - .and_then(|_| response_builder?.body_no_ssz(&())), + }) + .and_then(|_| response_builder?.body_no_ssz(&())) ) } diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 175d64371..95e26650f 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -22,7 +22,7 @@ type Config = (ClientConfig, Eth2Config, Logger); /// Gets the fully-initialized global client and eth2 configuration objects. /// -/// The top-level `clap` arguments should be provied as `cli_args`. +/// The top-level `clap` arguments should be provided as `cli_args`. /// /// The output of this function depends primarily upon the given `cli_args`, however it's behaviour /// may be influenced by other external services like the contents of the file system or the diff --git a/eth2/utils/eth2_testnet_config/src/lib.rs b/eth2/utils/eth2_testnet_config/src/lib.rs index 4b0d30408..8ef0af5de 100644 --- a/eth2/utils/eth2_testnet_config/src/lib.rs +++ b/eth2/utils/eth2_testnet_config/src/lib.rs @@ -3,7 +3,7 @@ //! https://github.com/eth2-clients/eth2-testnets/tree/master/nimbus/testnet1 //! //! It is not accurate at the moment, we include extra files and we also don't support a few -//! others. We are unable to confirm to the repo until we have the following PR merged: +//! others. We are unable to conform to the repo until we have the following PR merged: //! //! https://github.com/sigp/lighthouse/pull/605
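Taken together, the theme running through this diff is that fork choice is no longer an implicit step of block processing; each call site (the gossip handler, range-sync batch import, and the block publish API) invokes it explicitly. A toy sketch of that control flow, with purely hypothetical types:

struct Chain { head: u64 }

impl Chain {
    fn process_block(&mut self, slot: u64) -> Result<u64, ()> {
        // import only; the head is NOT updated here any more
        Ok(slot)
    }
    fn fork_choice(&mut self, imported: u64) {
        // choose the new head; trivially the highest imported slot in this sketch
        self.head = self.head.max(imported);
    }
}

fn main() {
    let mut chain = Chain { head: 0 };
    let imported = chain.process_block(7).expect("import should succeed");
    // each call site decides when to run fork choice, e.g. once per gossip block or per batch
    chain.fork_choice(imported);
    assert_eq!(chain.head, 7);
}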