Pause sync when EE is offline (#3428)

## Issue Addressed

#3032

## Proposed Changes

Pause sync when ee is offline. Changes include three main parts:
- Online/offline notification system
- Pause sync
- Resume sync

#### Online/offline notification system
- The engine state is now guarded behind a new struct `State` that ensures every change is correctly notified. Notifications are only sent if the state changes. The new `State` is behind a `RwLock` (as before) as the synchronization mechanism.
- The actual notification channel is a [tokio::sync::watch](https://docs.rs/tokio/latest/tokio/sync/watch/index.html) which ensures only the last value is in the receiver channel. This way we don't need to worry about message order etc.
- Sync waits for state changes concurrently with normal messages.

#### Pause Sync
Sync has four components, pausing is done differently in each:
- **Block lookups**: Disabled while in this state. We drop current requests and don't search for new blocks. Block lookups are infrequent and I don't think it's worth the extra logic of keeping these and delaying processing. If we later see that this is required, we can add it.
- **Parent lookups**: Disabled while in this state. We drop current requests and don't search for new parents. Parent lookups are even less frequent and I don't think it's worth the extra logic of keeping these and delaying processing. If we later see that this is required, we can add it.
- **Range**: Chains don't send batches for processing to the beacon processor. This is easily done by guarding the channel to the beacon processor and giving it access only if the ee is responsive. I find this the simplest and most powerful approach since we don't need to deal with new sync states and chain segments that are added while the ee is offline will follow the same logic without needing to synchronize a shared state among those. Another advantage of passive pause vs active pause is that we can still keep track of active advertised chain segments so that on resume we don't need to re-evaluate all our peers.
- **Backfill**: Not affected by ee states, we don't pause.

#### Resume Sync
- **Block lookups**: Enabled again.
- **Parent lookups**: Enabled again.
- **Range**: Active resume. Since the only real pause range does is not sending batches for processing, resume makes all chains that are holding read-for-processing batches send them.
- **Backfill**: Not affected by ee states, no need to resume.

## Additional Info

**QUESTION**: Originally I made this to notify and change on synced state, but @pawanjay176 on talks with @paulhauner concluded we only need to check online/offline states. The upcheck function mentions extra checks to have a very up to date sync status to aid the networking stack. However, the only need the networking stack would have is this one. I added a TODO to review if the extra check can be removed

Next gen of #3094

Will work best with #3439 

Co-authored-by: Pawan Dhananjay <pawandhananjay@gmail.com>
This commit is contained in:
Divma 2022-08-24 23:34:56 +00:00
parent aab4a8d2f2
commit 8c69d57c2c
14 changed files with 574 additions and 328 deletions

1
Cargo.lock generated
View File

@ -2087,6 +2087,7 @@ dependencies = [
"task_executor", "task_executor",
"tempfile", "tempfile",
"tokio", "tokio",
"tokio-stream",
"tree_hash", "tree_hash",
"tree_hash_derive", "tree_hash_derive",
"types", "types",

View File

@ -59,6 +59,7 @@ pub use block_verification::{BlockError, ExecutionPayloadError, GossipVerifiedBl
pub use canonical_head::{CachedHead, CanonicalHead, CanonicalHeadRwLock}; pub use canonical_head::{CachedHead, CanonicalHead, CanonicalHeadRwLock};
pub use eth1_chain::{Eth1Chain, Eth1ChainBackend}; pub use eth1_chain::{Eth1Chain, Eth1ChainBackend};
pub use events::ServerSentEventHandler; pub use events::ServerSentEventHandler;
pub use execution_layer::EngineState;
pub use fork_choice::{ExecutionStatus, ForkchoiceUpdateParameters}; pub use fork_choice::{ExecutionStatus, ForkchoiceUpdateParameters};
pub use metrics::scrape_for_metrics; pub use metrics::scrape_for_metrics;
pub use parking_lot; pub use parking_lot;

View File

@ -43,4 +43,5 @@ fork_choice = { path = "../../consensus/fork_choice" }
mev-build-rs = {git = "https://github.com/ralexstokes/mev-rs", tag = "v0.2.0"} mev-build-rs = {git = "https://github.com/ralexstokes/mev-rs", tag = "v0.2.0"}
ethereum-consensus = {git = "https://github.com/ralexstokes/ethereum-consensus"} ethereum-consensus = {git = "https://github.com/ralexstokes/ethereum-consensus"}
ssz-rs = {git = "https://github.com/ralexstokes/ssz-rs"} ssz-rs = {git = "https://github.com/ralexstokes/ssz-rs"}
tokio-stream = { version = "0.1.9", features = [ "sync" ] }
strum = "0.24.0" strum = "0.24.0"

View File

@ -9,7 +9,8 @@ use slog::{debug, error, info, Logger};
use std::future::Future; use std::future::Future;
use std::sync::Arc; use std::sync::Arc;
use task_executor::TaskExecutor; use task_executor::TaskExecutor;
use tokio::sync::{Mutex, RwLock}; use tokio::sync::{watch, Mutex, RwLock};
use tokio_stream::wrappers::WatchStream;
use types::{Address, ExecutionBlockHash, Hash256}; use types::{Address, ExecutionBlockHash, Hash256};
/// The number of payload IDs that will be stored for each `Engine`. /// The number of payload IDs that will be stored for each `Engine`.
@ -18,14 +19,74 @@ use types::{Address, ExecutionBlockHash, Hash256};
const PAYLOAD_ID_LRU_CACHE_SIZE: usize = 512; const PAYLOAD_ID_LRU_CACHE_SIZE: usize = 512;
/// Stores the remembered state of a engine. /// Stores the remembered state of a engine.
#[derive(Copy, Clone, PartialEq, Debug)] #[derive(Copy, Clone, PartialEq, Debug, Eq, Default)]
enum EngineState { enum EngineStateInternal {
Synced, Synced,
#[default]
Offline, Offline,
Syncing, Syncing,
AuthFailed, AuthFailed,
} }
/// A subset of the engine state to inform other services if the engine is online or offline.
#[derive(Debug, Clone, PartialEq, Eq, Copy)]
pub enum EngineState {
Online,
Offline,
}
impl From<EngineStateInternal> for EngineState {
fn from(state: EngineStateInternal) -> Self {
match state {
EngineStateInternal::Synced | EngineStateInternal::Syncing => EngineState::Online,
EngineStateInternal::Offline | EngineStateInternal::AuthFailed => EngineState::Offline,
}
}
}
/// Wrapper structure that ensures changes to the engine state are correctly reported to watchers.
struct State {
/// The actual engine state.
state: EngineStateInternal,
/// Notifier to watch the engine state.
notifier: watch::Sender<EngineState>,
}
impl std::ops::Deref for State {
type Target = EngineStateInternal;
fn deref(&self) -> &Self::Target {
&self.state
}
}
impl Default for State {
fn default() -> Self {
let state = EngineStateInternal::default();
let (notifier, _receiver) = watch::channel(state.into());
State { state, notifier }
}
}
impl State {
// Updates the state and notifies all watchers if the state has changed.
pub fn update(&mut self, new_state: EngineStateInternal) {
self.state = new_state;
self.notifier.send_if_modified(|last_state| {
let changed = *last_state != new_state.into(); // notify conditionally
*last_state = new_state.into(); // update the state unconditionally
changed
});
}
/// Gives access to a channel containing whether the last state is online.
///
/// This can be called several times.
pub fn watch(&self) -> WatchStream<EngineState> {
self.notifier.subscribe().into()
}
}
#[derive(Copy, Clone, PartialEq, Debug)] #[derive(Copy, Clone, PartialEq, Debug)]
pub struct ForkChoiceState { pub struct ForkChoiceState {
pub head_block_hash: ExecutionBlockHash, pub head_block_hash: ExecutionBlockHash,
@ -53,10 +114,10 @@ pub enum EngineError {
pub struct Engine { pub struct Engine {
pub api: HttpJsonRpc, pub api: HttpJsonRpc,
payload_id_cache: Mutex<LruCache<PayloadIdCacheKey, PayloadId>>, payload_id_cache: Mutex<LruCache<PayloadIdCacheKey, PayloadId>>,
state: RwLock<EngineState>, state: RwLock<State>,
pub latest_forkchoice_state: RwLock<Option<ForkChoiceState>>, latest_forkchoice_state: RwLock<Option<ForkChoiceState>>,
pub executor: TaskExecutor, executor: TaskExecutor,
pub log: Logger, log: Logger,
} }
impl Engine { impl Engine {
@ -65,13 +126,20 @@ impl Engine {
Self { Self {
api, api,
payload_id_cache: Mutex::new(LruCache::new(PAYLOAD_ID_LRU_CACHE_SIZE)), payload_id_cache: Mutex::new(LruCache::new(PAYLOAD_ID_LRU_CACHE_SIZE)),
state: RwLock::new(EngineState::Offline), state: Default::default(),
latest_forkchoice_state: Default::default(), latest_forkchoice_state: Default::default(),
executor, executor,
log: log.clone(), log: log.clone(),
} }
} }
/// Gives access to a channel containing the last engine state.
///
/// This can be called several times.
pub async fn watch_state(&self) -> WatchStream<EngineState> {
self.state.read().await.watch()
}
pub async fn get_payload_id( pub async fn get_payload_id(
&self, &self,
head_block_hash: ExecutionBlockHash, head_block_hash: ExecutionBlockHash,
@ -165,17 +233,16 @@ impl Engine {
/// Returns `true` if the engine has a "synced" status. /// Returns `true` if the engine has a "synced" status.
pub async fn is_synced(&self) -> bool { pub async fn is_synced(&self) -> bool {
*self.state.read().await == EngineState::Synced **self.state.read().await == EngineStateInternal::Synced
} }
/// Run the `EngineApi::upcheck` function if the node's last known state is not synced. This /// Run the `EngineApi::upcheck` function if the node's last known state is not synced. This
/// might be used to recover the node if offline. /// might be used to recover the node if offline.
pub async fn upcheck(&self) { pub async fn upcheck(&self) {
let state: EngineState = match self.api.upcheck().await { let state: EngineStateInternal = match self.api.upcheck().await {
Ok(()) => { Ok(()) => {
let mut state = self.state.write().await; let mut state = self.state.write().await;
if **state != EngineStateInternal::Synced {
if *state != EngineState::Synced {
info!( info!(
self.log, self.log,
"Execution engine online"; "Execution engine online";
@ -189,14 +256,13 @@ impl Engine {
"Execution engine online"; "Execution engine online";
); );
} }
state.update(EngineStateInternal::Synced);
*state = EngineState::Synced; **state
*state
} }
Err(EngineApiError::IsSyncing) => { Err(EngineApiError::IsSyncing) => {
let mut state = self.state.write().await; let mut state = self.state.write().await;
*state = EngineState::Syncing; state.update(EngineStateInternal::Syncing);
*state **state
} }
Err(EngineApiError::Auth(err)) => { Err(EngineApiError::Auth(err)) => {
error!( error!(
@ -206,8 +272,8 @@ impl Engine {
); );
let mut state = self.state.write().await; let mut state = self.state.write().await;
*state = EngineState::AuthFailed; state.update(EngineStateInternal::AuthFailed);
*state **state
} }
Err(e) => { Err(e) => {
error!( error!(
@ -217,8 +283,8 @@ impl Engine {
); );
let mut state = self.state.write().await; let mut state = self.state.write().await;
*state = EngineState::Offline; state.update(EngineStateInternal::Offline);
*state **state
} }
}; };
@ -244,12 +310,10 @@ impl Engine {
Ok(result) => { Ok(result) => {
// Take a clone *without* holding the read-lock since the `upcheck` function will // Take a clone *without* holding the read-lock since the `upcheck` function will
// take a write-lock. // take a write-lock.
let state: EngineState = *self.state.read().await; let state: EngineStateInternal = **self.state.read().await;
// If this request just returned successfully but we don't think this node is // Keep an up to date engine state.
// synced, check to see if it just became synced. This helps to ensure that the if state != EngineStateInternal::Synced {
// networking stack can get fast feedback about a synced engine.
if state != EngineState::Synced {
// Spawn the upcheck in another task to avoid slowing down this request. // Spawn the upcheck in another task to avoid slowing down this request.
let inner_self = self.clone(); let inner_self = self.clone();
self.executor.spawn( self.executor.spawn(
@ -293,3 +357,22 @@ impl PayloadIdCacheKey {
} }
} }
} }
#[cfg(test)]
mod tests {
use super::*;
use tokio_stream::StreamExt;
#[tokio::test]
async fn test_state_notifier() {
let mut state = State::default();
let initial_state: EngineState = state.state.into();
assert_eq!(initial_state, EngineState::Offline);
state.update(EngineStateInternal::Synced);
// a watcher that arrives after the first update.
let mut watcher = state.watch();
let new_state = watcher.next().await.expect("Last state is always present");
assert_eq!(new_state, EngineState::Online);
}
}

View File

@ -10,8 +10,8 @@ use builder_client::BuilderHttpClient;
use engine_api::Error as ApiError; use engine_api::Error as ApiError;
pub use engine_api::*; pub use engine_api::*;
pub use engine_api::{http, http::deposit_methods, http::HttpJsonRpc}; pub use engine_api::{http, http::deposit_methods, http::HttpJsonRpc};
pub use engines::ForkChoiceState;
use engines::{Engine, EngineError}; use engines::{Engine, EngineError};
pub use engines::{EngineState, ForkChoiceState};
use fork_choice::ForkchoiceUpdateParameters; use fork_choice::ForkchoiceUpdateParameters;
use lru::LruCache; use lru::LruCache;
use payload_status::process_payload_status; use payload_status::process_payload_status;
@ -31,6 +31,7 @@ use tokio::{
sync::{Mutex, MutexGuard, RwLock}, sync::{Mutex, MutexGuard, RwLock},
time::sleep, time::sleep,
}; };
use tokio_stream::wrappers::WatchStream;
use types::{ use types::{
BlindedPayload, BlockType, ChainSpec, Epoch, ExecPayload, ExecutionBlockHash, ForkName, BlindedPayload, BlockType, ChainSpec, Epoch, ExecPayload, ExecutionBlockHash, ForkName,
ProposerPreparationData, PublicKeyBytes, SignedBeaconBlock, Slot, ProposerPreparationData, PublicKeyBytes, SignedBeaconBlock, Slot,
@ -286,6 +287,13 @@ impl<T: EthSpec> ExecutionLayer<T> {
self.inner.execution_blocks.lock().await self.inner.execution_blocks.lock().await
} }
/// Gives access to a channel containing if the last engine state is online or not.
///
/// This can be called several times.
pub async fn get_responsiveness_watch(&self) -> WatchStream<EngineState> {
self.engine().watch_state().await
}
/// Note: this function returns a mutex guard, be careful to avoid deadlocks. /// Note: this function returns a mutex guard, be careful to avoid deadlocks.
async fn proposer_preparation_data( async fn proposer_preparation_data(
&self, &self,

View File

@ -24,7 +24,6 @@ use std::collections::{
HashMap, HashSet, HashMap, HashSet,
}; };
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc;
use types::{Epoch, EthSpec, SignedBeaconBlock}; use types::{Epoch, EthSpec, SignedBeaconBlock};
/// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of /// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of
@ -144,9 +143,6 @@ pub struct BackFillSync<T: BeaconChainTypes> {
/// (i.e synced peers). /// (i.e synced peers).
network_globals: Arc<NetworkGlobals<T::EthSpec>>, network_globals: Arc<NetworkGlobals<T::EthSpec>>,
/// A multi-threaded, non-blocking processor for processing batches in the beacon chain.
beacon_processor_send: mpsc::Sender<BeaconWorkEvent<T>>,
/// A logger for backfill sync. /// A logger for backfill sync.
log: slog::Logger, log: slog::Logger,
} }
@ -155,7 +151,6 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
pub fn new( pub fn new(
beacon_chain: Arc<BeaconChain<T>>, beacon_chain: Arc<BeaconChain<T>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>, network_globals: Arc<NetworkGlobals<T::EthSpec>>,
beacon_processor_send: mpsc::Sender<BeaconWorkEvent<T>>,
log: slog::Logger, log: slog::Logger,
) -> Self { ) -> Self {
// Determine if backfill is enabled or not. // Determine if backfill is enabled or not.
@ -193,7 +188,6 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
participating_peers: HashSet::new(), participating_peers: HashSet::new(),
restart_failed_sync: false, restart_failed_sync: false,
beacon_chain, beacon_chain,
beacon_processor_send,
log, log,
}; };
@ -216,7 +210,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
#[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"] #[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"]
pub fn start( pub fn start(
&mut self, &mut self,
network: &mut SyncNetworkContext<T::EthSpec>, network: &mut SyncNetworkContext<T>,
) -> Result<SyncStart, BackFillError> { ) -> Result<SyncStart, BackFillError> {
match self.state() { match self.state() {
BackFillState::Syncing => {} // already syncing ignore. BackFillState::Syncing => {} // already syncing ignore.
@ -312,7 +306,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
pub fn peer_disconnected( pub fn peer_disconnected(
&mut self, &mut self,
peer_id: &PeerId, peer_id: &PeerId,
network: &mut SyncNetworkContext<T::EthSpec>, network: &mut SyncNetworkContext<T>,
) -> Result<(), BackFillError> { ) -> Result<(), BackFillError> {
if matches!( if matches!(
self.state(), self.state(),
@ -355,7 +349,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
#[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"] #[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"]
pub fn inject_error( pub fn inject_error(
&mut self, &mut self,
network: &mut SyncNetworkContext<T::EthSpec>, network: &mut SyncNetworkContext<T>,
batch_id: BatchId, batch_id: BatchId,
peer_id: &PeerId, peer_id: &PeerId,
request_id: Id, request_id: Id,
@ -392,7 +386,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
#[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"] #[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"]
pub fn on_block_response( pub fn on_block_response(
&mut self, &mut self,
network: &mut SyncNetworkContext<T::EthSpec>, network: &mut SyncNetworkContext<T>,
batch_id: BatchId, batch_id: BatchId,
peer_id: &PeerId, peer_id: &PeerId,
request_id: Id, request_id: Id,
@ -505,7 +499,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
/// The batch must exist and be ready for processing /// The batch must exist and be ready for processing
fn process_batch( fn process_batch(
&mut self, &mut self,
network: &mut SyncNetworkContext<T::EthSpec>, network: &mut SyncNetworkContext<T>,
batch_id: BatchId, batch_id: BatchId,
) -> Result<ProcessResult, BackFillError> { ) -> Result<ProcessResult, BackFillError> {
// Only process batches if this chain is Syncing, and only one at a time // Only process batches if this chain is Syncing, and only one at a time
@ -541,8 +535,8 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
let process_id = ChainSegmentProcessId::BackSyncBatchId(batch_id); let process_id = ChainSegmentProcessId::BackSyncBatchId(batch_id);
self.current_processing_batch = Some(batch_id); self.current_processing_batch = Some(batch_id);
if let Err(e) = self if let Err(e) = network
.beacon_processor_send .processor_channel()
.try_send(BeaconWorkEvent::chain_segment(process_id, blocks)) .try_send(BeaconWorkEvent::chain_segment(process_id, blocks))
{ {
crit!(self.log, "Failed to send backfill segment to processor."; "msg" => "process_batch", crit!(self.log, "Failed to send backfill segment to processor."; "msg" => "process_batch",
@ -563,7 +557,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
#[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"] #[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"]
pub fn on_batch_process_result( pub fn on_batch_process_result(
&mut self, &mut self,
network: &mut SyncNetworkContext<T::EthSpec>, network: &mut SyncNetworkContext<T>,
batch_id: BatchId, batch_id: BatchId,
result: &BatchProcessResult, result: &BatchProcessResult,
) -> Result<ProcessResult, BackFillError> { ) -> Result<ProcessResult, BackFillError> {
@ -704,7 +698,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
/// Processes the next ready batch. /// Processes the next ready batch.
fn process_completed_batches( fn process_completed_batches(
&mut self, &mut self,
network: &mut SyncNetworkContext<T::EthSpec>, network: &mut SyncNetworkContext<T>,
) -> Result<ProcessResult, BackFillError> { ) -> Result<ProcessResult, BackFillError> {
// Only process batches if backfill is syncing and only process one batch at a time // Only process batches if backfill is syncing and only process one batch at a time
if self.state() != BackFillState::Syncing || self.current_processing_batch.is_some() { if self.state() != BackFillState::Syncing || self.current_processing_batch.is_some() {
@ -764,11 +758,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
/// ///
/// If a previous batch has been validated and it had been re-processed, penalize the original /// If a previous batch has been validated and it had been re-processed, penalize the original
/// peer. /// peer.
fn advance_chain( fn advance_chain(&mut self, network: &mut SyncNetworkContext<T>, validating_epoch: Epoch) {
&mut self,
network: &mut SyncNetworkContext<T::EthSpec>,
validating_epoch: Epoch,
) {
// make sure this epoch produces an advancement // make sure this epoch produces an advancement
if validating_epoch >= self.current_start { if validating_epoch >= self.current_start {
return; return;
@ -863,7 +853,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
/// intended and can result in downvoting a peer. /// intended and can result in downvoting a peer.
fn handle_invalid_batch( fn handle_invalid_batch(
&mut self, &mut self,
network: &mut SyncNetworkContext<T::EthSpec>, network: &mut SyncNetworkContext<T>,
batch_id: BatchId, batch_id: BatchId,
) -> Result<(), BackFillError> { ) -> Result<(), BackFillError> {
// The current batch could not be processed, indicating either the current or previous // The current batch could not be processed, indicating either the current or previous
@ -914,7 +904,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
/// Sends and registers the request of a batch awaiting download. /// Sends and registers the request of a batch awaiting download.
fn retry_batch_download( fn retry_batch_download(
&mut self, &mut self,
network: &mut SyncNetworkContext<T::EthSpec>, network: &mut SyncNetworkContext<T>,
batch_id: BatchId, batch_id: BatchId,
) -> Result<(), BackFillError> { ) -> Result<(), BackFillError> {
let batch = match self.batches.get_mut(&batch_id) { let batch = match self.batches.get_mut(&batch_id) {
@ -958,7 +948,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
/// Requests the batch assigned to the given id from a given peer. /// Requests the batch assigned to the given id from a given peer.
fn send_batch( fn send_batch(
&mut self, &mut self,
network: &mut SyncNetworkContext<T::EthSpec>, network: &mut SyncNetworkContext<T>,
batch_id: BatchId, batch_id: BatchId,
peer: PeerId, peer: PeerId,
) -> Result<(), BackFillError> { ) -> Result<(), BackFillError> {
@ -1011,10 +1001,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
/// When resuming a chain, this function searches for batches that need to be re-downloaded and /// When resuming a chain, this function searches for batches that need to be re-downloaded and
/// transitions their state to redownload the batch. /// transitions their state to redownload the batch.
fn resume_batches( fn resume_batches(&mut self, network: &mut SyncNetworkContext<T>) -> Result<(), BackFillError> {
&mut self,
network: &mut SyncNetworkContext<T::EthSpec>,
) -> Result<(), BackFillError> {
let batch_ids_to_retry = self let batch_ids_to_retry = self
.batches .batches
.iter() .iter()
@ -1040,7 +1027,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
/// pool and left over batches until the batch buffer is reached or all peers are exhausted. /// pool and left over batches until the batch buffer is reached or all peers are exhausted.
fn request_batches( fn request_batches(
&mut self, &mut self,
network: &mut SyncNetworkContext<T::EthSpec>, network: &mut SyncNetworkContext<T>,
) -> Result<(), BackFillError> { ) -> Result<(), BackFillError> {
if !matches!(self.state(), BackFillState::Syncing) { if !matches!(self.state(), BackFillState::Syncing) {
return Ok(()); return Ok(());

View File

@ -5,11 +5,10 @@ use beacon_chain::{BeaconChainTypes, BlockError};
use fnv::FnvHashMap; use fnv::FnvHashMap;
use lighthouse_network::{PeerAction, PeerId}; use lighthouse_network::{PeerAction, PeerId};
use lru_cache::LRUTimeCache; use lru_cache::LRUTimeCache;
use slog::{crit, debug, error, trace, warn, Logger}; use slog::{debug, error, trace, warn, Logger};
use smallvec::SmallVec; use smallvec::SmallVec;
use std::sync::Arc; use std::sync::Arc;
use store::{Hash256, SignedBeaconBlock}; use store::{Hash256, SignedBeaconBlock};
use tokio::sync::mpsc;
use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent}; use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent};
use crate::metrics; use crate::metrics;
@ -36,7 +35,7 @@ const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 3;
pub(crate) struct BlockLookups<T: BeaconChainTypes> { pub(crate) struct BlockLookups<T: BeaconChainTypes> {
/// A collection of parent block lookups. /// A collection of parent block lookups.
parent_queue: SmallVec<[ParentLookup<T::EthSpec>; 3]>, parent_queue: SmallVec<[ParentLookup<T>; 3]>,
/// A cache of failed chain lookups to prevent duplicate searches. /// A cache of failed chain lookups to prevent duplicate searches.
failed_chains: LRUTimeCache<Hash256>, failed_chains: LRUTimeCache<Hash256>,
@ -47,22 +46,18 @@ pub(crate) struct BlockLookups<T: BeaconChainTypes> {
/// The flag allows us to determine if the peer returned data or sent us nothing. /// The flag allows us to determine if the peer returned data or sent us nothing.
single_block_lookups: FnvHashMap<Id, SingleBlockRequest<SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS>>, single_block_lookups: FnvHashMap<Id, SingleBlockRequest<SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS>>,
/// A multi-threaded, non-blocking processor for applying messages to the beacon chain.
beacon_processor_send: mpsc::Sender<WorkEvent<T>>,
/// The logger for the import manager. /// The logger for the import manager.
log: Logger, log: Logger,
} }
impl<T: BeaconChainTypes> BlockLookups<T> { impl<T: BeaconChainTypes> BlockLookups<T> {
pub fn new(beacon_processor_send: mpsc::Sender<WorkEvent<T>>, log: Logger) -> Self { pub fn new(log: Logger) -> Self {
Self { Self {
parent_queue: Default::default(), parent_queue: Default::default(),
failed_chains: LRUTimeCache::new(Duration::from_secs( failed_chains: LRUTimeCache::new(Duration::from_secs(
FAILED_CHAINS_CACHE_EXPIRY_SECONDS, FAILED_CHAINS_CACHE_EXPIRY_SECONDS,
)), )),
single_block_lookups: Default::default(), single_block_lookups: Default::default(),
beacon_processor_send,
log, log,
} }
} }
@ -71,12 +66,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
/// Searches for a single block hash. If the blocks parent is unknown, a chain of blocks is /// Searches for a single block hash. If the blocks parent is unknown, a chain of blocks is
/// constructed. /// constructed.
pub fn search_block( pub fn search_block(&mut self, hash: Hash256, peer_id: PeerId, cx: &mut SyncNetworkContext<T>) {
&mut self,
hash: Hash256,
peer_id: PeerId,
cx: &mut SyncNetworkContext<T::EthSpec>,
) {
// Do not re-request a block that is already being requested // Do not re-request a block that is already being requested
if self if self
.single_block_lookups .single_block_lookups
@ -113,7 +103,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
&mut self, &mut self,
block: Arc<SignedBeaconBlock<T::EthSpec>>, block: Arc<SignedBeaconBlock<T::EthSpec>>,
peer_id: PeerId, peer_id: PeerId,
cx: &mut SyncNetworkContext<T::EthSpec>, cx: &mut SyncNetworkContext<T>,
) { ) {
let block_root = block.canonical_root(); let block_root = block.canonical_root();
let parent_root = block.parent_root(); let parent_root = block.parent_root();
@ -147,18 +137,16 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
peer_id: PeerId, peer_id: PeerId,
block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>, block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
seen_timestamp: Duration, seen_timestamp: Duration,
cx: &mut SyncNetworkContext<T::EthSpec>, cx: &mut SyncNetworkContext<T>,
) { ) {
let mut request = match self.single_block_lookups.entry(id) { let mut request = match self.single_block_lookups.entry(id) {
Entry::Occupied(req) => req, Entry::Occupied(req) => req,
Entry::Vacant(_) => { Entry::Vacant(_) => {
if block.is_some() { if block.is_some() {
crit!( debug!(
self.log, self.log,
"Block returned for single block lookup not present" "Block returned for single block lookup not present"
); );
#[cfg(debug_assertions)]
panic!("block returned for single block lookup not present");
} }
return; return;
} }
@ -172,6 +160,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
block, block,
seen_timestamp, seen_timestamp,
BlockProcessType::SingleBlock { id }, BlockProcessType::SingleBlock { id },
cx,
) )
.is_err() .is_err()
{ {
@ -212,7 +201,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
peer_id: PeerId, peer_id: PeerId,
block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>, block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
seen_timestamp: Duration, seen_timestamp: Duration,
cx: &mut SyncNetworkContext<T::EthSpec>, cx: &mut SyncNetworkContext<T>,
) { ) {
let mut parent_lookup = if let Some(pos) = self let mut parent_lookup = if let Some(pos) = self
.parent_queue .parent_queue
@ -236,6 +225,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
block, block,
seen_timestamp, seen_timestamp,
BlockProcessType::ParentLookup { chain_hash }, BlockProcessType::ParentLookup { chain_hash },
cx,
) )
.is_ok() .is_ok()
{ {
@ -289,7 +279,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
/* Error responses */ /* Error responses */
#[allow(clippy::needless_collect)] // false positive #[allow(clippy::needless_collect)] // false positive
pub fn peer_disconnected(&mut self, peer_id: &PeerId, cx: &mut SyncNetworkContext<T::EthSpec>) { pub fn peer_disconnected(&mut self, peer_id: &PeerId, cx: &mut SyncNetworkContext<T>) {
/* Check disconnection for single block lookups */ /* Check disconnection for single block lookups */
// better written after https://github.com/rust-lang/rust/issues/59618 // better written after https://github.com/rust-lang/rust/issues/59618
let remove_retry_ids: Vec<Id> = self let remove_retry_ids: Vec<Id> = self
@ -345,7 +335,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
&mut self, &mut self,
id: Id, id: Id,
peer_id: PeerId, peer_id: PeerId,
cx: &mut SyncNetworkContext<T::EthSpec>, cx: &mut SyncNetworkContext<T>,
) { ) {
if let Some(pos) = self if let Some(pos) = self
.parent_queue .parent_queue
@ -365,7 +355,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
); );
} }
pub fn single_block_lookup_failed(&mut self, id: Id, cx: &mut SyncNetworkContext<T::EthSpec>) { pub fn single_block_lookup_failed(&mut self, id: Id, cx: &mut SyncNetworkContext<T>) {
if let Some(mut request) = self.single_block_lookups.remove(&id) { if let Some(mut request) = self.single_block_lookups.remove(&id) {
request.register_failure_downloading(); request.register_failure_downloading();
trace!(self.log, "Single block lookup failed"; "block" => %request.hash); trace!(self.log, "Single block lookup failed"; "block" => %request.hash);
@ -388,15 +378,12 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
&mut self, &mut self,
id: Id, id: Id,
result: BlockProcessResult<T::EthSpec>, result: BlockProcessResult<T::EthSpec>,
cx: &mut SyncNetworkContext<T::EthSpec>, cx: &mut SyncNetworkContext<T>,
) { ) {
let mut req = match self.single_block_lookups.remove(&id) { let mut req = match self.single_block_lookups.remove(&id) {
Some(req) => req, Some(req) => req,
None => { None => {
#[cfg(debug_assertions)] return debug!(
panic!("block processed for single block lookup not present");
#[cfg(not(debug_assertions))]
return crit!(
self.log, self.log,
"Block processed for single block lookup not present" "Block processed for single block lookup not present"
); );
@ -476,7 +463,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
&mut self, &mut self,
chain_hash: Hash256, chain_hash: Hash256,
result: BlockProcessResult<T::EthSpec>, result: BlockProcessResult<T::EthSpec>,
cx: &mut SyncNetworkContext<T::EthSpec>, cx: &mut SyncNetworkContext<T>,
) { ) {
let (mut parent_lookup, peer_id) = if let Some((pos, peer)) = self let (mut parent_lookup, peer_id) = if let Some((pos, peer)) = self
.parent_queue .parent_queue
@ -489,13 +476,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}) { }) {
(self.parent_queue.remove(pos), peer) (self.parent_queue.remove(pos), peer)
} else { } else {
#[cfg(debug_assertions)] return debug!(self.log, "Process response for a parent lookup request that was not found"; "chain_hash" => %chain_hash);
panic!(
"Process response for a parent lookup request that was not found. Chain_hash: {}",
chain_hash
);
#[cfg(not(debug_assertions))]
return crit!(self.log, "Process response for a parent lookup request that was not found"; "chain_hash" => %chain_hash);
}; };
match &result { match &result {
@ -524,14 +505,22 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
} }
BlockProcessResult::Ok BlockProcessResult::Ok
| BlockProcessResult::Err(BlockError::BlockIsAlreadyKnown { .. }) => { | BlockProcessResult::Err(BlockError::BlockIsAlreadyKnown { .. }) => {
// Check if the beacon processor is available
let beacon_processor_send = match cx.processor_channel_if_enabled() {
Some(channel) => channel,
None => {
return trace!(
self.log,
"Dropping parent chain segment that was ready for processing.";
parent_lookup
);
}
};
let chain_hash = parent_lookup.chain_hash(); let chain_hash = parent_lookup.chain_hash();
let blocks = parent_lookup.chain_blocks(); let blocks = parent_lookup.chain_blocks();
let process_id = ChainSegmentProcessId::ParentLookup(chain_hash); let process_id = ChainSegmentProcessId::ParentLookup(chain_hash);
match self match beacon_processor_send.try_send(WorkEvent::chain_segment(process_id, blocks)) {
.beacon_processor_send
.try_send(WorkEvent::chain_segment(process_id, blocks))
{
Ok(_) => { Ok(_) => {
self.parent_queue.push(parent_lookup); self.parent_queue.push(parent_lookup);
} }
@ -595,7 +584,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
&mut self, &mut self,
chain_hash: Hash256, chain_hash: Hash256,
result: BatchProcessResult, result: BatchProcessResult,
cx: &mut SyncNetworkContext<T::EthSpec>, cx: &mut SyncNetworkContext<T>,
) { ) {
let parent_lookup = if let Some(pos) = self let parent_lookup = if let Some(pos) = self
.parent_queue .parent_queue
@ -604,12 +593,6 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
{ {
self.parent_queue.remove(pos) self.parent_queue.remove(pos)
} else { } else {
#[cfg(debug_assertions)]
panic!(
"Chain process response for a parent lookup request that was not found. Chain_hash: {}",
chain_hash
);
#[cfg(not(debug_assertions))]
return debug!(self.log, "Chain process response for a parent lookup request that was not found"; "chain_hash" => %chain_hash); return debug!(self.log, "Chain process response for a parent lookup request that was not found"; "chain_hash" => %chain_hash);
}; };
@ -645,25 +628,34 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
block: Arc<SignedBeaconBlock<T::EthSpec>>, block: Arc<SignedBeaconBlock<T::EthSpec>>,
duration: Duration, duration: Duration,
process_type: BlockProcessType, process_type: BlockProcessType,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), ()> { ) -> Result<(), ()> {
trace!(self.log, "Sending block for processing"; "block" => %block.canonical_root(), "process" => ?process_type); match cx.processor_channel_if_enabled() {
let event = WorkEvent::rpc_beacon_block(block, duration, process_type); Some(beacon_processor_send) => {
if let Err(e) = self.beacon_processor_send.try_send(event) { trace!(self.log, "Sending block for processing"; "block" => %block.canonical_root(), "process" => ?process_type);
error!( let event = WorkEvent::rpc_beacon_block(block, duration, process_type);
self.log, if let Err(e) = beacon_processor_send.try_send(event) {
"Failed to send sync block to processor"; error!(
"error" => ?e self.log,
); "Failed to send sync block to processor";
return Err(()); "error" => ?e
);
Err(())
} else {
Ok(())
}
}
None => {
trace!(self.log, "Dropping block ready for processing. Beacon processor not available"; "block" => %block.canonical_root());
Err(())
}
} }
Ok(())
} }
fn request_parent( fn request_parent(
&mut self, &mut self,
mut parent_lookup: ParentLookup<T::EthSpec>, mut parent_lookup: ParentLookup<T>,
cx: &mut SyncNetworkContext<T::EthSpec>, cx: &mut SyncNetworkContext<T>,
) { ) {
match parent_lookup.request_parent(cx) { match parent_lookup.request_parent(cx) {
Err(e) => { Err(e) => {
@ -710,4 +702,14 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
self.parent_queue.len() as i64, self.parent_queue.len() as i64,
); );
} }
/// Drops all the single block requests and returns how many requests were dropped.
pub fn drop_single_block_requests(&mut self) -> usize {
self.single_block_lookups.drain().len()
}
/// Drops all the parent chain requests and returns how many requests were dropped.
pub fn drop_parent_chain_requests(&mut self) -> usize {
self.parent_queue.drain(..).len()
}
} }

View File

@ -1,6 +1,7 @@
use beacon_chain::BeaconChainTypes;
use lighthouse_network::PeerId; use lighthouse_network::PeerId;
use std::sync::Arc; use std::sync::Arc;
use store::{EthSpec, Hash256, SignedBeaconBlock}; use store::{Hash256, SignedBeaconBlock};
use strum::IntoStaticStr; use strum::IntoStaticStr;
use crate::sync::{ use crate::sync::{
@ -18,11 +19,11 @@ pub(crate) const PARENT_FAIL_TOLERANCE: u8 = 5;
pub(crate) const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE * 2; pub(crate) const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE * 2;
/// Maintains a sequential list of parents to lookup and the lookup's current state. /// Maintains a sequential list of parents to lookup and the lookup's current state.
pub(crate) struct ParentLookup<T: EthSpec> { pub(crate) struct ParentLookup<T: BeaconChainTypes> {
/// The root of the block triggering this parent request. /// The root of the block triggering this parent request.
chain_hash: Hash256, chain_hash: Hash256,
/// The blocks that have currently been downloaded. /// The blocks that have currently been downloaded.
downloaded_blocks: Vec<Arc<SignedBeaconBlock<T>>>, downloaded_blocks: Vec<Arc<SignedBeaconBlock<T::EthSpec>>>,
/// Request of the last parent. /// Request of the last parent.
current_parent_request: SingleBlockRequest<PARENT_FAIL_TOLERANCE>, current_parent_request: SingleBlockRequest<PARENT_FAIL_TOLERANCE>,
/// Id of the last parent request. /// Id of the last parent request.
@ -50,14 +51,14 @@ pub enum RequestError {
NoPeers, NoPeers,
} }
impl<T: EthSpec> ParentLookup<T> { impl<T: BeaconChainTypes> ParentLookup<T> {
pub fn contains_block(&self, block: &SignedBeaconBlock<T>) -> bool { pub fn contains_block(&self, block: &SignedBeaconBlock<T::EthSpec>) -> bool {
self.downloaded_blocks self.downloaded_blocks
.iter() .iter()
.any(|d_block| d_block.as_ref() == block) .any(|d_block| d_block.as_ref() == block)
} }
pub fn new(block: Arc<SignedBeaconBlock<T>>, peer_id: PeerId) -> Self { pub fn new(block: Arc<SignedBeaconBlock<T::EthSpec>>, peer_id: PeerId) -> Self {
let current_parent_request = SingleBlockRequest::new(block.parent_root(), peer_id); let current_parent_request = SingleBlockRequest::new(block.parent_root(), peer_id);
Self { Self {
@ -92,7 +93,7 @@ impl<T: EthSpec> ParentLookup<T> {
self.current_parent_request.check_peer_disconnected(peer_id) self.current_parent_request.check_peer_disconnected(peer_id)
} }
pub fn add_block(&mut self, block: Arc<SignedBeaconBlock<T>>) { pub fn add_block(&mut self, block: Arc<SignedBeaconBlock<T::EthSpec>>) {
let next_parent = block.parent_root(); let next_parent = block.parent_root();
self.downloaded_blocks.push(block); self.downloaded_blocks.push(block);
self.current_parent_request.hash = next_parent; self.current_parent_request.hash = next_parent;
@ -119,7 +120,7 @@ impl<T: EthSpec> ParentLookup<T> {
self.current_parent_request_id = None; self.current_parent_request_id = None;
} }
pub fn chain_blocks(&mut self) -> Vec<Arc<SignedBeaconBlock<T>>> { pub fn chain_blocks(&mut self) -> Vec<Arc<SignedBeaconBlock<T::EthSpec>>> {
std::mem::take(&mut self.downloaded_blocks) std::mem::take(&mut self.downloaded_blocks)
} }
@ -127,9 +128,9 @@ impl<T: EthSpec> ParentLookup<T> {
/// the processing result of the block. /// the processing result of the block.
pub fn verify_block( pub fn verify_block(
&mut self, &mut self,
block: Option<Arc<SignedBeaconBlock<T>>>, block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
failed_chains: &mut lru_cache::LRUTimeCache<Hash256>, failed_chains: &mut lru_cache::LRUTimeCache<Hash256>,
) -> Result<Option<Arc<SignedBeaconBlock<T>>>, VerifyError> { ) -> Result<Option<Arc<SignedBeaconBlock<T::EthSpec>>>, VerifyError> {
let block = self.current_parent_request.verify_block(block)?; let block = self.current_parent_request.verify_block(block)?;
// check if the parent of this block isn't in the failed cache. If it is, this chain should // check if the parent of this block isn't in the failed cache. If it is, this chain should
@ -189,7 +190,7 @@ impl From<super::single_block_lookup::LookupRequestError> for RequestError {
} }
} }
impl<T: EthSpec> slog::KV for ParentLookup<T> { impl<T: BeaconChainTypes> slog::KV for ParentLookup<T> {
fn serialize( fn serialize(
&self, &self,
record: &slog::Record, record: &slog::Record,

View File

@ -12,6 +12,7 @@ use lighthouse_network::{NetworkGlobals, Request};
use slog::{Drain, Level}; use slog::{Drain, Level};
use slot_clock::SystemTimeSlotClock; use slot_clock::SystemTimeSlotClock;
use store::MemoryStore; use store::MemoryStore;
use tokio::sync::mpsc;
use types::test_utils::{SeedableRng, TestRandom, XorShiftRng}; use types::test_utils::{SeedableRng, TestRandom, XorShiftRng};
use types::MinimalEthSpec as E; use types::MinimalEthSpec as E;
@ -26,7 +27,7 @@ struct TestRig {
const D: Duration = Duration::new(0, 0); const D: Duration = Duration::new(0, 0);
impl TestRig { impl TestRig {
fn test_setup(log_level: Option<Level>) -> (BlockLookups<T>, SyncNetworkContext<E>, Self) { fn test_setup(log_level: Option<Level>) -> (BlockLookups<T>, SyncNetworkContext<T>, Self) {
let log = { let log = {
let decorator = slog_term::TermDecorator::new().build(); let decorator = slog_term::TermDecorator::new().build();
let drain = slog_term::FullFormat::new(decorator).build().fuse(); let drain = slog_term::FullFormat::new(decorator).build().fuse();
@ -47,15 +48,13 @@ impl TestRig {
network_rx, network_rx,
rng, rng,
}; };
let bl = BlockLookups::new( let bl = BlockLookups::new(log.new(slog::o!("component" => "block_lookups")));
beacon_processor_tx,
log.new(slog::o!("component" => "block_lookups")),
);
let cx = { let cx = {
let globals = Arc::new(NetworkGlobals::new_test_globals(&log)); let globals = Arc::new(NetworkGlobals::new_test_globals(&log));
SyncNetworkContext::new( SyncNetworkContext::new(
network_tx, network_tx,
globals, globals,
beacon_processor_tx,
log.new(slog::o!("component" => "network_context")), log.new(slog::o!("component" => "network_context")),
) )
}; };

View File

@ -41,7 +41,8 @@ use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH};
use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent}; use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent};
use crate::service::NetworkMessage; use crate::service::NetworkMessage;
use crate::status::ToStatusMessage; use crate::status::ToStatusMessage;
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError}; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, EngineState};
use futures::StreamExt;
use lighthouse_network::rpc::methods::MAX_REQUEST_BLOCKS; use lighthouse_network::rpc::methods::MAX_REQUEST_BLOCKS;
use lighthouse_network::types::{NetworkGlobals, SyncState}; use lighthouse_network::types::{NetworkGlobals, SyncState};
use lighthouse_network::SyncInfo; use lighthouse_network::SyncInfo;
@ -165,7 +166,7 @@ pub struct SyncManager<T: BeaconChainTypes> {
input_channel: mpsc::UnboundedReceiver<SyncMessage<T::EthSpec>>, input_channel: mpsc::UnboundedReceiver<SyncMessage<T::EthSpec>>,
/// A network context to contact the network service. /// A network context to contact the network service.
network: SyncNetworkContext<T::EthSpec>, network: SyncNetworkContext<T>,
/// The object handling long-range batch load-balanced syncing. /// The object handling long-range batch load-balanced syncing.
range_sync: RangeSync<T>, range_sync: RangeSync<T>,
@ -202,19 +203,15 @@ pub fn spawn<T: BeaconChainTypes>(
chain: beacon_chain.clone(), chain: beacon_chain.clone(),
network_globals: network_globals.clone(), network_globals: network_globals.clone(),
input_channel: sync_recv, input_channel: sync_recv,
network: SyncNetworkContext::new(network_send, network_globals.clone(), log.clone()), network: SyncNetworkContext::new(
range_sync: RangeSync::new( network_send,
beacon_chain.clone(), network_globals.clone(),
beacon_processor_send.clone(), beacon_processor_send,
log.clone(), log.clone(),
), ),
backfill_sync: BackFillSync::new( range_sync: RangeSync::new(beacon_chain.clone(), log.clone()),
beacon_chain, backfill_sync: BackFillSync::new(beacon_chain, network_globals, log.clone()),
network_globals, block_lookups: BlockLookups::new(log.clone()),
beacon_processor_send.clone(),
log.clone(),
),
block_lookups: BlockLookups::new(beacon_processor_send, log.clone()),
log: log.clone(), log: log.clone(),
}; };
@ -468,100 +465,178 @@ impl<T: BeaconChainTypes> SyncManager<T> {
/// The main driving future for the sync manager. /// The main driving future for the sync manager.
async fn main(&mut self) { async fn main(&mut self) {
let check_ee = self.chain.execution_layer.is_some();
let mut check_ee_stream = {
// some magic to have an instance implementing stream even if there is no execution layer
let ee_responsiveness_watch: futures::future::OptionFuture<_> = self
.chain
.execution_layer
.as_ref()
.map(|el| el.get_responsiveness_watch())
.into();
futures::stream::iter(ee_responsiveness_watch.await).flatten()
};
// process any inbound messages // process any inbound messages
loop { loop {
if let Some(sync_message) = self.input_channel.recv().await { tokio::select! {
match sync_message { Some(sync_message) = self.input_channel.recv() => {
SyncMessage::AddPeer(peer_id, info) => { self.handle_message(sync_message);
self.add_peer(peer_id, info); },
} Some(engine_state) = check_ee_stream.next(), if check_ee => {
SyncMessage::RpcBlock { self.handle_new_execution_engine_state(engine_state);
request_id, }
peer_id, }
beacon_block, }
seen_timestamp, }
} => {
self.rpc_block_received(request_id, peer_id, beacon_block, seen_timestamp);
}
SyncMessage::UnknownBlock(peer_id, block) => {
// If we are not synced or within SLOT_IMPORT_TOLERANCE of the block, ignore
if !self.network_globals.sync_state.read().is_synced() {
let head_slot = self.chain.canonical_head.cached_head().head_slot();
let unknown_block_slot = block.slot();
// if the block is far in the future, ignore it. If its within the slot tolerance of fn handle_message(&mut self, sync_message: SyncMessage<T::EthSpec>) {
// our current head, regardless of the syncing state, fetch it. match sync_message {
if (head_slot >= unknown_block_slot SyncMessage::AddPeer(peer_id, info) => {
&& head_slot.sub(unknown_block_slot).as_usize() self.add_peer(peer_id, info);
> SLOT_IMPORT_TOLERANCE) }
|| (head_slot < unknown_block_slot SyncMessage::RpcBlock {
&& unknown_block_slot.sub(head_slot).as_usize() request_id,
> SLOT_IMPORT_TOLERANCE) peer_id,
{ beacon_block,
continue; seen_timestamp,
} } => {
} self.rpc_block_received(request_id, peer_id, beacon_block, seen_timestamp);
if self.network_globals.peers.read().is_connected(&peer_id) { }
self.block_lookups SyncMessage::UnknownBlock(peer_id, block) => {
.search_parent(block, peer_id, &mut self.network); // If we are not synced or within SLOT_IMPORT_TOLERANCE of the block, ignore
} if !self.network_globals.sync_state.read().is_synced() {
let head_slot = self.chain.canonical_head.cached_head().head_slot();
let unknown_block_slot = block.slot();
// if the block is far in the future, ignore it. If its within the slot tolerance of
// our current head, regardless of the syncing state, fetch it.
if (head_slot >= unknown_block_slot
&& head_slot.sub(unknown_block_slot).as_usize() > SLOT_IMPORT_TOLERANCE)
|| (head_slot < unknown_block_slot
&& unknown_block_slot.sub(head_slot).as_usize() > SLOT_IMPORT_TOLERANCE)
{
return;
} }
SyncMessage::UnknownBlockHash(peer_id, block_hash) => { }
// If we are not synced, ignore this block. if self.network_globals.peers.read().is_connected(&peer_id)
if self.network_globals.sync_state.read().is_synced() && self.network.is_execution_engine_online()
&& self.network_globals.peers.read().is_connected(&peer_id) {
{ self.block_lookups
self.block_lookups .search_parent(block, peer_id, &mut self.network);
.search_block(block_hash, peer_id, &mut self.network); }
} }
} SyncMessage::UnknownBlockHash(peer_id, block_hash) => {
SyncMessage::Disconnect(peer_id) => { // If we are not synced, ignore this block.
self.peer_disconnect(&peer_id); if self.network_globals.sync_state.read().is_synced()
} && self.network_globals.peers.read().is_connected(&peer_id)
SyncMessage::RpcError { && self.network.is_execution_engine_online()
peer_id, {
request_id, self.block_lookups
} => self.inject_error(peer_id, request_id), .search_block(block_hash, peer_id, &mut self.network);
SyncMessage::BlockProcessed { }
process_type, }
SyncMessage::Disconnect(peer_id) => {
self.peer_disconnect(&peer_id);
}
SyncMessage::RpcError {
peer_id,
request_id,
} => self.inject_error(peer_id, request_id),
SyncMessage::BlockProcessed {
process_type,
result,
} => match process_type {
BlockProcessType::SingleBlock { id } => {
self.block_lookups
.single_block_processed(id, result, &mut self.network)
}
BlockProcessType::ParentLookup { chain_hash } => self
.block_lookups
.parent_block_processed(chain_hash, result, &mut self.network),
},
SyncMessage::BatchProcessed { sync_type, result } => match sync_type {
ChainSegmentProcessId::RangeBatchId(chain_id, epoch, _) => {
self.range_sync.handle_block_process_result(
&mut self.network,
chain_id,
epoch,
result, result,
} => match process_type { );
BlockProcessType::SingleBlock { id } => self self.update_sync_state();
.block_lookups }
.single_block_processed(id, result, &mut self.network), ChainSegmentProcessId::BackSyncBatchId(epoch) => {
BlockProcessType::ParentLookup { chain_hash } => self match self.backfill_sync.on_batch_process_result(
.block_lookups &mut self.network,
.parent_block_processed(chain_hash, result, &mut self.network), epoch,
}, &result,
SyncMessage::BatchProcessed { sync_type, result } => match sync_type { ) {
ChainSegmentProcessId::RangeBatchId(chain_id, epoch, _) => { Ok(ProcessResult::Successful) => {}
self.range_sync.handle_block_process_result( Ok(ProcessResult::SyncCompleted) => self.update_sync_state(),
&mut self.network, Err(error) => {
chain_id, error!(self.log, "Backfill sync failed"; "error" => ?error);
epoch, // Update the global status
result,
);
self.update_sync_state(); self.update_sync_state();
} }
ChainSegmentProcessId::BackSyncBatchId(epoch) => { }
match self.backfill_sync.on_batch_process_result( }
&mut self.network, ChainSegmentProcessId::ParentLookup(chain_hash) => self
epoch, .block_lookups
&result, .parent_chain_processed(chain_hash, result, &mut self.network),
) { },
Ok(ProcessResult::Successful) => {} }
Ok(ProcessResult::SyncCompleted) => self.update_sync_state(), }
Err(error) => {
error!(self.log, "Backfill sync failed"; "error" => ?error); fn handle_new_execution_engine_state(&mut self, engine_state: EngineState) {
// Update the global status self.network.update_execution_engine_state(engine_state);
self.update_sync_state();
} match engine_state {
} EngineState::Online => {
} // Resume sync components.
ChainSegmentProcessId::ParentLookup(chain_hash) => self
.block_lookups // - Block lookups:
.parent_chain_processed(chain_hash, result, &mut self.network), // We start searching for blocks again. This is done by updating the stored ee online
}, // state. No further action required.
// - Parent lookups:
// We start searching for parents again. This is done by updating the stored ee
// online state. No further action required.
// - Range:
// Actively resume.
self.range_sync.resume(&mut self.network);
// - Backfill:
// Not affected by ee states, nothing to do.
}
EngineState::Offline => {
// Pause sync components.
// - Block lookups:
// Disabled while in this state. We drop current requests and don't search for new
// blocks.
let dropped_single_blocks_requests =
self.block_lookups.drop_single_block_requests();
// - Parent lookups:
// Disabled while in this state. We drop current requests and don't search for new
// blocks.
let dropped_parent_chain_requests = self.block_lookups.drop_parent_chain_requests();
// - Range:
// We still send found peers to range so that it can keep track of potential chains
// with respect to our current peers. Range will stop processing batches in the
// meantime. No further action from the manager is required for this.
// - Backfill: Not affected by ee states, nothing to do.
// Some logs.
if dropped_single_blocks_requests > 0 || dropped_parent_chain_requests > 0 {
debug!(self.log, "Execution engine not online, dropping active requests.";
"dropped_single_blocks_requests" => dropped_single_blocks_requests,
"dropped_parent_chain_requests" => dropped_parent_chain_requests,
);
} }
} }
} }

View File

@ -3,24 +3,25 @@
use super::manager::{Id, RequestId as SyncRequestId}; use super::manager::{Id, RequestId as SyncRequestId};
use super::range_sync::{BatchId, ChainId}; use super::range_sync::{BatchId, ChainId};
use crate::beacon_processor::WorkEvent;
use crate::service::{NetworkMessage, RequestId}; use crate::service::{NetworkMessage, RequestId};
use crate::status::ToStatusMessage; use crate::status::ToStatusMessage;
use beacon_chain::{BeaconChainTypes, EngineState};
use fnv::FnvHashMap; use fnv::FnvHashMap;
use lighthouse_network::rpc::{BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason}; use lighthouse_network::rpc::{BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason};
use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request}; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request};
use slog::{debug, trace, warn}; use slog::{debug, trace, warn};
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use types::EthSpec;
/// Wraps a Network channel to employ various RPC related network functionality for the Sync manager. This includes management of a global RPC request Id. /// Wraps a Network channel to employ various RPC related network functionality for the Sync manager. This includes management of a global RPC request Id.
pub struct SyncNetworkContext<T: EthSpec> { pub struct SyncNetworkContext<T: BeaconChainTypes> {
/// The network channel to relay messages to the Network service. /// The network channel to relay messages to the Network service.
network_send: mpsc::UnboundedSender<NetworkMessage<T>>, network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
/// Access to the network global vars. /// Access to the network global vars.
network_globals: Arc<NetworkGlobals<T>>, network_globals: Arc<NetworkGlobals<T::EthSpec>>,
/// A sequential ID for all RPC requests. /// A sequential ID for all RPC requests.
request_id: Id, request_id: Id,
@ -28,24 +29,35 @@ pub struct SyncNetworkContext<T: EthSpec> {
/// BlocksByRange requests made by the range syncing algorithm. /// BlocksByRange requests made by the range syncing algorithm.
range_requests: FnvHashMap<Id, (ChainId, BatchId)>, range_requests: FnvHashMap<Id, (ChainId, BatchId)>,
/// BlocksByRange requests made by backfill syncing.
backfill_requests: FnvHashMap<Id, BatchId>, backfill_requests: FnvHashMap<Id, BatchId>,
/// Whether the ee is online. If it's not, we don't allow access to the
/// `beacon_processor_send`.
execution_engine_state: EngineState,
/// Channel to send work to the beacon processor.
beacon_processor_send: mpsc::Sender<WorkEvent<T>>,
/// Logger for the `SyncNetworkContext`. /// Logger for the `SyncNetworkContext`.
log: slog::Logger, log: slog::Logger,
} }
impl<T: EthSpec> SyncNetworkContext<T> { impl<T: BeaconChainTypes> SyncNetworkContext<T> {
pub fn new( pub fn new(
network_send: mpsc::UnboundedSender<NetworkMessage<T>>, network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
network_globals: Arc<NetworkGlobals<T>>, network_globals: Arc<NetworkGlobals<T::EthSpec>>,
beacon_processor_send: mpsc::Sender<WorkEvent<T>>,
log: slog::Logger, log: slog::Logger,
) -> Self { ) -> Self {
Self { Self {
network_send, network_send,
execution_engine_state: EngineState::Online, // always assume `Online` at the start
network_globals, network_globals,
request_id: 1, request_id: 1,
range_requests: FnvHashMap::default(), range_requests: FnvHashMap::default(),
backfill_requests: FnvHashMap::default(), backfill_requests: FnvHashMap::default(),
beacon_processor_send,
log, log,
} }
} }
@ -211,6 +223,16 @@ impl<T: EthSpec> SyncNetworkContext<T> {
Ok(id) Ok(id)
} }
pub fn is_execution_engine_online(&self) -> bool {
self.execution_engine_state == EngineState::Online
}
pub fn update_execution_engine_state(&mut self, engine_state: EngineState) {
debug!(self.log, "Sync's view on execution engine state updated";
"past_state" => ?self.execution_engine_state, "new_state" => ?engine_state);
self.execution_engine_state = engine_state;
}
/// Terminates the connection with the peer and bans them. /// Terminates the connection with the peer and bans them.
pub fn goodbye_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) { pub fn goodbye_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) {
self.network_send self.network_send
@ -249,13 +271,22 @@ impl<T: EthSpec> SyncNetworkContext<T> {
} }
/// Sends an arbitrary network message. /// Sends an arbitrary network message.
fn send_network_msg(&mut self, msg: NetworkMessage<T>) -> Result<(), &'static str> { fn send_network_msg(&mut self, msg: NetworkMessage<T::EthSpec>) -> Result<(), &'static str> {
self.network_send.send(msg).map_err(|_| { self.network_send.send(msg).map_err(|_| {
debug!(self.log, "Could not send message to the network service"); debug!(self.log, "Could not send message to the network service");
"Network channel send Failed" "Network channel send Failed"
}) })
} }
pub fn processor_channel_if_enabled(&self) -> Option<&mpsc::Sender<WorkEvent<T>>> {
self.is_execution_engine_online()
.then_some(&self.beacon_processor_send)
}
pub fn processor_channel(&self) -> &mpsc::Sender<WorkEvent<T>> {
&self.beacon_processor_send
}
fn next_id(&mut self) -> Id { fn next_id(&mut self) -> Id {
let id = self.request_id; let id = self.request_id;
self.request_id += 1; self.request_id += 1;

View File

@ -11,7 +11,6 @@ use slog::{crit, debug, o, warn};
use std::collections::{btree_map::Entry, BTreeMap, HashSet}; use std::collections::{btree_map::Entry, BTreeMap, HashSet};
use std::hash::{Hash, Hasher}; use std::hash::{Hash, Hasher};
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc::Sender;
use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot}; use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot};
/// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of /// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of
@ -102,9 +101,6 @@ pub struct SyncingChain<T: BeaconChainTypes> {
/// Batches validated by this chain. /// Batches validated by this chain.
validated_batches: u64, validated_batches: u64,
/// A multi-threaded, non-blocking processor for applying messages to the beacon chain.
beacon_processor_send: Sender<BeaconWorkEvent<T>>,
is_finalized_segment: bool, is_finalized_segment: bool,
/// The chain's log. /// The chain's log.
@ -132,7 +128,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
target_head_slot: Slot, target_head_slot: Slot,
target_head_root: Hash256, target_head_root: Hash256,
peer_id: PeerId, peer_id: PeerId,
beacon_processor_send: Sender<BeaconWorkEvent<T>>,
is_finalized_segment: bool, is_finalized_segment: bool,
log: &slog::Logger, log: &slog::Logger,
) -> Self { ) -> Self {
@ -155,7 +150,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
state: ChainSyncingState::Stopped, state: ChainSyncingState::Stopped,
current_processing_batch: None, current_processing_batch: None,
validated_batches: 0, validated_batches: 0,
beacon_processor_send,
is_finalized_segment, is_finalized_segment,
log: log.new(o!("chain" => id)), log: log.new(o!("chain" => id)),
} }
@ -186,7 +180,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
pub fn remove_peer( pub fn remove_peer(
&mut self, &mut self,
peer_id: &PeerId, peer_id: &PeerId,
network: &mut SyncNetworkContext<T::EthSpec>, network: &mut SyncNetworkContext<T>,
) -> ProcessingResult { ) -> ProcessingResult {
if let Some(batch_ids) = self.peers.remove(peer_id) { if let Some(batch_ids) = self.peers.remove(peer_id) {
// fail the batches // fail the batches
@ -227,7 +221,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
/// If the block correctly completes the batch it will be processed if possible. /// If the block correctly completes the batch it will be processed if possible.
pub fn on_block_response( pub fn on_block_response(
&mut self, &mut self,
network: &mut SyncNetworkContext<T::EthSpec>, network: &mut SyncNetworkContext<T>,
batch_id: BatchId, batch_id: BatchId,
peer_id: &PeerId, peer_id: &PeerId,
request_id: Id, request_id: Id,
@ -296,7 +290,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
/// The batch must exist and be ready for processing /// The batch must exist and be ready for processing
fn process_batch( fn process_batch(
&mut self, &mut self,
network: &mut SyncNetworkContext<T::EthSpec>, network: &mut SyncNetworkContext<T>,
batch_id: BatchId, batch_id: BatchId,
) -> ProcessingResult { ) -> ProcessingResult {
// Only process batches if this chain is Syncing, and only one at a time // Only process batches if this chain is Syncing, and only one at a time
@ -304,6 +298,11 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
return Ok(KeepChain); return Ok(KeepChain);
} }
let beacon_processor_send = match network.processor_channel_if_enabled() {
Some(channel) => channel,
None => return Ok(KeepChain),
};
let batch = match self.batches.get_mut(&batch_id) { let batch = match self.batches.get_mut(&batch_id) {
Some(batch) => batch, Some(batch) => batch,
None => { None => {
@ -327,9 +326,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
let process_id = ChainSegmentProcessId::RangeBatchId(self.id, batch_id, count_unrealized); let process_id = ChainSegmentProcessId::RangeBatchId(self.id, batch_id, count_unrealized);
self.current_processing_batch = Some(batch_id); self.current_processing_batch = Some(batch_id);
if let Err(e) = self if let Err(e) =
.beacon_processor_send beacon_processor_send.try_send(BeaconWorkEvent::chain_segment(process_id, blocks))
.try_send(BeaconWorkEvent::chain_segment(process_id, blocks))
{ {
crit!(self.log, "Failed to send chain segment to processor."; "msg" => "process_batch", crit!(self.log, "Failed to send chain segment to processor."; "msg" => "process_batch",
"error" => %e, "batch" => self.processing_target); "error" => %e, "batch" => self.processing_target);
@ -346,7 +344,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
/// Processes the next ready batch, prioritizing optimistic batches over the processing target. /// Processes the next ready batch, prioritizing optimistic batches over the processing target.
fn process_completed_batches( fn process_completed_batches(
&mut self, &mut self,
network: &mut SyncNetworkContext<T::EthSpec>, network: &mut SyncNetworkContext<T>,
) -> ProcessingResult { ) -> ProcessingResult {
// Only process batches if this chain is Syncing and only process one batch at a time // Only process batches if this chain is Syncing and only process one batch at a time
if self.state != ChainSyncingState::Syncing || self.current_processing_batch.is_some() { if self.state != ChainSyncingState::Syncing || self.current_processing_batch.is_some() {
@ -447,7 +445,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
/// of the batch processor. /// of the batch processor.
pub fn on_batch_process_result( pub fn on_batch_process_result(
&mut self, &mut self,
network: &mut SyncNetworkContext<T::EthSpec>, network: &mut SyncNetworkContext<T>,
batch_id: BatchId, batch_id: BatchId,
result: &BatchProcessResult, result: &BatchProcessResult,
) -> ProcessingResult { ) -> ProcessingResult {
@ -580,7 +578,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
fn reject_optimistic_batch( fn reject_optimistic_batch(
&mut self, &mut self,
network: &mut SyncNetworkContext<T::EthSpec>, network: &mut SyncNetworkContext<T>,
redownload: bool, redownload: bool,
reason: &str, reason: &str,
) -> ProcessingResult { ) -> ProcessingResult {
@ -611,11 +609,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
/// ///
/// If a previous batch has been validated and it had been re-processed, penalize the original /// If a previous batch has been validated and it had been re-processed, penalize the original
/// peer. /// peer.
fn advance_chain( fn advance_chain(&mut self, network: &mut SyncNetworkContext<T>, validating_epoch: Epoch) {
&mut self,
network: &mut SyncNetworkContext<T::EthSpec>,
validating_epoch: Epoch,
) {
// make sure this epoch produces an advancement // make sure this epoch produces an advancement
if validating_epoch <= self.start_epoch { if validating_epoch <= self.start_epoch {
return; return;
@ -719,7 +713,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
/// intended and can result in downvoting a peer. /// intended and can result in downvoting a peer.
fn handle_invalid_batch( fn handle_invalid_batch(
&mut self, &mut self,
network: &mut SyncNetworkContext<T::EthSpec>, network: &mut SyncNetworkContext<T>,
batch_id: BatchId, batch_id: BatchId,
) -> ProcessingResult { ) -> ProcessingResult {
// The current batch could not be processed, indicating either the current or previous // The current batch could not be processed, indicating either the current or previous
@ -778,7 +772,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
/// This could be new chain, or an old chain that is being resumed. /// This could be new chain, or an old chain that is being resumed.
pub fn start_syncing( pub fn start_syncing(
&mut self, &mut self,
network: &mut SyncNetworkContext<T::EthSpec>, network: &mut SyncNetworkContext<T>,
local_finalized_epoch: Epoch, local_finalized_epoch: Epoch,
optimistic_start_epoch: Epoch, optimistic_start_epoch: Epoch,
) -> ProcessingResult { ) -> ProcessingResult {
@ -816,7 +810,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
/// If the chain is active, this starts requesting batches from this peer. /// If the chain is active, this starts requesting batches from this peer.
pub fn add_peer( pub fn add_peer(
&mut self, &mut self,
network: &mut SyncNetworkContext<T::EthSpec>, network: &mut SyncNetworkContext<T>,
peer_id: PeerId, peer_id: PeerId,
) -> ProcessingResult { ) -> ProcessingResult {
// add the peer without overwriting its active requests // add the peer without overwriting its active requests
@ -833,7 +827,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
/// If the batch exists it is re-requested. /// If the batch exists it is re-requested.
pub fn inject_error( pub fn inject_error(
&mut self, &mut self,
network: &mut SyncNetworkContext<T::EthSpec>, network: &mut SyncNetworkContext<T>,
batch_id: BatchId, batch_id: BatchId,
peer_id: &PeerId, peer_id: &PeerId,
request_id: Id, request_id: Id,
@ -865,7 +859,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
/// Sends and registers the request of a batch awaiting download. /// Sends and registers the request of a batch awaiting download.
pub fn retry_batch_download( pub fn retry_batch_download(
&mut self, &mut self,
network: &mut SyncNetworkContext<T::EthSpec>, network: &mut SyncNetworkContext<T>,
batch_id: BatchId, batch_id: BatchId,
) -> ProcessingResult { ) -> ProcessingResult {
let batch = match self.batches.get_mut(&batch_id) { let batch = match self.batches.get_mut(&batch_id) {
@ -898,7 +892,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
/// Requests the batch assigned to the given id from a given peer. /// Requests the batch assigned to the given id from a given peer.
pub fn send_batch( pub fn send_batch(
&mut self, &mut self,
network: &mut SyncNetworkContext<T::EthSpec>, network: &mut SyncNetworkContext<T>,
batch_id: BatchId, batch_id: BatchId,
peer: PeerId, peer: PeerId,
) -> ProcessingResult { ) -> ProcessingResult {
@ -967,12 +961,21 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
} }
} }
/// Kickstarts the chain by sending for processing batches that are ready and requesting more
/// batches if needed.
pub fn resume(
&mut self,
network: &mut SyncNetworkContext<T>,
) -> Result<KeepChain, RemoveChain> {
// Request more batches if needed.
self.request_batches(network)?;
// If there is any batch ready for processing, send it.
self.process_completed_batches(network)
}
/// Attempts to request the next required batches from the peer pool if the chain is syncing. It will exhaust the peer /// Attempts to request the next required batches from the peer pool if the chain is syncing. It will exhaust the peer
/// pool and left over batches until the batch buffer is reached or all peers are exhausted. /// pool and left over batches until the batch buffer is reached or all peers are exhausted.
fn request_batches( fn request_batches(&mut self, network: &mut SyncNetworkContext<T>) -> ProcessingResult {
&mut self,
network: &mut SyncNetworkContext<T::EthSpec>,
) -> ProcessingResult {
if !matches!(self.state, ChainSyncingState::Syncing) { if !matches!(self.state, ChainSyncingState::Syncing) {
return Ok(KeepChain); return Ok(KeepChain);
} }

View File

@ -6,7 +6,6 @@
use super::block_storage::BlockStorage; use super::block_storage::BlockStorage;
use super::chain::{ChainId, ProcessingResult, RemoveChain, SyncingChain}; use super::chain::{ChainId, ProcessingResult, RemoveChain, SyncingChain};
use super::sync_type::RangeSyncType; use super::sync_type::RangeSyncType;
use crate::beacon_processor::WorkEvent as BeaconWorkEvent;
use crate::metrics; use crate::metrics;
use crate::sync::network_context::SyncNetworkContext; use crate::sync::network_context::SyncNetworkContext;
use beacon_chain::BeaconChainTypes; use beacon_chain::BeaconChainTypes;
@ -18,7 +17,6 @@ use smallvec::SmallVec;
use std::collections::hash_map::Entry; use std::collections::hash_map::Entry;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc;
use types::EthSpec; use types::EthSpec;
use types::{Epoch, Hash256, Slot}; use types::{Epoch, Hash256, Slot};
@ -193,10 +191,9 @@ impl<T: BeaconChainTypes, C: BlockStorage> ChainCollection<T, C> {
/// do so. /// do so.
pub fn update( pub fn update(
&mut self, &mut self,
network: &mut SyncNetworkContext<T::EthSpec>, network: &mut SyncNetworkContext<T>,
local: &SyncInfo, local: &SyncInfo,
awaiting_head_peers: &mut HashMap<PeerId, SyncInfo>, awaiting_head_peers: &mut HashMap<PeerId, SyncInfo>,
beacon_processor_send: &mpsc::Sender<BeaconWorkEvent<T>>,
) { ) {
// Remove any outdated finalized/head chains // Remove any outdated finalized/head chains
self.purge_outdated_chains(local, awaiting_head_peers); self.purge_outdated_chains(local, awaiting_head_peers);
@ -212,7 +209,6 @@ impl<T: BeaconChainTypes, C: BlockStorage> ChainCollection<T, C> {
local.finalized_epoch, local.finalized_epoch,
local_head_epoch, local_head_epoch,
awaiting_head_peers, awaiting_head_peers,
beacon_processor_send,
); );
} }
} }
@ -257,7 +253,7 @@ impl<T: BeaconChainTypes, C: BlockStorage> ChainCollection<T, C> {
/// or not. /// or not.
fn update_finalized_chains( fn update_finalized_chains(
&mut self, &mut self,
network: &mut SyncNetworkContext<T::EthSpec>, network: &mut SyncNetworkContext<T>,
local_epoch: Epoch, local_epoch: Epoch,
local_head_epoch: Epoch, local_head_epoch: Epoch,
) { ) {
@ -326,11 +322,10 @@ impl<T: BeaconChainTypes, C: BlockStorage> ChainCollection<T, C> {
/// Start syncing any head chains if required. /// Start syncing any head chains if required.
fn update_head_chains( fn update_head_chains(
&mut self, &mut self,
network: &mut SyncNetworkContext<T::EthSpec>, network: &mut SyncNetworkContext<T>,
local_epoch: Epoch, local_epoch: Epoch,
local_head_epoch: Epoch, local_head_epoch: Epoch,
awaiting_head_peers: &mut HashMap<PeerId, SyncInfo>, awaiting_head_peers: &mut HashMap<PeerId, SyncInfo>,
beacon_processor_send: &mpsc::Sender<BeaconWorkEvent<T>>,
) { ) {
// Include the awaiting head peers // Include the awaiting head peers
for (peer_id, peer_sync_info) in awaiting_head_peers.drain() { for (peer_id, peer_sync_info) in awaiting_head_peers.drain() {
@ -341,7 +336,6 @@ impl<T: BeaconChainTypes, C: BlockStorage> ChainCollection<T, C> {
peer_sync_info.head_slot, peer_sync_info.head_slot,
peer_id, peer_id,
RangeSyncType::Head, RangeSyncType::Head,
beacon_processor_send,
network, network,
); );
} }
@ -468,8 +462,7 @@ impl<T: BeaconChainTypes, C: BlockStorage> ChainCollection<T, C> {
target_head_slot: Slot, target_head_slot: Slot,
peer: PeerId, peer: PeerId,
sync_type: RangeSyncType, sync_type: RangeSyncType,
beacon_processor_send: &mpsc::Sender<BeaconWorkEvent<T>>, network: &mut SyncNetworkContext<T>,
network: &mut SyncNetworkContext<T::EthSpec>,
) { ) {
let id = SyncingChain::<T>::id(&target_head_root, &target_head_slot); let id = SyncingChain::<T>::id(&target_head_root, &target_head_slot);
let (collection, is_finalized) = if let RangeSyncType::Finalized = sync_type { let (collection, is_finalized) = if let RangeSyncType::Finalized = sync_type {
@ -500,7 +493,6 @@ impl<T: BeaconChainTypes, C: BlockStorage> ChainCollection<T, C> {
target_head_slot, target_head_slot,
target_head_root, target_head_root,
peer, peer,
beacon_processor_send.clone(),
is_finalized, is_finalized,
&self.log, &self.log,
); );

View File

@ -43,7 +43,6 @@ use super::block_storage::BlockStorage;
use super::chain::{BatchId, ChainId, RemoveChain, SyncingChain}; use super::chain::{BatchId, ChainId, RemoveChain, SyncingChain};
use super::chain_collection::ChainCollection; use super::chain_collection::ChainCollection;
use super::sync_type::RangeSyncType; use super::sync_type::RangeSyncType;
use crate::beacon_processor::WorkEvent as BeaconWorkEvent;
use crate::status::ToStatusMessage; use crate::status::ToStatusMessage;
use crate::sync::manager::Id; use crate::sync::manager::Id;
use crate::sync::network_context::SyncNetworkContext; use crate::sync::network_context::SyncNetworkContext;
@ -56,7 +55,6 @@ use lru_cache::LRUTimeCache;
use slog::{crit, debug, trace, warn}; use slog::{crit, debug, trace, warn};
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc;
use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot}; use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot};
/// For how long we store failed finalized chains to prevent retries. /// For how long we store failed finalized chains to prevent retries.
@ -76,8 +74,6 @@ pub struct RangeSync<T: BeaconChainTypes, C = BeaconChain<T>> {
chains: ChainCollection<T, C>, chains: ChainCollection<T, C>,
/// Chains that have failed and are stored to prevent being retried. /// Chains that have failed and are stored to prevent being retried.
failed_chains: LRUTimeCache<Hash256>, failed_chains: LRUTimeCache<Hash256>,
/// A multi-threaded, non-blocking processor for applying messages to the beacon chain.
beacon_processor_send: mpsc::Sender<BeaconWorkEvent<T>>,
/// The syncing logger. /// The syncing logger.
log: slog::Logger, log: slog::Logger,
} }
@ -87,11 +83,7 @@ where
C: BlockStorage + ToStatusMessage, C: BlockStorage + ToStatusMessage,
T: BeaconChainTypes, T: BeaconChainTypes,
{ {
pub fn new( pub fn new(beacon_chain: Arc<C>, log: slog::Logger) -> Self {
beacon_chain: Arc<C>,
beacon_processor_send: mpsc::Sender<BeaconWorkEvent<T>>,
log: slog::Logger,
) -> Self {
RangeSync { RangeSync {
beacon_chain: beacon_chain.clone(), beacon_chain: beacon_chain.clone(),
chains: ChainCollection::new(beacon_chain, log.clone()), chains: ChainCollection::new(beacon_chain, log.clone()),
@ -99,7 +91,6 @@ where
FAILED_CHAINS_EXPIRY_SECONDS, FAILED_CHAINS_EXPIRY_SECONDS,
)), )),
awaiting_head_peers: HashMap::new(), awaiting_head_peers: HashMap::new(),
beacon_processor_send,
log, log,
} }
} }
@ -117,7 +108,7 @@ where
/// prioritised by peer-pool size. /// prioritised by peer-pool size.
pub fn add_peer( pub fn add_peer(
&mut self, &mut self,
network: &mut SyncNetworkContext<T::EthSpec>, network: &mut SyncNetworkContext<T>,
local_info: SyncInfo, local_info: SyncInfo,
peer_id: PeerId, peer_id: PeerId,
remote_info: SyncInfo, remote_info: SyncInfo,
@ -159,16 +150,11 @@ where
remote_finalized_slot, remote_finalized_slot,
peer_id, peer_id,
RangeSyncType::Finalized, RangeSyncType::Finalized,
&self.beacon_processor_send,
network, network,
); );
self.chains.update( self.chains
network, .update(network, &local_info, &mut self.awaiting_head_peers);
&local_info,
&mut self.awaiting_head_peers,
&self.beacon_processor_send,
);
} }
RangeSyncType::Head => { RangeSyncType::Head => {
// This peer requires a head chain sync // This peer requires a head chain sync
@ -197,15 +183,10 @@ where
remote_info.head_slot, remote_info.head_slot,
peer_id, peer_id,
RangeSyncType::Head, RangeSyncType::Head,
&self.beacon_processor_send,
network, network,
); );
self.chains.update( self.chains
network, .update(network, &local_info, &mut self.awaiting_head_peers);
&local_info,
&mut self.awaiting_head_peers,
&self.beacon_processor_send,
);
} }
} }
} }
@ -216,7 +197,7 @@ where
/// This request could complete a chain or simply add to its progress. /// This request could complete a chain or simply add to its progress.
pub fn blocks_by_range_response( pub fn blocks_by_range_response(
&mut self, &mut self,
network: &mut SyncNetworkContext<T::EthSpec>, network: &mut SyncNetworkContext<T>,
peer_id: PeerId, peer_id: PeerId,
chain_id: ChainId, chain_id: ChainId,
batch_id: BatchId, batch_id: BatchId,
@ -246,7 +227,7 @@ where
pub fn handle_block_process_result( pub fn handle_block_process_result(
&mut self, &mut self,
network: &mut SyncNetworkContext<T::EthSpec>, network: &mut SyncNetworkContext<T>,
chain_id: ChainId, chain_id: ChainId,
batch_id: Epoch, batch_id: Epoch,
result: BatchProcessResult, result: BatchProcessResult,
@ -276,11 +257,7 @@ where
/// A peer has disconnected. This removes the peer from any ongoing chains and mappings. A /// A peer has disconnected. This removes the peer from any ongoing chains and mappings. A
/// disconnected peer could remove a chain /// disconnected peer could remove a chain
pub fn peer_disconnect( pub fn peer_disconnect(&mut self, network: &mut SyncNetworkContext<T>, peer_id: &PeerId) {
&mut self,
network: &mut SyncNetworkContext<T::EthSpec>,
peer_id: &PeerId,
) {
// if the peer is in the awaiting head mapping, remove it // if the peer is in the awaiting head mapping, remove it
self.awaiting_head_peers.remove(peer_id); self.awaiting_head_peers.remove(peer_id);
@ -292,7 +269,7 @@ where
/// which pool the peer is in. The chain may also have a batch or batches awaiting /// which pool the peer is in. The chain may also have a batch or batches awaiting
/// for this peer. If so we mark the batch as failed. The batch may then hit it's maximum /// for this peer. If so we mark the batch as failed. The batch may then hit it's maximum
/// retries. In this case, we need to remove the chain. /// retries. In this case, we need to remove the chain.
fn remove_peer(&mut self, network: &mut SyncNetworkContext<T::EthSpec>, peer_id: &PeerId) { fn remove_peer(&mut self, network: &mut SyncNetworkContext<T>, peer_id: &PeerId) {
for (removed_chain, sync_type, remove_reason) in self for (removed_chain, sync_type, remove_reason) in self
.chains .chains
.call_all(|chain| chain.remove_peer(peer_id, network)) .call_all(|chain| chain.remove_peer(peer_id, network))
@ -304,8 +281,6 @@ where
network, network,
"peer removed", "peer removed",
); );
// update the state of the collection
} }
} }
@ -315,7 +290,7 @@ where
/// been too many failed attempts for the batch, remove the chain. /// been too many failed attempts for the batch, remove the chain.
pub fn inject_error( pub fn inject_error(
&mut self, &mut self,
network: &mut SyncNetworkContext<T::EthSpec>, network: &mut SyncNetworkContext<T>,
peer_id: PeerId, peer_id: PeerId,
batch_id: BatchId, batch_id: BatchId,
chain_id: ChainId, chain_id: ChainId,
@ -347,7 +322,7 @@ where
chain: SyncingChain<T>, chain: SyncingChain<T>,
sync_type: RangeSyncType, sync_type: RangeSyncType,
remove_reason: RemoveChain, remove_reason: RemoveChain,
network: &mut SyncNetworkContext<T::EthSpec>, network: &mut SyncNetworkContext<T>,
op: &'static str, op: &'static str,
) { ) {
if remove_reason.is_critical() { if remove_reason.is_critical() {
@ -374,12 +349,23 @@ where
}; };
// update the state of the collection // update the state of the collection
self.chains.update( self.chains
network, .update(network, &local, &mut self.awaiting_head_peers);
&local, }
&mut self.awaiting_head_peers,
&self.beacon_processor_send, /// Kickstarts sync.
); pub fn resume(&mut self, network: &mut SyncNetworkContext<T>) {
for (removed_chain, sync_type, remove_reason) in
self.chains.call_all(|chain| chain.resume(network))
{
self.on_chain_removed(
removed_chain,
sync_type,
remove_reason,
network,
"chain resumed",
);
}
} }
} }
@ -389,13 +375,16 @@ mod tests {
use crate::NetworkMessage; use crate::NetworkMessage;
use super::*; use super::*;
use crate::beacon_processor::WorkEvent as BeaconWorkEvent;
use beacon_chain::builder::Witness; use beacon_chain::builder::Witness;
use beacon_chain::eth1_chain::CachingEth1Backend; use beacon_chain::eth1_chain::CachingEth1Backend;
use beacon_chain::parking_lot::RwLock; use beacon_chain::parking_lot::RwLock;
use beacon_chain::EngineState;
use lighthouse_network::rpc::BlocksByRangeRequest; use lighthouse_network::rpc::BlocksByRangeRequest;
use lighthouse_network::Request; use lighthouse_network::Request;
use lighthouse_network::{rpc::StatusMessage, NetworkGlobals}; use lighthouse_network::{rpc::StatusMessage, NetworkGlobals};
use slog::{o, Drain}; use slog::{o, Drain};
use tokio::sync::mpsc;
use slot_clock::SystemTimeSlotClock; use slot_clock::SystemTimeSlotClock;
use std::collections::HashSet; use std::collections::HashSet;
@ -470,7 +459,7 @@ mod tests {
/// To set up different scenarios where sync is told about known/unkown blocks. /// To set up different scenarios where sync is told about known/unkown blocks.
chain: Arc<FakeStorage>, chain: Arc<FakeStorage>,
/// Needed by range to handle communication with the network. /// Needed by range to handle communication with the network.
cx: SyncNetworkContext<E>, cx: SyncNetworkContext<TestBeaconChainType>,
/// To check what the network receives from Range. /// To check what the network receives from Range.
network_rx: mpsc::UnboundedReceiver<NetworkMessage<E>>, network_rx: mpsc::UnboundedReceiver<NetworkMessage<E>>,
/// To modify what the network declares about various global variables, in particular about /// To modify what the network declares about various global variables, in particular about
@ -516,12 +505,13 @@ mod tests {
} }
/// Reads an BlocksByRange request to a given peer from the network receiver channel. /// Reads an BlocksByRange request to a given peer from the network receiver channel.
#[track_caller]
fn grab_request(&mut self, expected_peer: &PeerId) -> (RequestId, BlocksByRangeRequest) { fn grab_request(&mut self, expected_peer: &PeerId) -> (RequestId, BlocksByRangeRequest) {
if let Some(NetworkMessage::SendRequest { if let Ok(NetworkMessage::SendRequest {
peer_id, peer_id,
request: Request::BlocksByRange(request), request: Request::BlocksByRange(request),
request_id, request_id,
}) = self.network_rx.blocking_recv() }) = self.network_rx.try_recv()
{ {
assert_eq!(&peer_id, expected_peer); assert_eq!(&peer_id, expected_peer);
(request_id, request) (request_id, request)
@ -575,6 +565,29 @@ mod tests {
let peer_id = PeerId::random(); let peer_id = PeerId::random();
(peer_id, local_info, remote_info) (peer_id, local_info, remote_info)
} }
#[track_caller]
fn expect_empty_processor(&mut self) {
match self.beacon_processor_rx.try_recv() {
Ok(work) => {
panic!("Expected empty processor. Instead got {}", work.work_type());
}
Err(e) => match e {
mpsc::error::TryRecvError::Empty => {}
mpsc::error::TryRecvError::Disconnected => unreachable!("bad coded test?"),
},
}
}
#[track_caller]
fn expect_chain_segment(&mut self) {
match self.beacon_processor_rx.try_recv() {
Ok(work) => {
assert_eq!(work.work_type(), crate::beacon_processor::CHAIN_SEGMENT);
}
other => panic!("Expected chain segment process, found {:?}", other),
}
}
} }
fn range(log_enabled: bool) -> (TestRig, RangeSync<TestBeaconChainType, FakeStorage>) { fn range(log_enabled: bool) -> (TestRig, RangeSync<TestBeaconChainType, FakeStorage>) {
@ -583,7 +596,6 @@ mod tests {
let (beacon_processor_tx, beacon_processor_rx) = mpsc::channel(10); let (beacon_processor_tx, beacon_processor_rx) = mpsc::channel(10);
let range_sync = RangeSync::<TestBeaconChainType, FakeStorage>::new( let range_sync = RangeSync::<TestBeaconChainType, FakeStorage>::new(
chain.clone(), chain.clone(),
beacon_processor_tx,
log.new(o!("component" => "range")), log.new(o!("component" => "range")),
); );
let (network_tx, network_rx) = mpsc::unbounded_channel(); let (network_tx, network_rx) = mpsc::unbounded_channel();
@ -591,6 +603,7 @@ mod tests {
let cx = SyncNetworkContext::new( let cx = SyncNetworkContext::new(
network_tx, network_tx,
globals.clone(), globals.clone(),
beacon_processor_tx,
log.new(o!("component" => "network_context")), log.new(o!("component" => "network_context")),
); );
let test_rig = TestRig { let test_rig = TestRig {
@ -661,4 +674,53 @@ mod tests {
let (finalized_peer, local_info, remote_info) = rig.finalized_peer(); let (finalized_peer, local_info, remote_info) = rig.finalized_peer();
range.add_peer(&mut rig.cx, local_info, finalized_peer, remote_info); range.add_peer(&mut rig.cx, local_info, finalized_peer, remote_info);
} }
#[test]
fn pause_and_resume_on_ee_offline() {
let (mut rig, mut range) = range(true);
// add some peers
let (peer1, local_info, head_info) = rig.head_peer();
range.add_peer(&mut rig.cx, local_info, peer1, head_info);
let ((chain1, batch1), id1) = match rig.grab_request(&peer1).0 {
RequestId::Sync(crate::sync::manager::RequestId::RangeSync { id }) => {
(rig.cx.range_sync_response(id, true).unwrap(), id)
}
other => panic!("unexpected request {:?}", other),
};
// make the ee offline
rig.cx.update_execution_engine_state(EngineState::Offline);
// send the response to the request
range.blocks_by_range_response(&mut rig.cx, peer1, chain1, batch1, id1, None);
// the beacon processor shouldn't have received any work
rig.expect_empty_processor();
// while the ee is offline, more peers might arrive. Add a new finalized peer.
let (peer2, local_info, finalized_info) = rig.finalized_peer();
range.add_peer(&mut rig.cx, local_info, peer2, finalized_info);
let ((chain2, batch2), id2) = match rig.grab_request(&peer2).0 {
RequestId::Sync(crate::sync::manager::RequestId::RangeSync { id }) => {
(rig.cx.range_sync_response(id, true).unwrap(), id)
}
other => panic!("unexpected request {:?}", other),
};
// send the response to the request
range.blocks_by_range_response(&mut rig.cx, peer2, chain2, batch2, id2, None);
// the beacon processor shouldn't have received any work
rig.expect_empty_processor();
// make the beacon processor available again.
rig.cx.update_execution_engine_state(EngineState::Online);
// now resume range, we should have two processing requests in the beacon processor.
range.resume(&mut rig.cx);
rig.expect_chain_segment();
rig.expect_chain_segment();
}
} }