diff --git a/Cargo.lock b/Cargo.lock
index 25701fe05..95602bf1b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1090,6 +1090,7 @@ dependencies = [
  "serde_derive",
  "serde_yaml",
  "slasher",
+ "slasher_service",
  "slog",
  "slog-async",
  "sloggers",
@@ -5884,7 +5885,6 @@ version = "0.1.0"
 dependencies = [
  "bincode",
  "byteorder",
- "directory",
  "eth2_ssz",
  "eth2_ssz_derive",
  "flate2",
@@ -5901,15 +5901,29 @@ dependencies = [
  "serde_derive",
  "slog",
  "sloggers",
- "slot_clock",
- "task_executor",
  "tempdir",
- "tokio 0.3.5",
  "tree_hash",
  "tree_hash_derive",
  "types",
 ]
 
+[[package]]
+name = "slasher_service"
+version = "0.1.0"
+dependencies = [
+ "beacon_chain",
+ "directory",
+ "eth2_libp2p",
+ "network",
+ "slasher",
+ "slog",
+ "slot_clock",
+ "state_processing",
+ "task_executor",
+ "tokio 0.3.5",
+ "types",
+]
+
 [[package]]
 name = "slashing_protection"
 version = "0.1.0"
diff --git a/Cargo.toml b/Cargo.toml
index 95f59c493..8fd60b5ab 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -70,6 +70,7 @@ members = [
     "remote_signer/client",
 
     "slasher",
+    "slasher/service",
 
     "testing/ef_tests",
     "testing/eth1_test_rig",
diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs
index cdc1c251f..0e6fb6fc4 100644
--- a/beacon_node/beacon_chain/src/beacon_chain.rs
+++ b/beacon_node/beacon_chain/src/beacon_chain.rs
@@ -39,7 +39,7 @@ use slot_clock::SlotClock;
 use state_processing::{
     common::get_indexed_attestation, per_block_processing,
     per_block_processing::errors::AttestationValidationError, per_slot_processing,
-    BlockSignatureStrategy, SigVerifiedOp, VerifyOperation,
+    BlockSignatureStrategy, SigVerifiedOp,
 };
 use std::borrow::Cow;
 use std::cmp::Ordering;
@@ -1125,63 +1125,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         Ok(signed_aggregate)
     }
 
-    /// Move slashings collected by the slasher into the op pool for block inclusion.
-    fn ingest_slashings_to_op_pool(&self, state: &BeaconState<T::EthSpec>) {
-        if let Some(slasher) = self.slasher.as_ref() {
-            let attester_slashings = slasher.get_attester_slashings();
-            let proposer_slashings = slasher.get_proposer_slashings();
-
-            if !attester_slashings.is_empty() || !proposer_slashings.is_empty() {
-                debug!(
-                    self.log,
-                    "Ingesting slashings";
-                    "num_attester_slashings" => attester_slashings.len(),
-                    "num_proposer_slashings" => proposer_slashings.len(),
-                );
-            }
-
-            for slashing in attester_slashings {
-                let verified_slashing = match slashing.clone().validate(state, &self.spec) {
-                    Ok(verified) => verified,
-                    Err(e) => {
-                        error!(
-                            self.log,
-                            "Attester slashing from slasher failed verification";
-                            "error" => format!("{:?}", e),
-                            "slashing" => format!("{:?}", slashing),
-                        );
-                        continue;
-                    }
-                };
-
-                if let Err(e) = self.import_attester_slashing(verified_slashing) {
-                    error!(
-                        self.log,
-                        "Attester slashing from slasher is invalid";
-                        "error" => format!("{:?}", e),
-                        "slashing" => format!("{:?}", slashing),
-                    );
-                }
-            }
-
-            for slashing in proposer_slashings {
-                let verified_slashing = match slashing.clone().validate(state, &self.spec) {
-                    Ok(verified) => verified,
-                    Err(e) => {
-                        error!(
-                            self.log,
-                            "Proposer slashing from slasher failed verification";
-                            "error" => format!("{:?}", e),
-                            "slashing" => format!("{:?}", slashing),
-                        );
-                        continue;
-                    }
-                };
-                self.import_proposer_slashing(verified_slashing);
-            }
-        }
-    }
-
     /// Check that the shuffling at `block_root` is equal to one of the shufflings of `state`.
     ///
     /// The `target_epoch` argument determines which shuffling to check compatibility with, it
@@ -1876,7 +1819,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             state.latest_block_header.canonical_root()
         };
 
-        self.ingest_slashings_to_op_pool(&state);
         let (proposer_slashings, attester_slashings) =
             self.op_pool.get_slashings(&state, &self.spec);
 
@@ -2093,7 +2035,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         if is_epoch_transition || is_reorg {
             self.persist_head_and_fork_choice()?;
             self.op_pool.prune_attestations(self.epoch()?);
-            self.ingest_slashings_to_op_pool(&new_head.beacon_state);
             self.persist_op_pool()?;
         }
 
diff --git a/beacon_node/client/Cargo.toml b/beacon_node/client/Cargo.toml
index 49e0fd548..638072b66 100644
--- a/beacon_node/client/Cargo.toml
+++ b/beacon_node/client/Cargo.toml
@@ -43,3 +43,4 @@ directory = {path = "../../common/directory"}
 http_api = { path = "../http_api" }
 http_metrics = { path = "../http_metrics" }
 slasher = { path = "../../slasher" }
+slasher_service = { path = "../../slasher/service" }
diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs
index 737cd072e..3443390d1 100644
--- a/beacon_node/client/src/builder.rs
+++ b/beacon_node/client/src/builder.rs
@@ -13,7 +13,8 @@ use eth1::{Config as Eth1Config, Service as Eth1Service};
 use eth2_libp2p::NetworkGlobals;
 use genesis::{interop_genesis_state, Eth1GenesisService};
 use network::{NetworkConfig, NetworkMessage, NetworkService};
-use slasher::{Slasher, SlasherServer};
+use slasher::Slasher;
+use slasher_service::SlasherService;
 use slog::{debug, info, warn};
 use ssz::Decode;
 use std::net::TcpListener;
@@ -348,22 +349,21 @@ where
     /// Immediately start the slasher service.
     ///
     /// Error if no slasher is configured.
-    pub fn start_slasher_server(&self) -> Result<(), String> {
+    pub fn start_slasher_service(&self) -> Result<(), String> {
+        let beacon_chain = self
+            .beacon_chain
+            .clone()
+            .ok_or("slasher service requires a beacon chain")?;
+        let network_send = self
+            .network_send
+            .clone()
+            .ok_or("slasher service requires a network sender")?;
         let context = self
             .runtime_context
             .as_ref()
             .ok_or("slasher requires a runtime_context")?
-            .service_context("slasher_server_ctxt".into());
-        let slasher = self
-            .slasher
-            .clone()
-            .ok_or("slasher server requires a slasher")?;
-        let slot_clock = self
-            .slot_clock
-            .clone()
-            .ok_or("slasher server requires a slot clock")?;
-        SlasherServer::run(slasher, slot_clock, &context.executor);
-        Ok(())
+            .service_context("slasher_service_ctxt".into());
+        SlasherService::new(beacon_chain, network_send).run(&context.executor)
     }
 
     /// Immediately starts the service that periodically logs information each slot.
@@ -470,7 +470,7 @@ where
         };
 
         if self.slasher.is_some() {
-            self.start_slasher_server()?;
+            self.start_slasher_service()?;
         }
 
         Ok(Client {
diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs
index f89579cf8..629d8e831 100644
--- a/beacon_node/src/cli.rs
+++ b/beacon_node/src/cli.rs
@@ -434,6 +434,13 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
                 .requires("slasher")
                 .takes_value(true)
         )
+        .arg(
+            Arg::with_name("slasher-broadcast")
+                .long("slasher-broadcast")
+                .help("Broadcast slashings found by the slasher to the rest of the network \
+                       [disabled by default].")
+                .requires("slasher")
+        )
         .arg(
             Arg::with_name("wss-checkpoint")
                 .long("wss-checkpoint")
diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs
index 38e752498..699e8d418 100644
--- a/beacon_node/src/config.rs
+++ b/beacon_node/src/config.rs
@@ -375,6 +375,8 @@ pub fn get_config<E: EthSpec>(
             slasher_config.validator_chunk_size = validator_chunk_size;
         }
 
+        slasher_config.broadcast = cli_args.is_present("slasher-broadcast");
+
         client_config.slasher = Some(slasher_config);
     }
 
diff --git a/slasher/Cargo.toml b/slasher/Cargo.toml
index 8ca9c2c02..6bb3cf2e7 100644
--- a/slasher/Cargo.toml
+++ b/slasher/Cargo.toml
@@ -7,7 +7,6 @@ edition = "2018"
 [dependencies]
 bincode = "1.3.1"
 byteorder = "1.3.4"
-directory = { path = "../common/directory" }
 eth2_ssz = { path = "../consensus/ssz" }
 eth2_ssz_derive = { path = "../consensus/ssz_derive" }
 flate2 = { version = "1.0.14", features = ["zlib"], default-features = false }
@@ -22,9 +21,6 @@ serde = "1.0"
 serde_derive = "1.0"
 slog = "2.5.2"
 sloggers = "*"
-slot_clock = { path = "../common/slot_clock" }
-task_executor = { path = "../common/task_executor" }
-tokio = { version = "0.3.5", features = ["full"] }
 tree_hash = { path = "../consensus/tree_hash" }
 tree_hash_derive = { path = "../consensus/tree_hash_derive" }
 types = { path = "../consensus/types" }
diff --git a/slasher/service/Cargo.toml b/slasher/service/Cargo.toml
new file mode 100644
index 000000000..0326734f8
--- /dev/null
+++ b/slasher/service/Cargo.toml
@@ -0,0 +1,18 @@
+[package]
+name = "slasher_service"
+version = "0.1.0"
+authors = ["Michael Sproul <michael@sigmaprime.io>"]
+edition = "2018"
+
+[dependencies]
+beacon_chain = { path = "../../beacon_node/beacon_chain" }
+directory = { path = "../../common/directory" }
+eth2_libp2p = { path = "../../beacon_node/eth2_libp2p" }
+network = { path = "../../beacon_node/network" }
+slasher = { path = ".." }
+slog = "2.5.2"
+slot_clock = { path = "../../common/slot_clock" }
+state_processing = { path = "../../consensus/state_processing" }
+task_executor = { path = "../../common/task_executor" }
+tokio = { version = "0.3.5", features = ["full"] }
+types = { path = "../../consensus/types" }
diff --git a/slasher/service/src/lib.rs b/slasher/service/src/lib.rs
new file mode 100644
index 000000000..ac15b49ee
--- /dev/null
+++ b/slasher/service/src/lib.rs
@@ -0,0 +1,3 @@
+mod service;
+
+pub use service::SlasherService;
diff --git a/slasher/service/src/service.rs b/slasher/service/src/service.rs
new file mode 100644
index 000000000..5608eac5f
--- /dev/null
+++ b/slasher/service/src/service.rs
@@ -0,0 +1,298 @@
+use beacon_chain::{
+    observed_operations::ObservationOutcome, BeaconChain, BeaconChainError, BeaconChainTypes,
+};
+use directory::size_of_dir;
+use eth2_libp2p::PubsubMessage;
+use network::NetworkMessage;
+use slasher::{
+    metrics::{self, SLASHER_DATABASE_SIZE, SLASHER_RUN_TIME},
+    Slasher,
+};
+use slog::{debug, error, info, trace, warn, Logger};
+use slot_clock::SlotClock;
+use state_processing::VerifyOperation;
+use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};
+use std::sync::Arc;
+use task_executor::TaskExecutor;
+use tokio::stream::StreamExt;
+use tokio::sync::mpsc::UnboundedSender;
+use tokio::time::{interval_at, Duration, Instant};
+use types::{AttesterSlashing, Epoch, EthSpec, ProposerSlashing};
+
+pub struct SlasherService<T: BeaconChainTypes> {
+    beacon_chain: Arc<BeaconChain<T>>,
+    network_sender: UnboundedSender<NetworkMessage<T::EthSpec>>,
+}
+
+impl<T: BeaconChainTypes> SlasherService<T> {
+    /// Create a new service but don't start any tasks yet.
+    pub fn new(
+        beacon_chain: Arc<BeaconChain<T>>,
+        network_sender: UnboundedSender<NetworkMessage<T::EthSpec>>,
+    ) -> Self {
+        Self {
+            beacon_chain,
+            network_sender,
+        }
+    }
+
+    /// Start the slasher service tasks on the `executor`.
+    pub fn run(&self, executor: &TaskExecutor) -> Result<(), String> {
+        let slasher = self
+            .beacon_chain
+            .slasher
+            .clone()
+            .ok_or("No slasher is configured")?;
+        let log = slasher.log().clone();
+
+        info!(log, "Starting slasher"; "broadcast" => slasher.config().broadcast);
+
+        // Buffer just a single message in the channel. If the receiver is still processing, we
+        // don't need to burden them with more work (we can wait).
+        let (notif_sender, notif_receiver) = sync_channel(1);
+        let update_period = slasher.config().update_period;
+        let beacon_chain = self.beacon_chain.clone();
+        let network_sender = self.network_sender.clone();
+
+        executor.spawn(
+            Self::run_notifier(beacon_chain.clone(), update_period, notif_sender, log),
+            "slasher_server_notifier",
+        );
+
+        executor.spawn_blocking(
+            || Self::run_processor(beacon_chain, slasher, notif_receiver, network_sender),
+            "slasher_server_processor",
+        );
+
+        Ok(())
+    }
+
+    /// Run the async notifier which periodically prompts the processor to run.
+    async fn run_notifier(
+        beacon_chain: Arc<BeaconChain<T>>,
+        update_period: u64,
+        notif_sender: SyncSender<Epoch>,
+        log: Logger,
+    ) {
+        // NOTE: could align each run to some fixed point in each slot, see:
+        // https://github.com/sigp/lighthouse/issues/1861
+        let mut interval = interval_at(Instant::now(), Duration::from_secs(update_period));
+
+        while interval.next().await.is_some() {
+            if let Some(current_slot) = beacon_chain.slot_clock.now() {
+                let current_epoch = current_slot.epoch(T::EthSpec::slots_per_epoch());
+                if let Err(TrySendError::Disconnected(_)) = notif_sender.try_send(current_epoch) {
+                    break;
+                }
+            } else {
+                trace!(log, "Slasher has nothing to do: we are pre-genesis");
+            }
+        }
+    }
+
+    /// Run the blocking task that performs work.
+    fn run_processor(
+        beacon_chain: Arc<BeaconChain<T>>,
+        slasher: Arc<Slasher<T::EthSpec>>,
+        notif_receiver: Receiver<Epoch>,
+        network_sender: UnboundedSender<NetworkMessage<T::EthSpec>>,
+    ) {
+        let log = slasher.log();
+        while let Ok(current_epoch) = notif_receiver.recv() {
+            let t = Instant::now();
+
+            let batch_timer = metrics::start_timer(&SLASHER_RUN_TIME);
+            let stats = match slasher.process_queued(current_epoch) {
+                Ok(stats) => Some(stats),
+                Err(e) => {
+                    error!(
+                        log,
+                        "Error during scheduled slasher processing";
+                        "epoch" => current_epoch,
+                        "error" => format!("{:?}", e)
+                    );
+                    None
+                }
+            };
+            drop(batch_timer);
+
+            // Prune the database, even in the case where batch processing failed.
+            // If the LMDB database is full then pruning could help to free it up.
+            if let Err(e) = slasher.prune_database(current_epoch) {
+                error!(
+                    log,
+                    "Error during slasher database pruning";
+                    "epoch" => current_epoch,
+                    "error" => format!("{:?}", e),
+                );
+                continue;
+            };
+
+            // Provide slashings to the beacon chain, and optionally publish them.
+            Self::process_slashings(&beacon_chain, &slasher, &network_sender);
+
+            let database_size = size_of_dir(&slasher.config().database_path);
+            metrics::set_gauge(&SLASHER_DATABASE_SIZE, database_size as i64);
+
+            if let Some(stats) = stats {
+                debug!(
+                    log,
+                    "Completed slasher update";
+                    "epoch" => current_epoch,
+                    "time_taken" => format!("{}ms", t.elapsed().as_millis()),
+                    "num_attestations" => stats.attestation_stats.num_processed,
+                    "num_blocks" => stats.block_stats.num_processed,
+                );
+            }
+        }
+    }
+
+    /// Push any slashings found to the beacon chain, optionally publishing them on the network.
+    fn process_slashings(
+        beacon_chain: &BeaconChain<T>,
+        slasher: &Slasher<T::EthSpec>,
+        network_sender: &UnboundedSender<NetworkMessage<T::EthSpec>>,
+    ) {
+        Self::process_attester_slashings(beacon_chain, slasher, network_sender);
+        Self::process_proposer_slashings(beacon_chain, slasher, network_sender);
+    }
+
+    fn process_attester_slashings(
+        beacon_chain: &BeaconChain<T>,
+        slasher: &Slasher<T::EthSpec>,
+        network_sender: &UnboundedSender<NetworkMessage<T::EthSpec>>,
+    ) {
+        let log = slasher.log();
+        let attester_slashings = slasher.get_attester_slashings();
+
+        for slashing in attester_slashings {
+            // Verify slashing signature.
+            let verified_slashing = match beacon_chain.with_head(|head| {
+                Ok::<_, BeaconChainError>(
+                    slashing
+                        .clone()
+                        .validate(&head.beacon_state, &beacon_chain.spec)?,
+                )
+            }) {
+                Ok(verified) => verified,
+                Err(e) => {
+                    warn!(
+                        log,
+                        "Attester slashing produced is invalid";
+                        "error" => format!("{:?}", e),
+                        "slashing" => format!("{:?}", slashing),
+                    );
+                    continue;
+                }
+            };
+
+            // Add to local op pool.
+            if let Err(e) = beacon_chain.import_attester_slashing(verified_slashing) {
+                error!(
+                    log,
+                    "Beacon chain refused attester slashing";
+                    "error" => format!("{:?}", e),
+                    "slashing" => format!("{:?}", slashing),
+                );
+            }
+
+            // Publish to the network if broadcast is enabled.
+            if slasher.config().broadcast {
+                if let Err(e) =
+                    Self::publish_attester_slashing(&beacon_chain, &network_sender, slashing)
+                {
+                    debug!(
+                        log,
+                        "Unable to publish attester slashing";
+                        "error" => format!("{:?}", e),
+                    );
+                }
+            }
+        }
+    }
+
+    fn process_proposer_slashings(
+        beacon_chain: &BeaconChain<T>,
+        slasher: &Slasher<T::EthSpec>,
+        network_sender: &UnboundedSender<NetworkMessage<T::EthSpec>>,
+    ) {
+        let log = slasher.log();
+        let proposer_slashings = slasher.get_proposer_slashings();
+
+        for slashing in proposer_slashings {
+            let verified_slashing = match beacon_chain.with_head(|head| {
+                Ok::<_, BeaconChainError>(
+                    slashing
+                        .clone()
+                        .validate(&head.beacon_state, &beacon_chain.spec)?,
+                )
+            }) {
+                Ok(verified) => verified,
+                Err(e) => {
+                    error!(
+                        log,
+                        "Proposer slashing produced is invalid";
+                        "error" => format!("{:?}", e),
+                        "slashing" => format!("{:?}", slashing),
+                    );
+                    continue;
+                }
+            };
+            beacon_chain.import_proposer_slashing(verified_slashing);
+
+            if slasher.config().broadcast {
+                if let Err(e) =
+                    Self::publish_proposer_slashing(&beacon_chain, &network_sender, slashing)
+                {
+                    debug!(
+                        log,
+                        "Unable to publish proposer slashing";
+                        "error" => format!("{:?}", e),
+                    );
+                }
+            }
+        }
+    }
+
+    fn publish_attester_slashing(
+        beacon_chain: &BeaconChain<T>,
+        network_sender: &UnboundedSender<NetworkMessage<T::EthSpec>>,
+        slashing: AttesterSlashing<T::EthSpec>,
+    ) -> Result<(), String> {
+        let outcome = beacon_chain
+            .verify_attester_slashing_for_gossip(slashing)
+            .map_err(|e| format!("gossip verification error: {:?}", e))?;
+
+        if let ObservationOutcome::New(slashing) = outcome {
+            network_sender
+                .send(NetworkMessage::Publish {
+                    messages: vec![PubsubMessage::AttesterSlashing(Box::new(
+                        slashing.into_inner(),
+                    ))],
+                })
+                .map_err(|e| format!("network error: {:?}", e))?;
+        }
+        Ok(())
+    }
+
+    fn publish_proposer_slashing(
+        beacon_chain: &BeaconChain<T>,
+        network_sender: &UnboundedSender<NetworkMessage<T::EthSpec>>,
+        slashing: ProposerSlashing,
+    ) -> Result<(), String> {
+        let outcome = beacon_chain
+            .verify_proposer_slashing_for_gossip(slashing)
+            .map_err(|e| format!("gossip verification error: {:?}", e))?;
+
+        if let ObservationOutcome::New(slashing) = outcome {
+            network_sender
+                .send(NetworkMessage::Publish {
+                    messages: vec![PubsubMessage::ProposerSlashing(Box::new(
+                        slashing.into_inner(),
+                    ))],
+                })
+                .map_err(|e| format!("network error: {:?}", e))?;
+        }
+        Ok(())
+    }
+}
diff --git a/slasher/src/batch_stats.rs b/slasher/src/batch_stats.rs
new file mode 100644
index 000000000..71f4c8e4e
--- /dev/null
+++ b/slasher/src/batch_stats.rs
@@ -0,0 +1,16 @@
+#[derive(Debug)]
+pub struct BatchStats {
+    pub block_stats: BlockStats,
+    pub attestation_stats: AttestationStats,
+}
+
+#[derive(Debug)]
+pub struct BlockStats {
+    pub num_processed: usize,
+    pub num_slashings: usize,
+}
+
+#[derive(Debug)]
+pub struct AttestationStats {
+    pub num_processed: usize,
+}
diff --git a/slasher/src/config.rs b/slasher/src/config.rs
index 4438a2530..0f44d8d48 100644
--- a/slasher/src/config.rs
+++ b/slasher/src/config.rs
@@ -8,6 +8,7 @@ pub const DEFAULT_VALIDATOR_CHUNK_SIZE: usize = 256;
 pub const DEFAULT_HISTORY_LENGTH: usize = 4096;
 pub const DEFAULT_UPDATE_PERIOD: u64 = 12;
 pub const DEFAULT_MAX_DB_SIZE: usize = 256 * 1024; // 256 GiB
+pub const DEFAULT_BROADCAST: bool = false;
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct Config {
@@ -20,6 +21,8 @@ pub struct Config {
     pub update_period: u64,
     /// Maximum size of the LMDB database in megabytes.
     pub max_db_size_mbs: usize,
+    /// Whether to broadcast slashings found to the network.
+    pub broadcast: bool,
 }
 
 impl Config {
@@ -31,6 +34,7 @@ impl Config {
             history_length: DEFAULT_HISTORY_LENGTH,
             update_period: DEFAULT_UPDATE_PERIOD,
             max_db_size_mbs: DEFAULT_MAX_DB_SIZE,
+            broadcast: DEFAULT_BROADCAST,
         }
     }
 
diff --git a/slasher/src/lib.rs b/slasher/src/lib.rs
index d173f26a0..e16003126 100644
--- a/slasher/src/lib.rs
+++ b/slasher/src/lib.rs
@@ -3,13 +3,13 @@
 mod array;
 mod attestation_queue;
 mod attester_record;
+mod batch_stats;
 mod block_queue;
 pub mod config;
 mod database;
 mod error;
-mod metrics;
+pub mod metrics;
 mod slasher;
-mod slasher_server;
 pub mod test_utils;
 mod utils;
 
@@ -20,7 +20,6 @@
 pub use block_queue::BlockQueue;
 pub use config::Config;
 pub use database::SlasherDB;
 pub use error::Error;
-pub use slasher_server::SlasherServer;
 use types::{AttesterSlashing, EthSpec, IndexedAttestation, ProposerSlashing};
 
diff --git a/slasher/src/slasher.rs b/slasher/src/slasher.rs
index 1589dc2bf..fab807fcf 100644
--- a/slasher/src/slasher.rs
+++ b/slasher/src/slasher.rs
@@ -1,3 +1,4 @@
+use crate::batch_stats::{AttestationStats, BatchStats, BlockStats};
 use crate::metrics::{
     self, SLASHER_NUM_ATTESTATIONS_DEFERRED, SLASHER_NUM_ATTESTATIONS_DROPPED,
     SLASHER_NUM_ATTESTATIONS_VALID, SLASHER_NUM_BLOCKS_PROCESSED,
@@ -18,12 +19,12 @@ use types::{
 #[derive(Debug)]
 pub struct Slasher<E: EthSpec> {
     db: SlasherDB<E>,
-    pub(crate) attestation_queue: AttestationQueue<E>,
-    pub(crate) block_queue: BlockQueue,
+    attestation_queue: AttestationQueue<E>,
+    block_queue: BlockQueue,
     attester_slashings: Mutex<HashSet<AttesterSlashing<E>>>,
     proposer_slashings: Mutex<HashSet<ProposerSlashing>>,
     config: Arc<Config>,
-    pub(crate) log: Logger,
+    log: Logger,
 }
 
 impl<E: EthSpec> Slasher<E> {
@@ -60,6 +61,10 @@ impl<E: EthSpec> Slasher<E> {
         &self.config
     }
 
+    pub fn log(&self) -> &Logger {
+        &self.log
+    }
+
     /// Accept an attestation from the network and queue it for processing.
     pub fn accept_attestation(&self, attestation: IndexedAttestation<E>) {
         self.attestation_queue.queue(attestation);
@@ -71,17 +76,23 @@ impl<E: EthSpec> Slasher<E> {
     }
 
     /// Apply queued blocks and attestations to the on-disk database, and detect slashings!
-    pub fn process_queued(&self, current_epoch: Epoch) -> Result<(), Error> {
+    pub fn process_queued(&self, current_epoch: Epoch) -> Result<BatchStats, Error> {
         let mut txn = self.db.begin_rw_txn()?;
-        self.process_blocks(&mut txn)?;
-        self.process_attestations(current_epoch, &mut txn)?;
+        let block_stats = self.process_blocks(&mut txn)?;
+        let attestation_stats = self.process_attestations(current_epoch, &mut txn)?;
         txn.commit()?;
-        Ok(())
+        Ok(BatchStats {
+            block_stats,
+            attestation_stats,
+        })
     }
 
     /// Apply queued blocks to the on-disk database.
-    pub fn process_blocks(&self, txn: &mut RwTransaction<'_>) -> Result<(), Error> {
+    ///
+    /// Return the number of blocks processed and the number of slashings found.
+    pub fn process_blocks(&self, txn: &mut RwTransaction<'_>) -> Result<BlockStats, Error> {
         let blocks = self.block_queue.dequeue();
+        let num_processed = blocks.len();
         let mut slashings = vec![];
 
         metrics::set_gauge(&SLASHER_NUM_BLOCKS_PROCESSED, blocks.len() as i64);
@@ -94,6 +105,7 @@ impl<E: EthSpec> Slasher<E> {
             }
         }
 
+        let num_slashings = slashings.len();
         if !slashings.is_empty() {
             info!(
                 self.log,
@@ -103,7 +115,10 @@ impl<E: EthSpec> Slasher<E> {
             self.proposer_slashings.lock().extend(slashings);
         }
 
-        Ok(())
+        Ok(BlockStats {
+            num_processed,
+            num_slashings,
+        })
     }
 
     /// Apply queued attestations to the on-disk database.
@@ -111,8 +126,9 @@ impl<E: EthSpec> Slasher<E> {
         &self,
         current_epoch: Epoch,
         txn: &mut RwTransaction<'_>,
-    ) -> Result<(), Error> {
+    ) -> Result<AttestationStats, Error> {
         let snapshot = self.attestation_queue.dequeue();
+        let num_processed = snapshot.len();
 
         // Filter attestations for relevance.
         let (snapshot, deferred, num_dropped) = self.validate(snapshot, current_epoch);
@@ -144,7 +160,7 @@ impl<E: EthSpec> Slasher<E> {
         for (subqueue_id, subqueue) in grouped_attestations.subqueues.into_iter().enumerate() {
             self.process_batch(txn, subqueue_id, subqueue.attestations, current_epoch)?;
         }
-        Ok(())
+        Ok(AttestationStats { num_processed })
     }
 
     /// Process a batch of attestations for a range of validator indices.
diff --git a/slasher/src/slasher_server.rs b/slasher/src/slasher_server.rs
deleted file mode 100644
index b542a8023..000000000
--- a/slasher/src/slasher_server.rs
+++ /dev/null
@@ -1,95 +0,0 @@
-use crate::metrics::{self, SLASHER_DATABASE_SIZE, SLASHER_RUN_TIME};
-use crate::Slasher;
-use directory::size_of_dir;
-use slog::{debug, error, info, trace};
-use slot_clock::SlotClock;
-use std::sync::mpsc::{sync_channel, TrySendError};
-use std::sync::Arc;
-use task_executor::TaskExecutor;
-use tokio::stream::StreamExt;
-use tokio::time::{interval_at, Duration, Instant};
-use types::EthSpec;
-
-#[derive(Debug)]
-pub struct SlasherServer;
-
-impl SlasherServer {
-    pub fn run<E: EthSpec, C: SlotClock + 'static>(
-        slasher: Arc<Slasher<E>>,
-        slot_clock: C,
-        executor: &TaskExecutor,
-    ) {
-        info!(slasher.log, "Starting slasher to detect misbehaviour");
-
-        // Buffer just a single message in the channel. If the receiver is still processing, we
-        // don't need to burden them with more work (we can wait).
-        let (sender, receiver) = sync_channel(1);
-        let log = slasher.log.clone();
-        let update_period = slasher.config().update_period;
-
-        executor.spawn(
-            async move {
-                // NOTE: could align each run to some fixed point in each slot, see:
-                // https://github.com/sigp/lighthouse/issues/1861
-                let slot_clock = Arc::new(slot_clock);
-                let mut interval = interval_at(Instant::now(), Duration::from_secs(update_period));
-                while interval.next().await.is_some() {
-                    if let Some(current_slot) = slot_clock.clone().now() {
-                        let current_epoch = current_slot.epoch(E::slots_per_epoch());
-                        if let Err(TrySendError::Disconnected(_)) = sender.try_send(current_epoch) {
-                            break;
-                        }
-                    } else {
-                        trace!(log, "Slasher has nothing to do: we are pre-genesis");
-                    }
-                }
-            },
-            "slasher_server",
-        );
-
-        executor.spawn_blocking(
-            move || {
-                while let Ok(current_epoch) = receiver.recv() {
-                    let t = Instant::now();
-                    let num_attestations = slasher.attestation_queue.len();
-                    let num_blocks = slasher.block_queue.len();
-
-                    let batch_timer = metrics::start_timer(&SLASHER_RUN_TIME);
-                    if let Err(e) = slasher.process_queued(current_epoch) {
-                        error!(
-                            slasher.log,
-                            "Error during scheduled slasher processing";
-                            "epoch" => current_epoch,
-                            "error" => format!("{:?}", e)
-                        );
-                    }
-                    drop(batch_timer);
-
-                    // Prune the database, even in the case where batch processing failed.
-                    // If the LMDB database is full then pruning could help to free it up.
-                    if let Err(e) = slasher.prune_database(current_epoch) {
-                        error!(
-                            slasher.log,
-                            "Error during slasher database pruning";
-                            "epoch" => current_epoch,
-                            "error" => format!("{:?}", e),
-                        );
-                        continue;
-                    }
-                    debug!(
-                        slasher.log,
-                        "Completed slasher update";
-                        "epoch" => current_epoch,
-                        "time_taken" => format!("{}ms", t.elapsed().as_millis()),
-                        "num_attestations" => num_attestations,
-                        "num_blocks" => num_blocks,
-                    );
-
-                    let database_size = size_of_dir(&slasher.config().database_path);
-                    metrics::set_gauge(&SLASHER_DATABASE_SIZE, database_size as i64);
-                }
-            },
-            "slasher_server_process_queued",
-        );
-    }
-}
diff --git a/slasher/tests/attester_slashings.rs b/slasher/tests/attester_slashings.rs
index a0a26a96d..96deb83b1 100644
--- a/slasher/tests/attester_slashings.rs
+++ b/slasher/tests/attester_slashings.rs
@@ -205,7 +205,7 @@ fn parallel_slasher_test(
         .into_par_iter()
         .try_for_each(|attestation| {
             slasher.accept_attestation(attestation.clone());
-            slasher.process_queued(current_epoch)
+            slasher.process_queued(current_epoch).map(|_| ())
         })
         .expect("parallel processing shouldn't race");
 
diff --git a/slasher/tests/wrap_around.rs b/slasher/tests/wrap_around.rs
index 7802a7390..6615d4a42 100644
--- a/slasher/tests/wrap_around.rs
+++ b/slasher/tests/wrap_around.rs
@@ -79,7 +79,7 @@ fn pruning_with_map_full() {
             0,
         ));
         match slasher.process_queued(current_epoch) {
-            Ok(()) => break,
+            Ok(_) => break,
            Err(Error::DatabaseError(lmdb::Error::MapFull)) => {
                current_epoch += 1;
            }