diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 4a2708830..d8b3b547c 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -10,7 +10,6 @@ use crate::persisted_beacon_chain::{PersistedBeaconChain, BEACON_CHAIN_DB_KEY}; use crate::timeout_rw_lock::TimeoutRwLock; use lmd_ghost::LmdGhost; use operation_pool::{OperationPool, PersistedOperationPool}; -use parking_lot::RwLock; use slog::{debug, error, info, trace, warn, Logger}; use slot_clock::SlotClock; use ssz::Encode; diff --git a/beacon_node/eth2-libp2p/src/rpc/handler.rs b/beacon_node/eth2-libp2p/src/rpc/handler.rs index 4f2908c93..9d9020386 100644 --- a/beacon_node/eth2-libp2p/src/rpc/handler.rs +++ b/beacon_node/eth2-libp2p/src/rpc/handler.rs @@ -314,6 +314,7 @@ where substream: out, request, }; + debug!(self.log, "Added outbound substream id"; "substream_id" => id); self.outbound_substreams .insert(id, (awaiting_stream, delay_key)); } @@ -418,6 +419,8 @@ where }; if self.pending_error.is_none() { self.pending_error = Some((request_id, error)); + } else { + crit!(self.log, "Couldn't add error"); } } @@ -448,6 +451,7 @@ where } ProtocolsHandlerUpgrErr::Timeout | ProtocolsHandlerUpgrErr::Timer => { // negotiation timeout, mark the request as failed + debug!(self.log, "Active substreams before timeout"; "len" => self.outbound_substreams.len()); return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( RPCEvent::Error( request_id, @@ -707,21 +711,18 @@ where } // establish outbound substreams - if !self.dial_queue.is_empty() { - if self.dial_negotiated < self.max_dial_negotiated { - self.dial_negotiated += 1; - let rpc_event = self.dial_queue.remove(0); - if let RPCEvent::Request(id, req) = rpc_event { - return Ok(Async::Ready( - ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(req.clone()), - info: RPCEvent::Request(id, req), - }, - )); - } - } - } else { + if !self.dial_queue.is_empty() && self.dial_negotiated < self.max_dial_negotiated { + self.dial_negotiated += 1; + let rpc_event = self.dial_queue.remove(0); self.dial_queue.shrink_to_fit(); + if let RPCEvent::Request(id, req) = rpc_event { + return Ok(Async::Ready( + ProtocolsHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new(req.clone()), + info: RPCEvent::Request(id, req), + }, + )); + } } Ok(Async::NotReady) } diff --git a/beacon_node/network/src/message_processor.rs b/beacon_node/network/src/message_processor.rs index dc4f91d2f..f12d4d0de 100644 --- a/beacon_node/network/src/message_processor.rs +++ b/beacon_node/network/src/message_processor.rs @@ -562,9 +562,9 @@ impl MessageProcessor { self.log, "Processed attestation"; "source" => "gossip", - "outcome" => format!("{:?}", outcome), "peer" => format!("{:?}",peer_id), - "data" => format!("{:?}", msg.data) + "block_root" => format!("{}", msg.data.beacon_block_root), + "slot" => format!("{}", msg.data.slot), ); } AttestationProcessingOutcome::UnknownHeadBlock { beacon_block_root } => { diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 3883f2739..8b7e210c9 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -34,7 +34,7 @@ //! subsequently search for parents if needed. 
use super::network_context::SyncNetworkContext; -use super::range_sync::RangeSync; +use super::range_sync::{Batch, BatchProcessResult, RangeSync}; use crate::message_processor::PeerSyncInfo; use crate::service::NetworkMessage; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome}; @@ -45,6 +45,7 @@ use fnv::FnvHashMap; use futures::prelude::*; use slog::{crit, debug, error, info, trace, warn, Logger}; use smallvec::SmallVec; +use std::boxed::Box; use std::collections::HashSet; use std::ops::Sub; use std::sync::Weak; @@ -94,6 +95,13 @@ pub enum SyncMessage { /// An RPC Error has occurred on a request. RPCError(PeerId, RequestId), + + /// A batch has been processed by the block processor thread. + BatchProcessed { + process_id: u64, + batch: Box>, + result: BatchProcessResult, + }, } /// Maintains a sequential list of parents to lookup and the lookup's current state. @@ -185,7 +193,7 @@ pub fn spawn( state: ManagerState::Stalled, input_channel: sync_recv, network: SyncNetworkContext::new(network_send, log.clone()), - range_sync: RangeSync::new(beacon_chain, log.clone()), + range_sync: RangeSync::new(beacon_chain, sync_send.clone(), log.clone()), parent_queue: SmallVec::new(), single_block_lookups: FnvHashMap::default(), full_peers: HashSet::new(), @@ -679,6 +687,18 @@ impl Future for SyncManager { SyncMessage::RPCError(peer_id, request_id) => { self.inject_error(peer_id, request_id); } + SyncMessage::BatchProcessed { + process_id, + batch, + result, + } => { + self.range_sync.handle_block_process_result( + &mut self.network, + process_id, + *batch, + result, + ); + } }, Ok(Async::NotReady) => break, Ok(Async::Ready(None)) => { diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index 45a7672e4..e487e795b 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -1,16 +1,40 @@ +use super::chain::BLOCKS_PER_BATCH; +use eth2_libp2p::rpc::methods::*; use eth2_libp2p::rpc::RequestId; use eth2_libp2p::PeerId; use fnv::FnvHashMap; use std::cmp::Ordering; use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; +use std::ops::Sub; use types::{BeaconBlock, EthSpec, Hash256, Slot}; +#[derive(Copy, Clone, Debug, PartialEq)] +pub struct BatchId(pub u64); + +impl std::ops::Deref for BatchId { + type Target = u64; + fn deref(&self) -> &Self::Target { + &self.0 + } +} +impl std::ops::DerefMut for BatchId { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl std::convert::From for BatchId { + fn from(id: u64) -> Self { + BatchId(id) + } +} + /// A collection of sequential blocks that are requested from peers in a single RPC request. -#[derive(PartialEq)] +#[derive(PartialEq, Debug)] pub struct Batch { /// The ID of the batch, these are sequential. - pub id: u64, + pub id: BatchId, /// The requested start slot of the batch, inclusive. pub start_slot: Slot, /// The requested end slot of batch, exclusive. 
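Note on the `BatchId` newtype introduced above: it keeps batch identifiers distinct from other `u64` counters, while `Deref`, `DerefMut` and `From<u64>` keep call sites such as `*batch.id` terse. A standalone sketch (not part of the patch) of the ergonomics this buys:

    // Standalone illustration of the BatchId newtype pattern used above.
    #[derive(Copy, Clone, Debug, PartialEq)]
    pub struct BatchId(pub u64);

    impl std::ops::Deref for BatchId {
        type Target = u64;
        fn deref(&self) -> &u64 {
            &self.0
        }
    }

    impl std::ops::DerefMut for BatchId {
        fn deref_mut(&mut self) -> &mut u64 {
            &mut self.0
        }
    }

    impl From<u64> for BatchId {
        fn from(id: u64) -> Self {
            BatchId(id)
        }
    }

    fn main() {
        let mut to_be_processed: BatchId = 1u64.into(); // From<u64>
        *to_be_processed += 1;                          // DerefMut: mutate the inner u64
        assert_eq!(*to_be_processed, 2);                // Deref: read it back as a u64
        println!("next batch to process: {:?}", to_be_processed);
    }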
@@ -27,9 +51,41 @@ pub struct Batch { pub downloaded_blocks: Vec>, } +impl Eq for Batch {} + +impl Batch { + pub fn new( + id: BatchId, + start_slot: Slot, + end_slot: Slot, + head_root: Hash256, + peer_id: PeerId, + ) -> Self { + Batch { + id, + start_slot, + end_slot, + head_root, + _original_peer: peer_id.clone(), + current_peer: peer_id, + retries: 0, + downloaded_blocks: Vec::new(), + } + } + + pub fn to_blocks_by_range_request(&self) -> BlocksByRangeRequest { + BlocksByRangeRequest { + head_block_root: self.head_root, + start_slot: self.start_slot.into(), + count: std::cmp::min(BLOCKS_PER_BATCH, self.end_slot.sub(self.start_slot).into()), + step: 1, + } + } +} + impl Ord for Batch { fn cmp(&self, other: &Self) -> Ordering { - self.id.cmp(&other.id) + self.id.0.cmp(&other.id.0) } } @@ -83,6 +139,11 @@ impl PendingBatches { } } + /// The number of current pending batch requests. + pub fn len(&self) -> usize { + self.batches.len() + } + /// Adds a block to the batches if the request id exists. Returns None if there is no batch /// matching the request id. pub fn add_block(&mut self, request_id: RequestId, block: BeaconBlock) -> Option<()> { diff --git a/beacon_node/network/src/sync/range_sync/batch_processing.rs b/beacon_node/network/src/sync/range_sync/batch_processing.rs new file mode 100644 index 000000000..27c4fb295 --- /dev/null +++ b/beacon_node/network/src/sync/range_sync/batch_processing.rs @@ -0,0 +1,193 @@ +use super::batch::Batch; +use crate::message_processor::FUTURE_SLOT_TOLERANCE; +use crate::sync::manager::SyncMessage; +use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome}; +use slog::{debug, error, trace, warn}; +use std::sync::{Arc, Weak}; +use tokio::sync::mpsc; + +/// The result of attempting to process a batch of blocks. +// TODO: When correct batch error handling occurs, we will include an error type. +#[derive(Debug)] +pub enum BatchProcessResult { + /// The batch was completed successfully. + Success, + /// The batch processing failed. + Failed, +} + +// TODO: Refactor to async fn, with stable futures +pub fn spawn_batch_processor( + chain: Weak>, + process_id: u64, + batch: Batch, + mut sync_send: mpsc::UnboundedSender>, + log: slog::Logger, +) { + std::thread::spawn(move || { + debug!(log, "Processing batch"; "id" => *batch.id); + let result = match process_batch(chain, &batch, &log) { + Ok(_) => BatchProcessResult::Success, + Err(_) => BatchProcessResult::Failed, + }; + + debug!(log, "Batch processed"; "id" => *batch.id, "result" => format!("{:?}", result)); + + sync_send + .try_send(SyncMessage::BatchProcessed { + process_id, + batch: Box::new(batch), + result, + }) + .unwrap_or_else(|_| { + debug!( + log, + "Batch result could not inform sync. Likely shutting down." + ); + }); + }); +} + +// Helper function to process block batches which only consumes the chain and blocks to process +fn process_batch( + chain: Weak>, + batch: &Batch, + log: &slog::Logger, +) -> Result<(), String> { + let mut successful_block_import = false; + for block in &batch.downloaded_blocks { + if let Some(chain) = chain.upgrade() { + let processing_result = chain.process_block(block.clone()); + + if let Ok(outcome) = processing_result { + match outcome { + BlockProcessingOutcome::Processed { block_root } => { + // The block was valid and we processed it successfully. 
+ trace!( + log, "Imported block from network"; + "slot" => block.slot, + "block_root" => format!("{}", block_root), + ); + successful_block_import = true; + } + BlockProcessingOutcome::ParentUnknown { parent } => { + // blocks should be sequential and all parents should exist + warn!( + log, "Parent block is unknown"; + "parent_root" => format!("{}", parent), + "baby_block_slot" => block.slot, + ); + if successful_block_import { + run_fork_choice(chain, log); + } + return Err(format!( + "Block at slot {} has an unknown parent.", + block.slot + )); + } + BlockProcessingOutcome::BlockIsAlreadyKnown => { + // this block is already known to us, move to the next + debug!( + log, "Imported a block that is already known"; + "block_slot" => block.slot, + ); + } + BlockProcessingOutcome::FutureSlot { + present_slot, + block_slot, + } => { + if present_slot + FUTURE_SLOT_TOLERANCE >= block_slot { + // The block is too far in the future, drop it. + warn!( + log, "Block is ahead of our slot clock"; + "msg" => "block for future slot rejected, check your time", + "present_slot" => present_slot, + "block_slot" => block_slot, + "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, + ); + if successful_block_import { + run_fork_choice(chain, log); + } + return Err(format!( + "Block at slot {} is too far in the future", + block.slot + )); + } else { + // The block is in the future, but not too far. + debug!( + log, "Block is slightly ahead of our slot clock, ignoring."; + "present_slot" => present_slot, + "block_slot" => block_slot, + "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, + ); + } + } + BlockProcessingOutcome::WouldRevertFinalizedSlot { .. } => { + debug!( + log, "Finalized or earlier block processed"; + "outcome" => format!("{:?}", outcome), + ); + // block reached our finalized slot or was earlier, move to the next block + } + BlockProcessingOutcome::GenesisBlock => { + debug!( + log, "Genesis block was processed"; + "outcome" => format!("{:?}", outcome), + ); + } + _ => { + warn!( + log, "Invalid block received"; + "msg" => "peer sent invalid block", + "outcome" => format!("{:?}", outcome), + ); + if successful_block_import { + run_fork_choice(chain, log); + } + return Err(format!("Invalid block at slot {}", block.slot)); + } + } + } else { + warn!( + log, "BlockProcessingFailure"; + "msg" => "unexpected condition in processing block.", + "outcome" => format!("{:?}", processing_result) + ); + if successful_block_import { + run_fork_choice(chain, log); + } + return Err(format!( + "Unexpected block processing error: {:?}", + processing_result + )); + } + } else { + return Ok(()); // terminate early due to dropped beacon chain + } + } + + // Batch completed successfully, run fork choice. + if let Some(chain) = chain.upgrade() { + run_fork_choice(chain, log); + } + + Ok(()) +} + +/// Runs fork-choice on a given chain. This is used during block processing after one successful +/// block import. 
+fn run_fork_choice(chain: Arc>, log: &slog::Logger) { + match chain.fork_choice() { + Ok(()) => trace!( + log, + "Fork choice success"; + "location" => "batch processing" + ), + Err(e) => error!( + log, + "Fork choice failed"; + "error" => format!("{:?}", e), + "location" => "batch import error" + ), + } +} diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index ddeaf0583..165d27ab2 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -1,28 +1,35 @@ -use crate::message_processor::FUTURE_SLOT_TOLERANCE; +use super::batch::{Batch, BatchId, PendingBatches}; +use super::batch_processing::{spawn_batch_processor, BatchProcessResult}; use crate::sync::network_context::SyncNetworkContext; -use crate::sync::range_sync::batch::{Batch, PendingBatches}; -use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome}; -use eth2_libp2p::rpc::methods::*; +use crate::sync::SyncMessage; +use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::rpc::RequestId; use eth2_libp2p::PeerId; -use slog::{crit, debug, error, trace, warn, Logger}; +use rand::prelude::*; +use slog::{crit, debug, warn}; use std::collections::HashSet; -use std::ops::Sub; use std::sync::Weak; -use types::{BeaconBlock, EthSpec, Hash256, Slot}; +use tokio::sync::mpsc; +use types::{BeaconBlock, Hash256, Slot}; /// Blocks are downloaded in batches from peers. This constant specifies how many blocks per batch /// is requested. There is a timeout for each batch request. If this value is too high, we will /// downvote peers with poor bandwidth. This can be set arbitrarily high, in which case the /// responder will fill the response up to the max request size, assuming they have the bandwidth /// to do so. -//TODO: Make this dynamic based on peer's bandwidth -//TODO: This is lower due to current thread design. Modify once rebuilt. -const BLOCKS_PER_BATCH: u64 = 25; +pub const BLOCKS_PER_BATCH: u64 = 50; /// The number of times to retry a batch before the chain is considered failed and removed. const MAX_BATCH_RETRIES: u8 = 5; +/// The maximum number of batches to queue before requesting more. +const BATCH_BUFFER_SIZE: u8 = 5; + +/// Invalid batches are attempted to be re-downloaded from other peers. If they cannot be processed +/// after `INVALID_BATCH_LOOKUP_ATTEMPTS` times, the chain is considered faulty and all peers will +/// be downvoted. +const _INVALID_BATCH_LOOKUP_ATTEMPTS: u8 = 3; + /// A return type for functions that act on a `Chain` which informs the caller whether the chain /// has been completed and should be removed or to be kept if further processing is /// required. @@ -31,32 +38,6 @@ pub enum ProcessingResult { RemoveChain, } -impl Eq for Batch {} - -impl Batch { - fn new(id: u64, start_slot: Slot, end_slot: Slot, head_root: Hash256, peer_id: PeerId) -> Self { - Batch { - id, - start_slot, - end_slot, - head_root, - _original_peer: peer_id.clone(), - current_peer: peer_id, - retries: 0, - downloaded_blocks: Vec::new(), - } - } - - fn to_blocks_by_range_request(&self) -> BlocksByRangeRequest { - BlocksByRangeRequest { - head_block_root: self.head_root, - start_slot: self.start_slot.into(), - count: std::cmp::min(BLOCKS_PER_BATCH, self.end_slot.sub(self.start_slot).into()), - step: 1, - } - } -} - /// A chain of blocks that need to be downloaded. 
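Note on the new `batch_processing.rs` module above: it moves block import off the sync task — `spawn_batch_processor` runs `process_batch` on its own thread and reports back to the sync manager through the `sync_send` channel as a `SyncMessage::BatchProcessed`. A simplified, self-contained sketch of that round trip, using `std::sync::mpsc` and stand-in types in place of the tokio channel and the real batch/result types used by the patch:

    use std::sync::mpsc;
    use std::thread;

    // Stand-ins for the real types: the patch sends a boxed Batch and a
    // BatchProcessResult back over a tokio unbounded channel.
    #[derive(Debug)]
    enum ProcessResult {
        Success,
        Failed,
    }

    #[derive(Debug)]
    enum SyncMessage {
        BatchProcessed { process_id: u64, result: ProcessResult },
    }

    fn spawn_processor(process_id: u64, blocks: Vec<u64>, sync_send: mpsc::Sender<SyncMessage>) {
        thread::spawn(move || {
            // Pretend to import each block; the real processor calls
            // chain.process_block and stops on the first hard failure.
            let result = if blocks.iter().all(|slot| *slot > 0) {
                ProcessResult::Success
            } else {
                ProcessResult::Failed
            };
            // Report back to the sync task; if the receiver is gone we are
            // shutting down, so the error is ignored.
            let _ = sync_send.send(SyncMessage::BatchProcessed { process_id, result });
        });
    }

    fn main() {
        let (tx, rx) = mpsc::channel();
        spawn_processor(7, vec![1, 2, 3], tx);
        // The sync-manager side: wait for the processor to call back.
        println!("{:?}", rx.recv().unwrap());
    }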
Peers who claim to contain the target head /// root are grouped into the peer pool and queried for batches when downloading the /// chain. @@ -77,21 +58,37 @@ pub struct SyncingChain { /// The batches that have been downloaded and are awaiting processing and/or validation. completed_batches: Vec>, + /// Batches that have been processed and awaiting validation before being removed. + processed_batches: Vec>, + /// The peers that agree on the `target_head_slot` and `target_head_root` as a canonical chain /// and thus available to download this chain from. pub peer_pool: HashSet, /// The next batch_id that needs to be downloaded. - to_be_downloaded_id: u64, + to_be_downloaded_id: BatchId, /// The next batch id that needs to be processed. - to_be_processed_id: u64, + to_be_processed_id: BatchId, /// The last batch id that was processed. - last_processed_id: u64, + last_processed_id: BatchId, /// The current state of the chain. pub state: ChainSyncingState, + + /// A random id given to a batch process request. This is None if there is no ongoing batch + /// process. + current_processing_id: Option, + + /// A send channel to the sync manager. This is given to the batch processor thread to report + /// back once batch processing has completed. + sync_send: mpsc::UnboundedSender>, + + chain: Weak>, + + /// A reference to the sync logger. + log: slog::Logger, } #[derive(PartialEq)] @@ -100,8 +97,6 @@ pub enum ChainSyncingState { Stopped, /// The chain is undergoing syncing. Syncing, - /// The chain is temporarily paused whilst an error is rectified. - _Paused, } impl SyncingChain { @@ -110,6 +105,9 @@ impl SyncingChain { target_head_slot: Slot, target_head_root: Hash256, peer_id: PeerId, + sync_send: mpsc::UnboundedSender>, + chain: Weak>, + log: slog::Logger, ) -> Self { let mut peer_pool = HashSet::new(); peer_pool.insert(peer_id); @@ -120,11 +118,16 @@ impl SyncingChain { target_head_root, pending_batches: PendingBatches::new(), completed_batches: Vec::new(), + processed_batches: Vec::new(), peer_pool, - to_be_downloaded_id: 1, - to_be_processed_id: 1, - last_processed_id: 0, + to_be_downloaded_id: BatchId(1), + to_be_processed_id: BatchId(1), + last_processed_id: BatchId(0), state: ChainSyncingState::Stopped, + current_processing_id: None, + sync_send, + chain, + log, } } @@ -136,49 +139,45 @@ impl SyncingChain { /// batch. pub fn on_block_response( &mut self, - chain: &Weak>, network: &mut SyncNetworkContext, request_id: RequestId, beacon_block: &Option>, - log: &slog::Logger, - ) -> Option { + ) -> Option<()> { if let Some(block) = beacon_block { // This is not a stream termination, simply add the block to the request - self.pending_batches.add_block(request_id, block.clone())?; - Some(ProcessingResult::KeepChain) + self.pending_batches.add_block(request_id, block.clone()) } else { // A stream termination has been sent. This batch has ended. Process a completed batch. let batch = self.pending_batches.remove(request_id)?; - Some(self.process_completed_batch(chain.clone(), network, batch, log)) + self.handle_completed_batch(network, batch); + Some(()) } } /// A completed batch has been received, process the batch. /// This will return `ProcessingResult::KeepChain` if the chain has not completed or /// failed indicating that further batches are required. - fn process_completed_batch( + fn handle_completed_batch( &mut self, - chain: Weak>, network: &mut SyncNetworkContext, batch: Batch, - log: &slog::Logger, - ) -> ProcessingResult { + ) { // An entire batch of blocks has been received. 
This functions checks to see if it can be processed, // remove any batches waiting to be verified and if this chain is syncing, request new // blocks for the peer. - debug!(log, "Completed batch received"; "id"=>batch.id, "blocks"=>batch.downloaded_blocks.len(), "awaiting_batches" => self.completed_batches.len()); + debug!(self.log, "Completed batch received"; "id"=> *batch.id, "blocks"=>batch.downloaded_blocks.len(), "awaiting_batches" => self.completed_batches.len()); // verify the range of received blocks // Note that the order of blocks is verified in block processing if let Some(last_slot) = batch.downloaded_blocks.last().map(|b| b.slot) { // the batch is non-empty if batch.start_slot > batch.downloaded_blocks[0].slot || batch.end_slot < last_slot { - warn!(log, "BlocksByRange response returned out of range blocks"; + warn!(self.log, "BlocksByRange response returned out of range blocks"; "response_initial_slot" => batch.downloaded_blocks[0].slot, "requested_initial_slot" => batch.start_slot); network.downvote_peer(batch.current_peer); self.to_be_processed_id = batch.id; // reset the id back to here, when incrementing, it will check against completed batches - return ProcessingResult::KeepChain; + return; } } @@ -200,138 +199,138 @@ impl SyncingChain { // already be processed but not verified and therefore have Id's less than // `self.to_be_processed_id`. - //TODO: Run the processing of blocks in a separate thread. Build a queue of completed - //blocks here, manage the queue and process them in another thread as they become - //available. + // pre-emptively request more blocks from peers whilst we process current blocks, + self.request_batches(network); - if self.state == ChainSyncingState::Syncing { - // pre-emptively request more blocks from peers whilst we process current blocks, - if !self.send_range_request(network, log) { - debug!(log, "No peer available for next batch.") - } + // Try and process any completed batches. This will spawn a new task to process any blocks + // that are ready to be processed. + self.process_completed_batches(); + } + + /// Tries to process any batches if there are any available and we are not currently processing + /// other batches. + fn process_completed_batches(&mut self) { + // Only process batches if this chain is Syncing + if self.state != ChainSyncingState::Syncing { + return; } - // Try and process batches sequentially in the ordered list. - let current_process_id = self.to_be_processed_id; - // keep track of the number of successful batches to decide whether to run fork choice - let mut successful_block_process = false; + // Only process one batch at a time + if self.current_processing_id.is_some() { + return; + } - for batch in self - .completed_batches - .iter() - .filter(|batch| batch.id >= current_process_id) + // Check if there is a batch ready to be processed + while !self.completed_batches.is_empty() + && self.completed_batches[0].id == self.to_be_processed_id { - if batch.id != self.to_be_processed_id { - // there are no batches to be processed at the moment - break; - } - + let batch = self.completed_batches.remove(0); if batch.downloaded_blocks.is_empty() { - // the batch was empty, progress to the next block - self.to_be_processed_id += 1; + // The batch was empty, consider this processed and move to the next batch + self.processed_batches.push(batch); + *self.to_be_processed_id += 1; continue; } - // process the batch - // Keep track of successful batches. Run fork choice after all waiting batches have - // been processed. 
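Note on `process_completed_batches` above: it only hands one batch to the processor at a time, and only when the batch carrying the next expected id has actually arrived; empty batches are marked processed without a processor round trip. A plain-integer sketch of that gate (ids and block counts stand in for `BatchId` and `downloaded_blocks`):

    // Batches may complete out of order, but only the one matching
    // `to_be_processed` may start processing, and only one at a time.
    fn next_batch_to_process(
        completed: &mut Vec<(u64, usize)>, // (batch id, block count), kept sorted by id
        to_be_processed: &mut u64,
        currently_processing: bool,
    ) -> Option<u64> {
        if currently_processing {
            return None; // only one batch in flight at a time
        }
        while !completed.is_empty() && completed[0].0 == *to_be_processed {
            let (id, block_count) = completed.remove(0);
            if block_count == 0 {
                // Empty batch: treat it as processed and move on.
                *to_be_processed += 1;
                continue;
            }
            return Some(id); // hand this batch to the processor
        }
        None
    }

    fn main() {
        let mut completed = vec![(1, 0), (2, 25), (4, 25)];
        let mut next = 1;
        // Batch 1 is empty and skipped; batch 2 is the first real candidate.
        assert_eq!(next_batch_to_process(&mut completed, &mut next, false), Some(2));
        // Batch 3 has not completed yet, so batch 4 cannot start even though it is ready.
    }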
- debug!(log, "Processing batch"; "batch_id" => batch.id); - match process_batch(chain.clone(), batch, log) { - Ok(_) => { - // batch was successfully processed - self.last_processed_id = self.to_be_processed_id; - self.to_be_processed_id += 1; - successful_block_process = true; - } - Err(e) => { - warn!(log, "Block processing error"; "error"=> format!("{:?}", e)); - - if successful_block_process { - if let Some(chain) = chain.upgrade() { - match chain.fork_choice() { - Ok(()) => trace!( - log, - "Fork choice success"; - "location" => "batch import error" - ), - Err(e) => error!( - log, - "Fork choice failed"; - "error" => format!("{:?}", e), - "location" => "batch import error" - ), - } - } - } - - // batch processing failed - // this could be because this batch is invalid, or a previous invalidated batch - // is invalid. We need to find out which and downvote the peer that has sent us - // an invalid batch. - - // firstly remove any validated batches - return self.handle_invalid_batch(chain, network); - } - } - } - // If we have processed batches, run fork choice - if successful_block_process { - if let Some(chain) = chain.upgrade() { - match chain.fork_choice() { - Ok(()) => trace!( - log, - "Fork choice success"; - "location" => "batch import success" - ), - Err(e) => error!( - log, - "Fork choice failed"; - "error" => format!("{:?}", e), - "location" => "batch import success" - ), - } - } - } - - // remove any validated batches - let last_processed_id = self.last_processed_id; - self.completed_batches - .retain(|batch| batch.id >= last_processed_id); - - // check if the chain has completed syncing - if self.start_slot + self.last_processed_id * BLOCKS_PER_BATCH >= self.target_head_slot { - // chain is completed - ProcessingResult::RemoveChain - } else { - // chain is not completed - ProcessingResult::KeepChain + // send the batch to the batch processor thread + return self.process_batch(batch); } } - /// An invalid batch has been received that could not be processed. - fn handle_invalid_batch( + /// Sends a batch to the batch processor. + fn process_batch(&mut self, batch: Batch) { + // only spawn one instance at a time + let processing_id: u64 = rand::random(); + self.current_processing_id = Some(processing_id); + spawn_batch_processor( + self.chain.clone(), + processing_id, + batch, + self.sync_send.clone(), + self.log.clone(), + ); + } + + /// The block processor has completed processing a batch. This function handles the result + /// of the batch processor. + pub fn on_batch_process_result( &mut self, - _chain: Weak>, network: &mut SyncNetworkContext, - ) -> ProcessingResult { - // The current batch could not be processed, indicating either the current or previous - // batches are invalid - - // The previous batch could be - // incomplete due to the block sizes being too large to fit in a single RPC - // request or there could be consecutive empty batches which are not supposed to be there - - // Address these two cases individually. - // Firstly, check if the past batch is invalid. 
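Note on the processing-id guard in `process_batch`/`on_batch_process_result` above: each processing run is tagged with a random `u64` (`current_processing_id`) so a result that arrives late, or that belongs to another chain, is ignored instead of being misapplied. A minimal sketch of the guard; the only assumed dependency is the `rand` crate the patch already uses:

    struct Chain {
        current_processing_id: Option<u64>,
    }

    impl Chain {
        fn start_processing(&mut self) -> u64 {
            // Tag this run; only a result carrying the same id will be accepted.
            let id: u64 = rand::random();
            self.current_processing_id = Some(id);
            id
        }

        fn on_result(&mut self, processing_id: u64, succeeded: bool) -> Option<bool> {
            if Some(processing_id) != self.current_processing_id {
                return None; // stale or foreign result: not ours to handle
            }
            self.current_processing_id = None;
            Some(succeeded)
        }
    }

    fn main() {
        let mut chain = Chain { current_processing_id: None };
        let id = chain.start_processing();
        assert_eq!(chain.on_result(id.wrapping_add(1), true), None); // wrong id is ignored
        assert_eq!(chain.on_result(id, true), Some(true));           // matching id is handled
    }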
- // - - //TODO: Implement this logic - // Currently just fail the chain, and drop all associated peers, removing them from the - // peer pool, to prevent re-status - for peer_id in self.peer_pool.drain() { - network.downvote_peer(peer_id); + processing_id: u64, + batch: &mut Option>, + result: &BatchProcessResult, + ) -> Option { + if Some(processing_id) != self.current_processing_id { + // batch process doesn't belong to this chain + return None; } - ProcessingResult::RemoveChain + + // Consume the batch option + let batch = batch.take().or_else(|| { + crit!(self.log, "Processed batch taken by another chain"); + None + })?; + + // double check batches are processed in order TODO: Remove for prod + if batch.id != self.to_be_processed_id { + crit!(self.log, "Batch processed out of order"; + "processed_batch_id" => *batch.id, + "expected_id" => *self.to_be_processed_id); + } + + self.current_processing_id = None; + + let res = match result { + BatchProcessResult::Success => { + *self.to_be_processed_id += 1; + // This variable accounts for skip slots and batches that were not actually + // processed due to having no blocks. + self.last_processed_id = batch.id; + + // Remove any validate batches awaiting validation. + // Only batches that have blocks are processed here, therefore all previous batches + // have been correct. + let last_processed_id = self.last_processed_id; + self.processed_batches + .retain(|batch| batch.id.0 >= last_processed_id.0); + + // add the current batch to processed batches to be verified in the future. We are + // only uncertain about this batch, if it has not returned all blocks. + if batch.downloaded_blocks.len() < BLOCKS_PER_BATCH as usize { + self.processed_batches.push(batch); + } + + // check if the chain has completed syncing + if self.start_slot + *self.last_processed_id * BLOCKS_PER_BATCH + >= self.target_head_slot + { + // chain is completed + ProcessingResult::RemoveChain + } else { + // chain is not completed + + // attempt to request more batches + self.request_batches(network); + + // attempt to process more batches + self.process_completed_batches(); + + // keep the chain + ProcessingResult::KeepChain + } + } + BatchProcessResult::Failed => { + // batch processing failed + // this could be because this batch is invalid, or a previous invalidated batch + // is invalid. We need to find out which and downvote the peer that has sent us + // an invalid batch. + + // firstly remove any validated batches + self.handle_invalid_batch(network, batch) + } + }; + + Some(res) } pub fn stop_syncing(&mut self) { @@ -342,154 +341,66 @@ impl SyncingChain { /// This chain has been requested to start syncing. /// /// This could be new chain, or an old chain that is being resumed. - pub fn start_syncing( - &mut self, - network: &mut SyncNetworkContext, - local_finalized_slot: Slot, - log: &slog::Logger, - ) { + pub fn start_syncing(&mut self, network: &mut SyncNetworkContext, local_finalized_slot: Slot) { // A local finalized slot is provided as other chains may have made // progress whilst this chain was Stopped or paused. If so, update the `processed_batch_id` to // accommodate potentially downloaded batches from other chains. 
Also prune any old batches // awaiting processing - // Only important if the local head is more than a batch worth of blocks ahead of - // what this chain believes is downloaded - let batches_ahead = local_finalized_slot - .as_u64() - .saturating_sub(self.start_slot.as_u64() + self.last_processed_id * BLOCKS_PER_BATCH) - / BLOCKS_PER_BATCH; + // If the local finalized epoch is ahead of our current processed chain, update the chain + // to start from this point and re-index all subsequent batches starting from one + // (effectively creating a new chain). - if batches_ahead != 0 { - // there are `batches_ahead` whole batches that have been downloaded by another - // chain. Set the current processed_batch_id to this value. - debug!(log, "Updating chains processed batches"; "old_completed_slot" => self.start_slot + self.last_processed_id*BLOCKS_PER_BATCH, "new_completed_slot" => self.start_slot + (self.last_processed_id + batches_ahead)*BLOCKS_PER_BATCH); - self.last_processed_id += batches_ahead; + if local_finalized_slot.as_u64() + > self + .start_slot + .as_u64() + .saturating_add(*self.last_processed_id * BLOCKS_PER_BATCH) + { + debug!(self.log, "Updating chain's progress"; + "prev_completed_slot" => self.start_slot + *self.last_processed_id*BLOCKS_PER_BATCH, + "new_completed_slot" => local_finalized_slot.as_u64()); + // Re-index batches + *self.last_processed_id = 0; + *self.to_be_downloaded_id = 1; + *self.to_be_processed_id = 1; - if self.start_slot + self.last_processed_id * BLOCKS_PER_BATCH - > self.target_head_slot.as_u64() - { - crit!( - log, - "Current head slot is above the target head"; - "target_head_slot" => self.target_head_slot.as_u64(), - "new_start" => self.start_slot + self.last_processed_id * BLOCKS_PER_BATCH, - ); - return; - } - - // update the `to_be_downloaded_id` - if self.to_be_downloaded_id < self.last_processed_id { - self.to_be_downloaded_id = self.last_processed_id; - } - - let last_processed_id = self.last_processed_id; - self.completed_batches - .retain(|batch| batch.id >= last_processed_id.saturating_sub(1)); + // remove any completed or processed batches + self.completed_batches.clear(); + self.processed_batches.clear(); } - // Now begin requesting blocks from the peer pool, until all peers are exhausted. - while self.send_range_request(network, log) {} - self.state = ChainSyncingState::Syncing; + + // start processing batches if needed + self.process_completed_batches(); + + // begin requesting blocks from the peer pool, until all peers are exhausted. + self.request_batches(network); } /// Add a peer to the chain. /// /// If the chain is active, this starts requesting batches from this peer. - pub fn add_peer( - &mut self, - network: &mut SyncNetworkContext, - peer_id: PeerId, - log: &slog::Logger, - ) { + pub fn add_peer(&mut self, network: &mut SyncNetworkContext, peer_id: PeerId) { self.peer_pool.insert(peer_id.clone()); // do not request blocks if the chain is not syncing if let ChainSyncingState::Stopped = self.state { - debug!(log, "Peer added to a non-syncing chain"; "peer_id" => format!("{:?}", peer_id)); + debug!(self.log, "Peer added to a non-syncing chain"; "peer_id" => format!("{:?}", peer_id)); return; } - // find the next batch and request it from the peer - self.send_range_request(network, log); + // find the next batch and request it from any peers if we need to + self.request_batches(network); } /// Sends a STATUS message to all peers in the peer pool. 
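Note on the re-index check in `start_syncing` above: the chain compares its own progress, `start_slot + last_processed_id * BLOCKS_PER_BATCH`, against the locally finalized slot; if finalization has already moved past that point, the batch ids are reset and the queued batches dropped. A small worked sketch of the check with plain `u64` slots:

    const BLOCKS_PER_BATCH: u64 = 50; // value from the patch

    // True when the chain should re-index from the new local finalized slot,
    // i.e. other chains have already progressed past this one.
    fn needs_reindex(start_slot: u64, last_processed_id: u64, local_finalized_slot: u64) -> bool {
        local_finalized_slot > start_slot.saturating_add(last_processed_id * BLOCKS_PER_BATCH)
    }

    fn main() {
        // Chain started at slot 0 and has processed 2 batches => progressed to slot 100.
        assert!(!needs_reindex(0, 2, 100)); // local finalization at 100: keep current indices
        assert!(needs_reindex(0, 2, 170));  // local finalization at 170: reset and re-index
    }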
- pub fn status_peers(&self, chain: Weak>, network: &mut SyncNetworkContext) { + pub fn status_peers(&self, network: &mut SyncNetworkContext) { for peer_id in self.peer_pool.iter() { - network.status_peer(chain.clone(), peer_id.clone()); + network.status_peer(self.chain.clone(), peer_id.clone()); } } - /// Requests the next required batch from a peer. Returns true, if there was a peer available - /// to send a request and there are batches to request, false otherwise. - fn send_range_request(&mut self, network: &mut SyncNetworkContext, log: &slog::Logger) -> bool { - // find the next pending batch and request it from the peer - if let Some(peer_id) = self.get_next_peer() { - if let Some(batch) = self.get_next_batch(peer_id) { - debug!(log, "Requesting batch"; "start_slot" => batch.start_slot, "end_slot" => batch.end_slot, "id" => batch.id, "peer" => format!("{:?}", batch.current_peer), "head_root"=> format!("{}", batch.head_root)); - // send the batch - self.send_batch(network, batch); - return true; - } - } - false - } - - /// Returns a peer if there exists a peer which does not currently have a pending request. - /// - /// This is used to create the next request. - fn get_next_peer(&self) -> Option { - for peer in self.peer_pool.iter() { - if self.pending_batches.peer_is_idle(peer) { - return Some(peer.clone()); - } - } - None - } - - /// Requests the provided batch from the provided peer. - fn send_batch(&mut self, network: &mut SyncNetworkContext, batch: Batch) { - let request = batch.to_blocks_by_range_request(); - if let Ok(request_id) = network.blocks_by_range_request(batch.current_peer.clone(), request) - { - // add the batch to pending list - self.pending_batches.insert(request_id, batch); - } - } - - /// Returns the next required batch from the chain if it exists. If there are no more batches - /// required, `None` is returned. - fn get_next_batch(&mut self, peer_id: PeerId) -> Option> { - let batch_start_slot = - self.start_slot + self.to_be_downloaded_id.saturating_sub(1) * BLOCKS_PER_BATCH; - if batch_start_slot > self.target_head_slot { - return None; - } - let batch_end_slot = std::cmp::min( - batch_start_slot + BLOCKS_PER_BATCH, - self.target_head_slot.saturating_add(1u64), - ); - - let batch_id = self.to_be_downloaded_id; - // find the next batch id. The largest of the next sequential idea, of the next uncompleted - // id - let max_completed_id = - self.completed_batches - .iter() - .fold(0, |max, batch| if batch.id > max { batch.id } else { max }); - self.to_be_downloaded_id = - std::cmp::max(self.to_be_downloaded_id + 1, max_completed_id + 1); - - Some(Batch::new( - batch_id, - batch_start_slot, - batch_end_slot, - self.target_head_root, - peer_id, - )) - } - /// An RPC error has occurred. /// /// Checks if the request_id is associated with this chain. If so, attempts to re-request the @@ -501,18 +412,21 @@ impl SyncingChain { network: &mut SyncNetworkContext, peer_id: &PeerId, request_id: RequestId, - log: &slog::Logger, ) -> Option { if let Some(batch) = self.pending_batches.remove(request_id) { - warn!(log, "Batch failed. RPC Error"; "id" => batch.id, "retries" => batch.retries, "peer" => format!("{:?}", peer_id)); + warn!(self.log, "Batch failed. RPC Error"; + "id" => *batch.id, + "retries" => batch.retries, + "peer" => format!("{:?}", peer_id)); - Some(self.failed_batch(network, batch, log)) + Some(self.failed_batch(network, batch)) } else { None } } - /// A batch has failed. + /// A batch has failed. 
This occurs when a network timeout happens or the peer didn't respond. + /// These events do not indicate a malicious peer, more likely simple networking issues. /// /// Attempts to re-request from another peer in the peer pool (if possible) and returns /// `ProcessingResult::RemoveChain` if the number of retries on the batch exceeds @@ -521,7 +435,6 @@ impl SyncingChain { &mut self, network: &mut SyncNetworkContext, mut batch: Batch, - log: &Logger, ) -> ProcessingResult { batch.retries += 1; @@ -541,116 +454,152 @@ impl SyncingChain { .unwrap_or_else(|| current_peer); batch.current_peer = new_peer.clone(); - debug!(log, "Re-Requesting batch"; "start_slot" => batch.start_slot, "end_slot" => batch.end_slot, "id" => batch.id, "peer" => format!("{:?}", batch.current_peer), "head_root"=> format!("{}", batch.head_root)); + debug!(self.log, "Re-Requesting batch"; + "start_slot" => batch.start_slot, + "end_slot" => batch.end_slot, + "id" => *batch.id, + "peer" => format!("{:?}", batch.current_peer), + "head_root"=> format!("{}", batch.head_root)); self.send_batch(network, batch); ProcessingResult::KeepChain } } -} -// Helper function to process block batches which only consumes the chain and blocks to process -fn process_batch( - chain: Weak>, - batch: &Batch, - log: &Logger, -) -> Result<(), String> { - for block in &batch.downloaded_blocks { - if let Some(chain) = chain.upgrade() { - let processing_result = chain.process_block(block.clone()); + /// An invalid batch has been received that could not be processed. + /// + /// These events occur when a peer as successfully responded with blocks, but the blocks we + /// have received are incorrect or invalid. This indicates the peer has not performed as + /// intended and can result in downvoting a peer. + fn handle_invalid_batch( + &mut self, + network: &mut SyncNetworkContext, + _batch: Batch, + ) -> ProcessingResult { + // The current batch could not be processed, indicating either the current or previous + // batches are invalid - if let Ok(outcome) = processing_result { - match outcome { - BlockProcessingOutcome::Processed { block_root } => { - // The block was valid and we processed it successfully. - trace!( - log, "Imported block from network"; - "slot" => block.slot, - "block_root" => format!("{}", block_root), - ); - } - BlockProcessingOutcome::ParentUnknown { parent } => { - // blocks should be sequential and all parents should exist - trace!( - log, "Parent block is unknown"; - "parent_root" => format!("{}", parent), - "baby_block_slot" => block.slot, - ); - return Err(format!( - "Block at slot {} has an unknown parent.", - block.slot - )); - } - BlockProcessingOutcome::BlockIsAlreadyKnown => { - // this block is already known to us, move to the next - debug!( - log, "Imported a block that is already known"; - "block_slot" => block.slot, - ); - } - BlockProcessingOutcome::FutureSlot { - present_slot, - block_slot, - } => { - if present_slot + FUTURE_SLOT_TOLERANCE >= block_slot { - // The block is too far in the future, drop it. - trace!( - log, "Block is ahead of our slot clock"; - "msg" => "block for future slot rejected, check your time", - "present_slot" => present_slot, - "block_slot" => block_slot, - "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, - ); - return Err(format!( - "Block at slot {} is too far in the future", - block.slot - )); - } else { - // The block is in the future, but not too far. 
- trace!( - log, "Block is slightly ahead of our slot clock, ignoring."; - "present_slot" => present_slot, - "block_slot" => block_slot, - "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, - ); - } - } - BlockProcessingOutcome::WouldRevertFinalizedSlot { .. } => { - trace!( - log, "Finalized or earlier block processed"; - "outcome" => format!("{:?}", outcome), - ); - // block reached our finalized slot or was earlier, move to the next block - } - BlockProcessingOutcome::GenesisBlock => { - trace!( - log, "Genesis block was processed"; - "outcome" => format!("{:?}", outcome), - ); - } - _ => { - warn!( - log, "Invalid block received"; - "msg" => "peer sent invalid block", - "outcome" => format!("{:?}", outcome), - ); - return Err(format!("Invalid block at slot {}", block.slot)); - } - } - } else { - warn!( - log, "BlockProcessingFailure"; - "msg" => "unexpected condition in processing block.", - "outcome" => format!("{:?}", processing_result) - ); - return Err(format!( - "Unexpected block processing error: {:?}", - processing_result - )); - } - } else { - return Ok(()); // terminate early due to dropped beacon chain + // The previous batch could be incomplete due to the block sizes being too large to fit in + // a single RPC request or there could be consecutive empty batches which are not supposed + // to be there + + // The current (sub-optimal) strategy is to simply re-request all batches that could + // potentially be faulty. If a batch returns a different result than the original and + // results in successful processing, we downvote the original peer that sent us the batch. + + // If all batches return the same result, we try this process INVALID_BATCH_LOOKUP_ATTEMPTS + // times before considering the entire chain invalid and downvoting all peers. + + // Firstly, check if there are any past batches that could be invalid. + if !self.processed_batches.is_empty() { + // try and re-download this batch from other peers + } + + //TODO: Implement this logic + // Currently just fail the chain, and drop all associated peers, removing them from the + // peer pool, to prevent re-status + for peer_id in self.peer_pool.drain() { + network.downvote_peer(peer_id); + } + ProcessingResult::RemoveChain + } + + /// Attempts to request the next required batches from the peer pool if the chain is syncing. + /// It will exhaust the peer pool and left over batches until the batch buffer is reached or + /// all peers are exhausted. + fn request_batches(&mut self, network: &mut SyncNetworkContext) { + if let ChainSyncingState::Syncing = self.state { + while self.send_range_request(network) {} } } - Ok(()) + /// Requests the next required batch from a peer. Returns true, if there was a peer available + /// to send a request and there are batches to request, false otherwise. + fn send_range_request(&mut self, network: &mut SyncNetworkContext) -> bool { + // find the next pending batch and request it from the peer + if let Some(peer_id) = self.get_next_peer() { + if let Some(batch) = self.get_next_batch(peer_id) { + debug!(self.log, "Requesting batch"; "start_slot" => batch.start_slot, "end_slot" => batch.end_slot, "id" => *batch.id, "peer" => format!("{:?}", batch.current_peer), "head_root"=> format!("{}", batch.head_root)); + // send the batch + self.send_batch(network, batch); + return true; + } + } + false + } + + /// Returns a peer if there exists a peer which does not currently have a pending request. + /// + /// This is used to create the next request. 
+ fn get_next_peer(&self) -> Option { + // randomize the peers for load balancing + let mut rng = rand::thread_rng(); + let mut peers = self.peer_pool.iter().collect::>(); + peers.shuffle(&mut rng); + for peer in peers { + if self.pending_batches.peer_is_idle(peer) { + return Some(peer.clone()); + } + } + None + } + + /// Returns the next required batch from the chain if it exists. If there are no more batches + /// required, `None` is returned. + fn get_next_batch(&mut self, peer_id: PeerId) -> Option> { + // only request batches up to the buffer size limit + if self + .completed_batches + .len() + .saturating_add(self.pending_batches.len()) + > BATCH_BUFFER_SIZE as usize + { + return None; + } + + // don't request batches beyond the target head slot + let batch_start_slot = + self.start_slot + self.to_be_downloaded_id.saturating_sub(1) * BLOCKS_PER_BATCH; + if batch_start_slot > self.target_head_slot { + return None; + } + // truncate the batch to the target head of the chain + let batch_end_slot = std::cmp::min( + batch_start_slot + BLOCKS_PER_BATCH, + self.target_head_slot.saturating_add(1u64), + ); + + let batch_id = self.to_be_downloaded_id; + + // Find the next batch id. The largest of the next sequential id, or the next uncompleted + // id + let max_completed_id = self + .completed_batches + .iter() + .last() + .map(|x| x.id.0) + .unwrap_or_else(|| 0); + // TODO: Check if this is necessary + self.to_be_downloaded_id = BatchId(std::cmp::max( + self.to_be_downloaded_id.0 + 1, + max_completed_id + 1, + )); + + Some(Batch::new( + batch_id, + batch_start_slot, + batch_end_slot, + self.target_head_root, + peer_id, + )) + } + + /// Requests the provided batch from the provided peer. + fn send_batch(&mut self, network: &mut SyncNetworkContext, batch: Batch) { + let request = batch.to_blocks_by_range_request(); + if let Ok(request_id) = network.blocks_by_range_request(batch.current_peer.clone(), request) + { + // add the batch to pending list + self.pending_batches.insert(request_id, batch); + } + } } diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index 68c0c9a26..a42589e9d 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -3,13 +3,15 @@ //! Each chain type is stored in it's own vector. A variety of helper functions are given along //! with this struct to to simplify the logic of the other layers of sync. 
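Note on `get_next_batch` above: each batch's slot window is derived from its id alone — batch `n` starts at `start_slot + (n - 1) * BLOCKS_PER_BATCH` and its exclusive end is truncated to one past the target head. A standalone sketch of that arithmetic with plain `u64` slots:

    const BLOCKS_PER_BATCH: u64 = 50; // value from the patch

    // Slot window for a batch id, or None once the target head has been passed.
    // `end` is exclusive, matching the "requested end slot of batch, exclusive" field.
    fn batch_window(start_slot: u64, batch_id: u64, target_head_slot: u64) -> Option<(u64, u64)> {
        let batch_start = start_slot + batch_id.saturating_sub(1) * BLOCKS_PER_BATCH;
        if batch_start > target_head_slot {
            return None;
        }
        let batch_end = std::cmp::min(batch_start + BLOCKS_PER_BATCH, target_head_slot + 1);
        Some((batch_start, batch_end))
    }

    fn main() {
        assert_eq!(batch_window(100, 1, 220), Some((100, 150)));
        assert_eq!(batch_window(100, 3, 220), Some((200, 221))); // truncated at the target head
        assert_eq!(batch_window(100, 4, 220), None);             // past the target: nothing to request
    }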
-use super::chain::{ChainSyncingState, ProcessingResult, SyncingChain}; +use super::chain::{ChainSyncingState, SyncingChain}; use crate::message_processor::PeerSyncInfo; +use crate::sync::manager::SyncMessage; use crate::sync::network_context::SyncNetworkContext; use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::PeerId; use slog::{debug, error, warn}; use std::sync::Weak; +use tokio::sync::mpsc; use types::EthSpec; use types::{Hash256, Slot}; @@ -148,7 +150,7 @@ impl ChainCollection { // Stop the current chain from syncing self.finalized_chains[index].stop_syncing(); // Start the new chain - self.finalized_chains[new_index].start_syncing(network, local_slot, log); + self.finalized_chains[new_index].start_syncing(network, local_slot); self.sync_state = SyncState::Finalized; } } else if let Some(chain) = self @@ -158,7 +160,7 @@ impl ChainCollection { { // There is no currently syncing finalization chain, starting the one with the most peers debug!(log, "New finalized chain started syncing"; "new_target_root" => format!("{}", chain.target_head_root), "new_end_slot" => chain.target_head_slot, "new_start_slot"=> chain.start_slot); - chain.start_syncing(network, local_slot, log); + chain.start_syncing(network, local_slot); self.sync_state = SyncState::Finalized; } else { // There are no finalized chains, update the state. @@ -177,16 +179,22 @@ impl ChainCollection { target_head: Hash256, target_slot: Slot, peer_id: PeerId, + sync_send: mpsc::UnboundedSender>, + log: &slog::Logger, ) { self.finalized_chains.push(SyncingChain::new( local_finalized_slot, target_slot, target_head, peer_id, + sync_send, + self.beacon_chain.clone(), + log.clone(), )); } /// Add a new finalized chain to the collection and starts syncing it. + #[allow(clippy::too_many_arguments)] pub fn new_head_chain( &mut self, network: &mut SyncNetworkContext, @@ -194,6 +202,7 @@ impl ChainCollection { target_head: Hash256, target_slot: Slot, peer_id: PeerId, + sync_send: mpsc::UnboundedSender>, log: &slog::Logger, ) { // remove the peer from any other head chains @@ -203,10 +212,17 @@ impl ChainCollection { }); self.head_chains.retain(|chain| !chain.peer_pool.is_empty()); - let mut new_head_chain = - SyncingChain::new(remote_finalized_slot, target_slot, target_head, peer_id); + let mut new_head_chain = SyncingChain::new( + remote_finalized_slot, + target_slot, + target_head, + peer_id, + sync_send, + self.beacon_chain.clone(), + log.clone(), + ); // All head chains can sync simultaneously - new_head_chain.start_syncing(network, remote_finalized_slot, log); + new_head_chain.start_syncing(network, remote_finalized_slot); self.head_chains.push(new_head_chain); } @@ -218,10 +234,10 @@ impl ChainCollection { /// Given a chain iterator, runs a given function on each chain until the function returns /// `Some`. This allows the `RangeSync` struct to loop over chains and optionally remove the /// chain from the collection if the function results in completing the chain. - fn request_function<'a, F, I>(chain: I, mut func: F) -> Option<(usize, ProcessingResult)> + fn request_function<'a, F, I, U>(chain: I, mut func: F) -> Option<(usize, U)> where I: Iterator>, - F: FnMut(&'a mut SyncingChain) -> Option, + F: FnMut(&'a mut SyncingChain) -> Option, { chain .enumerate() @@ -229,25 +245,25 @@ impl ChainCollection { } /// Runs a function on all finalized chains. 
- pub fn finalized_request(&mut self, func: F) -> Option<(usize, ProcessingResult)> + pub fn finalized_request(&mut self, func: F) -> Option<(usize, U)> where - F: FnMut(&mut SyncingChain) -> Option, + F: FnMut(&mut SyncingChain) -> Option, { ChainCollection::request_function(self.finalized_chains.iter_mut(), func) } /// Runs a function on all head chains. - pub fn head_request(&mut self, func: F) -> Option<(usize, ProcessingResult)> + pub fn head_request(&mut self, func: F) -> Option<(usize, U)> where - F: FnMut(&mut SyncingChain) -> Option, + F: FnMut(&mut SyncingChain) -> Option, { ChainCollection::request_function(self.head_chains.iter_mut(), func) } /// Runs a function on all finalized and head chains. - pub fn head_finalized_request(&mut self, func: F) -> Option<(usize, ProcessingResult)> + pub fn head_finalized_request(&mut self, func: F) -> Option<(usize, U)> where - F: FnMut(&mut SyncingChain) -> Option, + F: FnMut(&mut SyncingChain) -> Option, { ChainCollection::request_function( self.finalized_chains @@ -267,9 +283,9 @@ impl ChainCollection { .retain(|chain| !chain.peer_pool.is_empty()); self.head_chains.retain(|chain| !chain.peer_pool.is_empty()); - let local_info = match self.beacon_chain.upgrade() { + let (beacon_chain, local_info) = match self.beacon_chain.upgrade() { Some(chain) => match PeerSyncInfo::from_chain(&chain) { - Some(local) => local, + Some(local) => (chain, local), None => { return error!( log, @@ -288,18 +304,25 @@ impl ChainCollection { .start_slot(T::EthSpec::slots_per_epoch()); // Remove chains that are out-dated and re-status their peers - let beacon_chain_clone = self.beacon_chain.clone(); self.finalized_chains.retain(|chain| { - if chain.target_head_slot <= local_finalized_slot { - chain.status_peers(beacon_chain_clone.clone(), network); + if chain.target_head_slot <= local_finalized_slot + || beacon_chain + .block_root_tree + .is_known_block_root(&chain.target_head_root) + { + chain.status_peers(network); false } else { true } }); self.head_chains.retain(|chain| { - if chain.target_head_slot <= local_finalized_slot { - chain.status_peers(beacon_chain_clone.clone(), network); + if chain.target_head_slot <= local_finalized_slot + || beacon_chain + .block_root_tree + .is_known_block_root(&chain.target_head_root) + { + chain.status_peers(network); false } else { true @@ -331,11 +354,11 @@ impl ChainCollection { let chain = if index >= self.finalized_chains.len() { let index = index - self.finalized_chains.len(); let chain = self.head_chains.swap_remove(index); - chain.status_peers(self.beacon_chain.clone(), network); + chain.status_peers(network); chain } else { let chain = self.finalized_chains.swap_remove(index); - chain.status_peers(self.beacon_chain.clone(), network); + chain.status_peers(network); chain }; diff --git a/beacon_node/network/src/sync/range_sync/mod.rs b/beacon_node/network/src/sync/range_sync/mod.rs index 601840b8f..28cff24e3 100644 --- a/beacon_node/network/src/sync/range_sync/mod.rs +++ b/beacon_node/network/src/sync/range_sync/mod.rs @@ -2,8 +2,11 @@ //! peers. 
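Note on the `chain_collection.rs` changes above: making `request_function` (and the `finalized_request`/`head_request`/`head_finalized_request` wrappers) generic over the closure's return type `U` lets one lookup serve both the `ProcessingResult` callers and the new `Option<()>` block-response path. A standalone sketch of the pattern, with a slice of placeholder chains instead of the real collections:

    // Run `func` over the chains and return the index of the first chain that
    // claims the event (returns Some), together with whatever it produced.
    fn request_function<C, F, U>(chains: &mut [C], mut func: F) -> Option<(usize, U)>
    where
        F: FnMut(&mut C) -> Option<U>,
    {
        chains
            .iter_mut()
            .enumerate()
            .find_map(|(index, chain)| Some((index, func(chain)?)))
    }

    fn main() {
        // Each "chain" here is just the request id it is waiting on.
        let mut chains = vec![10u64, 20, 30];
        // The closure can return any type; here it returns () for a match.
        let hit = request_function(&mut chains, |pending| if *pending == 20 { Some(()) } else { None });
        assert_eq!(hit, Some((1, ())));
    }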
mod batch; +mod batch_processing; mod chain; mod chain_collection; mod range; +pub use batch::Batch; +pub use batch_processing::BatchProcessResult; pub use range::RangeSync; diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 98ef6e621..60b9ea18b 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -41,7 +41,9 @@ use super::chain::ProcessingResult; use super::chain_collection::{ChainCollection, SyncState}; +use super::{Batch, BatchProcessResult}; use crate::message_processor::PeerSyncInfo; +use crate::sync::manager::SyncMessage; use crate::sync::network_context::SyncNetworkContext; use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::rpc::RequestId; @@ -49,6 +51,7 @@ use eth2_libp2p::PeerId; use slog::{debug, error, trace, warn}; use std::collections::HashSet; use std::sync::Weak; +use tokio::sync::mpsc; use types::{BeaconBlock, EthSpec}; /// The primary object dealing with long range/batch syncing. This contains all the active and @@ -64,16 +67,24 @@ pub struct RangeSync { /// finalized chain(s) complete, these peer's get STATUS'ed to update their head slot before /// the head chains are formed and downloaded. awaiting_head_peers: HashSet, + /// The sync manager channel, allowing the batch processor thread to callback the sync task + /// once complete. + sync_send: mpsc::UnboundedSender>, /// The syncing logger. log: slog::Logger, } impl RangeSync { - pub fn new(beacon_chain: Weak>, log: slog::Logger) -> Self { + pub fn new( + beacon_chain: Weak>, + sync_send: mpsc::UnboundedSender>, + log: slog::Logger, + ) -> Self { RangeSync { beacon_chain: beacon_chain.clone(), chains: ChainCollection::new(beacon_chain), awaiting_head_peers: HashSet::new(), + sync_send, log, } } @@ -105,9 +116,10 @@ impl RangeSync { // determine if we need to run a sync to the nearest finalized state or simply sync to // its current head - let local_info = match self.beacon_chain.upgrade() { + + let (chain, local_info) = match self.beacon_chain.upgrade() { Some(chain) => match PeerSyncInfo::from_chain(&chain) { - Some(local) => local, + Some(local) => (chain, local), None => { return error!( self.log, @@ -117,10 +129,9 @@ impl RangeSync { } }, None => { - warn!(self.log, + return warn!(self.log, "Beacon chain dropped. Peer not considered for sync"; "peer_id" => format!("{:?}", peer_id)); - return; } }; @@ -138,7 +149,11 @@ impl RangeSync { // remove any out-of-date chains self.chains.purge_outdated_chains(network, &self.log); - if remote_finalized_slot > local_info.head_slot { + if remote_finalized_slot > local_info.head_slot + && !chain + .block_root_tree + .is_known_block_root(&remote.finalized_root) + { debug!(self.log, "Finalization sync peer joined"; "peer_id" => format!("{:?}", peer_id)); // Finalized chain search @@ -154,7 +169,7 @@ impl RangeSync { debug!(self.log, "Finalized chain exists, adding peer"; "peer_id" => format!("{:?}", peer_id), "target_root" => format!("{}", chain.target_head_root), "end_slot" => chain.target_head_slot, "start_slot"=> chain.start_slot); // add the peer to the chain's peer pool - chain.add_peer(network, peer_id, &self.log); + chain.add_peer(network, peer_id); // check if the new peer's addition will favour a new syncing chain. 
self.chains.update_finalized(network, &self.log); @@ -168,6 +183,8 @@ impl RangeSync { remote.finalized_root, remote_finalized_slot, peer_id, + self.sync_send.clone(), + &self.log, ); self.chains.update_finalized(network, &self.log); } @@ -188,7 +205,7 @@ impl RangeSync { debug!(self.log, "Adding peer to the existing head chain peer pool"; "head_root" => format!("{}",remote.head_root), "head_slot" => remote.head_slot, "peer_id" => format!("{:?}", peer_id)); // add the peer to the head's pool - chain.add_peer(network, peer_id, &self.log); + chain.add_peer(network, peer_id); } else { // There are no other head chains that match this peer's status, create a new one, and let start_slot = std::cmp::min(local_info.head_slot, remote_finalized_slot); @@ -199,6 +216,7 @@ impl RangeSync { remote.head_root, remote.head_slot, peer_id, + self.sync_send.clone(), &self.log, ); } @@ -223,17 +241,37 @@ impl RangeSync { // lookup should not be very expensive. However, we could add an extra index that maps the // request id to index of the vector to avoid O(N) searches and O(N) hash lookups. - let chain_ref = &self.beacon_chain; - let log_ref = &self.log; + let id_not_found = self + .chains + .head_finalized_request(|chain| { + chain.on_block_response(network, request_id, &beacon_block) + }) + .is_none(); + if id_not_found { + // The request didn't exist in any `SyncingChain`. Could have been an old request. Log + // and ignore + debug!(self.log, "Range response without matching request"; "peer" => format!("{:?}", peer_id), "request_id" => request_id); + } + } + + pub fn handle_block_process_result( + &mut self, + network: &mut SyncNetworkContext, + processing_id: u64, + batch: Batch, + result: BatchProcessResult, + ) { + // build an option for passing the batch to each chain + let mut batch = Some(batch); + match self.chains.finalized_request(|chain| { - chain.on_block_response(chain_ref, network, request_id, &beacon_block, log_ref) + chain.on_batch_process_result(network, processing_id, &mut batch, &result) }) { - Some((_, ProcessingResult::KeepChain)) => {} // blocks added to the chain Some((index, ProcessingResult::RemoveChain)) => { let chain = self.chains.remove_finalized_chain(index); debug!(self.log, "Finalized chain removed"; "start_slot" => chain.start_slot.as_u64(), "end_slot" => chain.target_head_slot.as_u64()); // the chain is complete, re-status it's peers - chain.status_peers(self.beacon_chain.clone(), network); + chain.status_peers(network); // update the state of the collection self.chains.update_finalized(network, &self.log); @@ -246,32 +284,30 @@ impl RangeSync { // sync match self.chains.sync_state() { SyncState::Idle | SyncState::Head => { - for peer_id in self.awaiting_head_peers.iter() { - network.status_peer(self.beacon_chain.clone(), peer_id.clone()); + for peer_id in self.awaiting_head_peers.drain() { + network.status_peer(self.beacon_chain.clone(), peer_id); } } SyncState::Finalized => {} // Have more finalized chains to complete } } + Some((_, ProcessingResult::KeepChain)) => {} None => { - // The request was not in any finalized chain, search head chains match self.chains.head_request(|chain| { - chain.on_block_response(&chain_ref, network, request_id, &beacon_block, log_ref) + chain.on_batch_process_result(network, processing_id, &mut batch, &result) }) { Some((index, ProcessingResult::RemoveChain)) => { let chain = self.chains.remove_head_chain(index); debug!(self.log, "Head chain completed"; "start_slot" => chain.start_slot.as_u64(), "end_slot" => 
chain.target_head_slot.as_u64()); // the chain is complete, re-status it's peers and remove it - chain.status_peers(self.beacon_chain.clone(), network); + chain.status_peers(network); // update the state of the collection self.chains.update_finalized(network, &self.log); } - Some(_) => {} + Some((_, ProcessingResult::KeepChain)) => {} None => { - // The request didn't exist in any `SyncingChain`. Could have been an old request. Log - // and ignore - debug!(self.log, "Range response without matching request"; "peer" => format!("{:?}", peer_id), "request_id" => request_id); + warn!(self.log, "No chains match the block processing id"; "id" => processing_id); } } } @@ -304,15 +340,12 @@ impl RangeSync { /// for this peer. If so we mark the batch as failed. The batch may then hit it's maximum /// retries. In this case, we need to remove the chain and re-status all the peers. fn remove_peer(&mut self, network: &mut SyncNetworkContext, peer_id: &PeerId) { - let log_ref = &self.log; if let Some((index, ProcessingResult::RemoveChain)) = self.chains.head_finalized_request(|chain| { if chain.peer_pool.remove(peer_id) { // this chain contained the peer while let Some(batch) = chain.pending_batches.remove_batch_by_peer(peer_id) { - if let ProcessingResult::RemoveChain = - chain.failed_batch(network, batch, log_ref) - { + if let ProcessingResult::RemoveChain = chain.failed_batch(network, batch) { // a single batch failed, remove the chain return Some(ProcessingResult::RemoveChain); } @@ -341,10 +374,10 @@ impl RangeSync { request_id: RequestId, ) { // check that this request is pending - let log_ref = &self.log; - match self.chains.head_finalized_request(|chain| { - chain.inject_error(network, &peer_id, request_id, log_ref) - }) { + match self + .chains + .head_finalized_request(|chain| chain.inject_error(network, &peer_id, request_id)) + { Some((_, ProcessingResult::KeepChain)) => {} // error handled chain persists Some((index, ProcessingResult::RemoveChain)) => { debug!(self.log, "Chain being removed due to RPC error");
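Note on `handle_block_process_result` above: the returned batch is wrapped in an `Option` and `&mut Option<Batch>` is offered to each candidate chain in turn; only the chain whose processing id matches calls `take()`, so ownership of the batch moves exactly once. A minimal standalone sketch of that hand-off, with a `String` payload standing in for the batch:

    // Each chain only claims the payload if the id is the one it issued.
    struct Chain {
        expected_id: u64,
    }

    impl Chain {
        fn on_result(&mut self, id: u64, payload: &mut Option<String>) -> Option<String> {
            if id != self.expected_id {
                return None; // not ours: leave the payload for another chain
            }
            payload.take() // ours: take ownership exactly once
        }
    }

    fn main() {
        let mut chains = vec![Chain { expected_id: 1 }, Chain { expected_id: 2 }];
        let mut payload = Some("batch of blocks".to_string());
        let claimed = chains
            .iter_mut()
            .find_map(|chain| chain.on_result(2, &mut payload));
        assert_eq!(claimed.as_deref(), Some("batch of blocks"));
        assert!(payload.is_none()); // the payload was moved out by the matching chain
    }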