diff --git a/Cargo.lock b/Cargo.lock index 02a470aaa..f8dba7739 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,7 +2,7 @@ # It is not intended for manual editing. [[package]] name = "account_manager" -version = "0.2.3" +version = "0.2.4" dependencies = [ "account_utils", "bls", @@ -373,7 +373,7 @@ dependencies = [ [[package]] name = "beacon_node" -version = "0.2.3" +version = "0.2.4" dependencies = [ "beacon_chain", "clap", @@ -530,7 +530,7 @@ dependencies = [ [[package]] name = "boot_node" -version = "0.2.3" +version = "0.2.4" dependencies = [ "clap", "discv5", @@ -2881,7 +2881,7 @@ dependencies = [ [[package]] name = "lighthouse" -version = "0.2.3" +version = "0.2.4" dependencies = [ "account_manager", "account_utils", @@ -6003,7 +6003,7 @@ dependencies = [ [[package]] name = "validator_client" -version = "0.2.3" +version = "0.2.4" dependencies = [ "account_utils", "bls", diff --git a/beacon_node/network/src/beacon_processor/chain_segment.rs b/beacon_node/network/src/beacon_processor/chain_segment.rs index 5b034e4aa..8473e9927 100644 --- a/beacon_node/network/src/beacon_processor/chain_segment.rs +++ b/beacon_node/network/src/beacon_processor/chain_segment.rs @@ -1,19 +1,19 @@ use crate::metrics; use crate::router::processor::FUTURE_SLOT_TOLERANCE; use crate::sync::manager::SyncMessage; -use crate::sync::{BatchId, BatchProcessResult, ChainId}; +use crate::sync::{BatchProcessResult, ChainId}; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, ChainSegmentResult}; use eth2_libp2p::PeerId; use slog::{debug, error, trace, warn}; use std::sync::Arc; use tokio::sync::mpsc; -use types::{EthSpec, SignedBeaconBlock}; +use types::{Epoch, EthSpec, SignedBeaconBlock}; /// Id associated to a block processing request, either a batch or a single block. #[derive(Clone, Debug, PartialEq)] pub enum ProcessId { /// Processing Id of a range syncing batch. - RangeBatchId(ChainId, BatchId), + RangeBatchId(ChainId, Epoch), /// Processing Id of the parent lookup of a block ParentLookup(PeerId), } @@ -27,7 +27,7 @@ pub fn handle_chain_segment( ) { match process_id { // this a request from the range sync - ProcessId::RangeBatchId(chain_id, batch_id) => { + ProcessId::RangeBatchId(chain_id, epoch) => { let len = downloaded_blocks.len(); let start_slot = if len > 0 { downloaded_blocks[0].message.slot.as_u64() @@ -40,26 +40,26 @@ pub fn handle_chain_segment( 0 }; - debug!(log, "Processing batch"; "id" => *batch_id, "blocks" => downloaded_blocks.len(), "start_slot" => start_slot, "end_slot" => end_slot); + debug!(log, "Processing batch"; "batch_epoch" => epoch, "blocks" => downloaded_blocks.len(), "start_slot" => start_slot, "end_slot" => end_slot); let result = match process_blocks(chain, downloaded_blocks.iter(), &log) { (_, Ok(_)) => { - debug!(log, "Batch processed"; "id" => *batch_id , "start_slot" => start_slot, "end_slot" => end_slot); + debug!(log, "Batch processed"; "batch_epoch" => epoch , "start_slot" => start_slot, "end_slot" => end_slot); BatchProcessResult::Success } (imported_blocks, Err(e)) if imported_blocks > 0 => { debug!(log, "Batch processing failed but imported some blocks"; - "id" => *batch_id, "error" => e, "imported_blocks"=> imported_blocks); + "batch_epoch" => epoch, "error" => e, "imported_blocks"=> imported_blocks); BatchProcessResult::Partial } (_, Err(e)) => { - debug!(log, "Batch processing failed"; "id" => *batch_id, "error" => e); + debug!(log, "Batch processing failed"; "batch_epoch" => epoch, "error" => e); BatchProcessResult::Failed } }; let msg = SyncMessage::BatchProcessed { chain_id, - batch_id, + epoch, downloaded_blocks, result, }; diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 2c9cc5add..af55f489b 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -35,7 +35,7 @@ use super::network_context::SyncNetworkContext; use super::peer_sync_info::{PeerSyncInfo, PeerSyncType}; -use super::range_sync::{BatchId, ChainId, RangeSync, EPOCHS_PER_BATCH}; +use super::range_sync::{ChainId, RangeSync, EPOCHS_PER_BATCH}; use super::RequestId; use crate::beacon_processor::{ProcessId, WorkEvent as BeaconWorkEvent}; use crate::service::NetworkMessage; @@ -51,7 +51,7 @@ use std::boxed::Box; use std::ops::Sub; use std::sync::Arc; use tokio::sync::mpsc; -use types::{EthSpec, Hash256, SignedBeaconBlock, Slot}; +use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot}; /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync /// from a peer. If a peer is within this tolerance (forwards or backwards), it is treated as a @@ -100,7 +100,7 @@ pub enum SyncMessage { /// A batch has been processed by the block processor thread. BatchProcessed { chain_id: ChainId, - batch_id: BatchId, + epoch: Epoch, downloaded_blocks: Vec>, result: BatchProcessResult, }, @@ -842,14 +842,14 @@ impl SyncManager { } SyncMessage::BatchProcessed { chain_id, - batch_id, + epoch, downloaded_blocks, result, } => { self.range_sync.handle_block_process_result( &mut self.network, chain_id, - batch_id, + epoch, downloaded_blocks, result, ); diff --git a/beacon_node/network/src/sync/mod.rs b/beacon_node/network/src/sync/mod.rs index 0c0bdce31..1d6e4f543 100644 --- a/beacon_node/network/src/sync/mod.rs +++ b/beacon_node/network/src/sync/mod.rs @@ -8,7 +8,7 @@ mod range_sync; pub use manager::{BatchProcessResult, SyncMessage}; pub use peer_sync_info::PeerSyncInfo; -pub use range_sync::{BatchId, ChainId}; +pub use range_sync::ChainId; /// Type of id of rpc requests sent by sync pub type RequestId = usize; diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index 9d65f11f8..1fa312c57 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -9,37 +9,14 @@ use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; use std::hash::{Hash, Hasher}; use std::ops::Sub; -use types::{EthSpec, SignedBeaconBlock, Slot}; - -#[derive(Copy, Clone, Debug, PartialEq)] -pub struct BatchId(pub u64); - -impl std::ops::Deref for BatchId { - type Target = u64; - fn deref(&self) -> &Self::Target { - &self.0 - } -} -impl std::ops::DerefMut for BatchId { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -impl std::convert::From for BatchId { - fn from(id: u64) -> Self { - BatchId(id) - } -} +use types::{Epoch, EthSpec, SignedBeaconBlock, Slot}; /// A collection of sequential blocks that are requested from peers in a single RPC request. #[derive(PartialEq, Debug)] pub struct Batch { - /// The ID of the batch, these are sequential. - pub id: BatchId, - /// The requested start slot of the batch, inclusive. - pub start_slot: Slot, - /// The requested end slot of batch, exlcusive. + /// The requested start epoch of the batch. + pub start_epoch: Epoch, + /// The requested end slot of batch, exclusive. pub end_slot: Slot, /// The `Attempts` that have been made to send us this batch. pub attempts: Vec, @@ -69,10 +46,9 @@ pub struct Attempt { impl Eq for Batch {} impl Batch { - pub fn new(id: BatchId, start_slot: Slot, end_slot: Slot, peer_id: PeerId) -> Self { + pub fn new(start_epoch: Epoch, end_slot: Slot, peer_id: PeerId) -> Self { Batch { - id, - start_slot, + start_epoch, end_slot, attempts: Vec::new(), current_peer: peer_id, @@ -82,12 +58,21 @@ impl Batch { } } + pub fn start_slot(&self) -> Slot { + // batches are shifted by 1 + self.start_epoch.start_slot(T::slots_per_epoch()) + 1 + } + + pub fn end_slot(&self) -> Slot { + self.end_slot + } pub fn to_blocks_by_range_request(&self) -> BlocksByRangeRequest { + let start_slot = self.start_slot(); BlocksByRangeRequest { - start_slot: self.start_slot.into(), + start_slot: start_slot.into(), count: min( T::slots_per_epoch() * EPOCHS_PER_BATCH, - self.end_slot.sub(self.start_slot).into(), + self.end_slot.sub(start_slot).into(), ), step: 1, } @@ -105,7 +90,7 @@ impl Batch { impl Ord for Batch { fn cmp(&self, other: &Self) -> Ordering { - self.id.0.cmp(&other.id.0) + self.start_epoch.cmp(&other.start_epoch) } } diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index b86ef5e15..a01e8c3c5 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -1,4 +1,4 @@ -use super::batch::{Batch, BatchId, PendingBatches}; +use super::batch::{Batch, PendingBatches}; use crate::beacon_processor::ProcessId; use crate::beacon_processor::WorkEvent as BeaconWorkEvent; use crate::sync::RequestId; @@ -73,11 +73,12 @@ pub struct SyncingChain { /// and thus available to download this chain from. pub peer_pool: HashSet, - /// The next batch_id that needs to be downloaded. - to_be_downloaded_id: BatchId, + /// Starting epoch of the next batch that needs to be downloaded. + to_be_downloaded: Epoch, - /// The next batch id that needs to be processed. - to_be_processed_id: BatchId, + /// Starting epoch of the batch that needs to be processed next. + /// This is incremented as the chain advances. + processing_target: Epoch, /// The current state of the chain. pub state: ChainSyncingState, @@ -91,7 +92,7 @@ pub struct SyncingChain { /// A reference to the underlying beacon chain. chain: Arc>, - /// A reference to the sync logger. + /// The chain's log. log: slog::Logger, } @@ -127,8 +128,8 @@ impl SyncingChain { completed_batches: Vec::new(), processed_batches: Vec::new(), peer_pool, - to_be_downloaded_id: BatchId(1), - to_be_processed_id: BatchId(1), + to_be_downloaded: start_epoch, + processing_target: start_epoch, state: ChainSyncingState::Stopped, current_processing_batch: None, beacon_processor_send, @@ -139,13 +140,10 @@ impl SyncingChain { /// Returns the latest slot number that has been processed. fn current_processed_slot(&self) -> Slot { - self.start_epoch + // the last slot we processed was included in the previous batch, and corresponds to the + // first slot of the current target epoch + self.processing_target .start_slot(T::EthSpec::slots_per_epoch()) - .saturating_add( - self.to_be_processed_id.saturating_sub(1u64) - * T::EthSpec::slots_per_epoch() - * EPOCHS_PER_BATCH, - ) } /// A batch of blocks has been received. This function gets run on all chains and should @@ -182,21 +180,19 @@ impl SyncingChain { // An entire batch of blocks has been received. This functions checks to see if it can be processed, // remove any batches waiting to be verified and if this chain is syncing, request new // blocks for the peer. - debug!(self.log, "Completed batch received"; "id"=> *batch.id, "blocks" => &batch.downloaded_blocks.len(), "awaiting_batches" => self.completed_batches.len()); + debug!(self.log, "Completed batch received"; "epoch" => batch.start_epoch, "blocks" => &batch.downloaded_blocks.len(), "awaiting_batches" => self.completed_batches.len()); // verify the range of received blocks // Note that the order of blocks is verified in block processing if let Some(last_slot) = batch.downloaded_blocks.last().map(|b| b.slot()) { // the batch is non-empty let first_slot = batch.downloaded_blocks[0].slot(); - if batch.start_slot > first_slot || batch.end_slot < last_slot { + if batch.start_slot() > first_slot || batch.end_slot() < last_slot { warn!(self.log, "BlocksByRange response returned out of range blocks"; - "response_initial_slot" => first_slot, - "requested_initial_slot" => batch.start_slot); - // This is a pretty bad error. We don't consider this fatal, but we don't tolerate - // this much either. - network.report_peer(batch.current_peer, PeerAction::LowToleranceError); - self.to_be_processed_id = batch.id; // reset the id back to here, when incrementing, it will check against completed batches + "response_initial_slot" => first_slot, + "requested_initial_slot" => batch.start_slot()); + // this batch can't be used, so we need to request it again. + self.failed_batch(network, batch); return; } } @@ -242,7 +238,7 @@ impl SyncingChain { // Check if there is a batch ready to be processed if !self.completed_batches.is_empty() - && self.completed_batches[0].id == self.to_be_processed_id + && self.completed_batches[0].start_epoch == self.processing_target { let batch = self.completed_batches.remove(0); @@ -258,7 +254,7 @@ impl SyncingChain { /// Sends a batch to the beacon processor for async processing in a queue. fn process_batch(&mut self, mut batch: Batch) { let blocks = std::mem::replace(&mut batch.downloaded_blocks, Vec::new()); - let process_id = ProcessId::RangeBatchId(self.id, batch.id); + let process_id = ProcessId::RangeBatchId(self.id, batch.start_epoch); self.current_processing_batch = Some(batch); if let Err(e) = self @@ -280,7 +276,7 @@ impl SyncingChain { &mut self, network: &mut SyncNetworkContext, chain_id: ChainId, - batch_id: BatchId, + batch_start_epoch: Epoch, downloaded_blocks: &mut Option>>, result: &BatchProcessResult, ) -> Option { @@ -289,14 +285,14 @@ impl SyncingChain { return None; } match &self.current_processing_batch { - Some(current_batch) if current_batch.id != batch_id => { + Some(current_batch) if current_batch.start_epoch != batch_start_epoch => { debug!(self.log, "Unexpected batch result"; - "chain_id" => self.id, "batch_id" => *batch_id, "expected_batch_id" => *current_batch.id); + "batch_epoch" => batch_start_epoch, "expected_batch_epoch" => current_batch.start_epoch); return None; } None => { debug!(self.log, "Chain was not expecting a batch result"; - "chain_id" => self.id, "batch_id" => *batch_id); + "batch_epoch" => batch_start_epoch); return None; } _ => { @@ -308,7 +304,7 @@ impl SyncingChain { let downloaded_blocks = downloaded_blocks.take().or_else(|| { // if taken by another chain, we are no longer waiting on a result. self.current_processing_batch = None; - crit!(self.log, "Processed batch taken by another chain"; "chain_id" => self.id); + crit!(self.log, "Processed batch taken by another chain"); None })?; @@ -318,16 +314,15 @@ impl SyncingChain { batch.downloaded_blocks = downloaded_blocks; // double check batches are processed in order TODO: Remove for prod - if batch.id != self.to_be_processed_id { + if batch.start_epoch != self.processing_target { crit!(self.log, "Batch processed out of order"; - "chain_id" => self.id, - "processed_batch_id" => *batch.id, - "expected_id" => *self.to_be_processed_id); + "processed_starting_epoch" => batch.start_epoch, + "expected_epoch" => self.processing_target); } let res = match result { BatchProcessResult::Success => { - *self.to_be_processed_id += 1; + self.processing_target += EPOCHS_PER_BATCH; // If the processed batch was not empty, we can validate previous invalidated // blocks including the current batch. @@ -357,7 +352,7 @@ impl SyncingChain { } BatchProcessResult::Partial => { warn!(self.log, "Batch processing failed but at least one block was imported"; - "chain_id" => self.id, "id" => *batch.id, "peer" => format!("{}", batch.current_peer) + "batch_epoch" => batch.start_epoch, "peer" => batch.current_peer.to_string() ); // At least one block was successfully verified and imported, so we can be sure all // previous batches are valid and we only need to download the current failed @@ -375,7 +370,7 @@ impl SyncingChain { let action = PeerAction::LowToleranceError; warn!(self.log, "Batch failed to download. Dropping chain scoring peers"; "score_adjustment" => action.to_string(), - "chain_id" => self.id, "id"=> *batch.id); + "batch_epoch"=> batch.start_epoch); for peer_id in self.peer_pool.drain() { network.report_peer(peer_id, action); } @@ -388,7 +383,7 @@ impl SyncingChain { } BatchProcessResult::Failed => { debug!(self.log, "Batch processing failed"; - "chain_id" => self.id,"id" => *batch.id, "peer" => batch.current_peer.to_string(), "client" => network.client_type(&batch.current_peer).to_string()); + "batch_epoch" => batch.start_epoch, "peer" => batch.current_peer.to_string(), "client" => network.client_type(&batch.current_peer).to_string()); // The batch processing failed // This could be because this batch is invalid, or a previous invalidated batch // is invalid. We need to find out which and downvote the peer that has sent us @@ -403,7 +398,7 @@ impl SyncingChain { let action = PeerAction::LowToleranceError; warn!(self.log, "Batch failed to download. Dropping chain scoring peers"; "score_adjustment" => action.to_string(), - "chain_id" => self.id, "id"=> *batch.id); + "batch_epoch" => batch.start_epoch); for peer_id in self.peer_pool.drain() { network.report_peer(peer_id, action); } @@ -433,11 +428,10 @@ impl SyncingChain { ) { while !self.processed_batches.is_empty() { let mut processed_batch = self.processed_batches.remove(0); - if *processed_batch.id >= *last_batch.id { + if processed_batch.start_epoch >= last_batch.start_epoch { crit!(self.log, "A processed batch had a greater id than the current process id"; - "chain_id" => self.id, - "processed_id" => *processed_batch.id, - "current_id" => *last_batch.id); + "processed_start_epoch" => processed_batch.start_epoch, + "current_start_epoch" => last_batch.start_epoch); } // Go through passed attempts and downscore peers that returned invalid batches @@ -452,11 +446,10 @@ impl SyncingChain { let action = PeerAction::LowToleranceError; debug!( self.log, "Re-processed batch validated. Scoring original peer"; - "chain_id" => self.id, - "batch_id" => *processed_batch.id, - "score_adjustment" => action.to_string(), - "original_peer" => format!("{}",attempt.peer_id), - "new_peer" => format!("{}", processed_batch.current_peer) + "batch_epoch" => processed_batch.start_epoch, + "score_adjustment" => action.to_string(), + "original_peer" => format!("{}",attempt.peer_id), + "new_peer" => format!("{}", processed_batch.current_peer) ); network.report_peer(attempt.peer_id, action); } else { @@ -465,11 +458,10 @@ impl SyncingChain { let action = PeerAction::MidToleranceError; debug!( self.log, "Re-processed batch validated by the same peer."; - "chain_id" => self.id, - "batch_id" => *processed_batch.id, - "score_adjustment" => action.to_string(), - "original_peer" => format!("{}",attempt.peer_id), - "new_peer" => format!("{}", processed_batch.current_peer) + "batch_epoch" => processed_batch.start_epoch, + "score_adjustment" => action.to_string(), + "original_peer" => format!("{}",attempt.peer_id), + "new_peer" => format!("{}", processed_batch.current_peer) ); network.report_peer(attempt.peer_id, action); } @@ -508,7 +500,7 @@ impl SyncingChain { // Find any pre-processed batches awaiting validation while !self.processed_batches.is_empty() { let past_batch = self.processed_batches.remove(0); - *self.to_be_processed_id = std::cmp::min(*self.to_be_processed_id, *past_batch.id); + self.processing_target = std::cmp::min(self.processing_target, past_batch.start_epoch); self.reprocess_batch(network, past_batch); } @@ -552,11 +544,10 @@ impl SyncingChain { batch.current_peer = new_peer.clone(); debug!(self.log, "Re-requesting batch"; - "chain_id" => self.id, - "start_slot" => batch.start_slot, + "start_slot" => batch.start_slot(), "end_slot" => batch.end_slot -1, // The -1 shows inclusive blocks - "id" => *batch.id, - "peer" => format!("{}", batch.current_peer), + "batch_epoch" => batch.start_epoch, + "peer" => batch.current_peer.to_string(), "retries" => batch.retries, "re-processes" => batch.reprocess_retries); self.send_batch(network, batch); @@ -592,12 +583,11 @@ impl SyncingChain { self.start_epoch = local_finalized_epoch; debug!(self.log, "Updating chain's progress"; - "chain_id" => self.id, "prev_completed_slot" => current_processed_slot, "new_completed_slot" => self.current_processed_slot()); // Re-index batches - *self.to_be_downloaded_id = 1; - *self.to_be_processed_id = 1; + self.to_be_downloaded = local_finalized_epoch; + self.processing_target = local_finalized_epoch; // remove any completed or processed batches self.completed_batches.clear(); @@ -621,7 +611,7 @@ impl SyncingChain { // do not request blocks if the chain is not syncing if let ChainSyncingState::Stopped = self.state { debug!(self.log, "Peer added to a non-syncing chain"; - "chain_id" => self.id, "peer_id" => format!("{}", peer_id)); + "peer_id" => format!("{}", peer_id)); return; } @@ -650,8 +640,7 @@ impl SyncingChain { ) -> Option { if let Some(batch) = self.pending_batches.remove(request_id) { debug!(self.log, "Batch failed. RPC Error"; - "chain_id" => self.id, - "id" => *batch.id, + "batch_epoch" => batch.start_epoch, "retries" => batch.retries, "peer" => format!("{:?}", peer_id)); @@ -688,12 +677,10 @@ impl SyncingChain { batch.current_peer = new_peer.clone(); debug!(self.log, "Re-Requesting batch"; - "chain_id" => self.id, - "start_slot" => batch.start_slot, + "start_slot" => batch.start_slot(), "end_slot" => batch.end_slot -1, // The -1 shows inclusive blocks - - "id" => *batch.id, - "peer" => format!("{:?}", batch.current_peer)); + "batch_epoch" => batch.start_epoch, + "peer" => batch.current_peer.to_string()); self.send_batch(network, batch); ProcessingResult::KeepChain } @@ -714,10 +701,9 @@ impl SyncingChain { if let Some(peer_id) = self.get_next_peer() { if let Some(batch) = self.get_next_batch(peer_id) { debug!(self.log, "Requesting batch"; - "chain_id" => self.id, - "start_slot" => batch.start_slot, - "end_slot" => batch.end_slot -1, // The -1 shows inclusive blocks - "id" => *batch.id, + "start_slot" => batch.start_slot(), + "end_slot" => batch.end_slot -1, // The -1 shows inclusive blocks + "batch_epoch" => batch.start_epoch, "peer" => format!("{}", batch.current_peer)); // send the batch self.send_batch(network, batch); @@ -770,22 +756,15 @@ impl SyncingChain { return None; } - // One is added to the start slot to begin one slot after the epoch boundary - let batch_start_slot = self - .start_epoch - .start_slot(slots_per_epoch) - .saturating_add(1u64) - + self.to_be_downloaded_id.saturating_sub(1) * blocks_per_batch; - // don't request batches beyond the target head slot - if batch_start_slot > self.target_head_slot { + if self.to_be_downloaded.start_slot(slots_per_epoch) > self.target_head_slot { return None; } // truncate the batch to the epoch containing the target head of the chain let batch_end_slot = std::cmp::min( // request either a batch containing the max number of blocks per batch - batch_start_slot + blocks_per_batch, + self.to_be_downloaded.start_slot(slots_per_epoch) + blocks_per_batch + 1, // or a batch of one epoch of blocks, which contains the `target_head_slot` self.target_head_slot .saturating_add(slots_per_epoch) @@ -793,28 +772,9 @@ impl SyncingChain { .start_slot(slots_per_epoch), ); - let batch_id = self.to_be_downloaded_id; - - // Find the next batch id. The largest of the next sequential id, or the next uncompleted - // id - let max_completed_id = self - .completed_batches - .iter() - .last() - .map(|x| x.id.0) - .unwrap_or_else(|| 0); - // TODO: Check if this is necessary - self.to_be_downloaded_id = BatchId(std::cmp::max( - self.to_be_downloaded_id.0 + 1, - max_completed_id + 1, - )); - - Some(Batch::new( - batch_id, - batch_start_slot, - batch_end_slot, - peer_id, - )) + let batch = Some(Batch::new(self.to_be_downloaded, batch_end_slot, peer_id)); + self.to_be_downloaded += EPOCHS_PER_BATCH; + batch } /// Requests the provided batch from the provided peer. @@ -832,14 +792,13 @@ impl SyncingChain { } Err(e) => { warn!(self.log, "Batch request failed"; - "chain_id" => self.id, - "start_slot" => batch.start_slot, - "end_slot" => batch.end_slot -1, // The -1 shows inclusive blocks - "id" => *batch.id, - "peer" => format!("{}", batch.current_peer), - "retries" => batch.retries, - "error" => e, - "re-processes" => batch.reprocess_retries); + "start_slot" => batch.start_slot(), + "end_slot" => batch.end_slot -1, // The -1 shows inclusive blocks + "start_epoch" => batch.start_epoch, + "peer" => batch.current_peer.to_string(), + "retries" => batch.retries, + "error" => e, + "re-processes" => batch.reprocess_retries); self.failed_batch(network, batch); } } diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index 2eda6fc81..ff194d27b 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -9,7 +9,7 @@ use crate::sync::network_context::SyncNetworkContext; use crate::sync::PeerSyncInfo; use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::{types::SyncState, NetworkGlobals, PeerId}; -use slog::{debug, error, info}; +use slog::{debug, error, info, o}; use std::sync::Arc; use tokio::sync::mpsc; use types::EthSpec; @@ -313,7 +313,7 @@ impl ChainCollection { peer_id, beacon_processor_send, self.beacon_chain.clone(), - self.log.clone(), + self.log.new(o!("chain" => chain_id)), )); } diff --git a/beacon_node/network/src/sync/range_sync/mod.rs b/beacon_node/network/src/sync/range_sync/mod.rs index b87829559..85db6d378 100644 --- a/beacon_node/network/src/sync/range_sync/mod.rs +++ b/beacon_node/network/src/sync/range_sync/mod.rs @@ -8,6 +8,5 @@ mod range; mod sync_type; pub use batch::Batch; -pub use batch::BatchId; pub use chain::{ChainId, EPOCHS_PER_BATCH}; pub use range::RangeSync; diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index d0a76cf6f..4d768b6fc 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -42,7 +42,6 @@ use super::chain::{ChainId, ProcessingResult}; use super::chain_collection::{ChainCollection, RangeSyncState}; use super::sync_type::RangeSyncType; -use super::BatchId; use crate::beacon_processor::WorkEvent as BeaconWorkEvent; use crate::sync::network_context::SyncNetworkContext; use crate::sync::BatchProcessResult; @@ -54,7 +53,7 @@ use slog::{debug, error, trace}; use std::collections::HashSet; use std::sync::Arc; use tokio::sync::mpsc; -use types::{EthSpec, SignedBeaconBlock}; +use types::{Epoch, EthSpec, SignedBeaconBlock}; /// The primary object dealing with long range/batch syncing. This contains all the active and /// non-active chains that need to be processed before the syncing is considered complete. This @@ -161,7 +160,7 @@ impl RangeSync { .chains .get_finalized_mut(remote_info.finalized_root, remote_finalized_slot) { - debug!(self.log, "Finalized chain exists, adding peer"; "peer_id" => format!("{:?}", peer_id), "target_root" => format!("{}", chain.target_head_root), "end_slot" => chain.target_head_slot, "start_epoch"=> chain.start_epoch); + debug!(self.log, "Finalized chain exists, adding peer"; "peer_id" => peer_id.to_string(), "target_root" => chain.target_head_root.to_string(), "targe_slot" => chain.target_head_slot); // add the peer to the chain's peer pool chain.add_peer(network, peer_id); @@ -271,7 +270,7 @@ impl RangeSync { &mut self, network: &mut SyncNetworkContext, chain_id: ChainId, - batch_id: BatchId, + epoch: Epoch, downloaded_blocks: Vec>, result: BatchProcessResult, ) { @@ -279,13 +278,7 @@ impl RangeSync { let mut downloaded_blocks = Some(downloaded_blocks); match self.chains.finalized_request(|chain| { - chain.on_batch_process_result( - network, - chain_id, - batch_id, - &mut downloaded_blocks, - &result, - ) + chain.on_batch_process_result(network, chain_id, epoch, &mut downloaded_blocks, &result) }) { Some((index, ProcessingResult::RemoveChain)) => { let chain = self.chains.remove_finalized_chain(index); @@ -319,7 +312,7 @@ impl RangeSync { chain.on_batch_process_result( network, chain_id, - batch_id, + epoch, &mut downloaded_blocks, &result, ) @@ -339,7 +332,7 @@ impl RangeSync { None => { // This can happen if a chain gets purged due to being out of date whilst a // batch process is in progress. - debug!(self.log, "No chains match the block processing id"; "id" => *batch_id); + debug!(self.log, "No chains match the block processing id"; "batch_epoch" => epoch, "chain_id" => chain_id); } } }