From 5828ff120445952b3621efe1abade573dbfaa67c Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Mon, 23 Nov 2020 03:43:22 +0000 Subject: [PATCH] Implement slasher (#1567) This is an implementation of a slasher that lives inside the BN and can be enabled via `lighthouse bn --slasher`. Features included in this PR: - [x] Detection of attester slashing conditions (double votes, surrounds existing, surrounded by existing) - [x] Integration into Lighthouse's attestation verification flow - [x] Detection of proposer slashing conditions - [x] Extraction of attestations from blocks as they are verified - [x] Compression of chunks - [x] Configurable history length - [x] Pruning of old attestations and blocks - [x] More tests Future work: * Focus on a slice of history separate from the most recent N epochs (e.g. epochs `current - K` to `current - M`) * Run out-of-process * Ingest attestations from the chain without a resync Design notes are here https://hackmd.io/@sproul/HJSEklmPL --- Cargo.lock | 58 ++ Cargo.toml | 2 + beacon_node/Cargo.toml | 1 + beacon_node/beacon_chain/Cargo.toml | 1 + .../src/attestation_verification.rs | 286 ++++++-- beacon_node/beacon_chain/src/beacon_chain.rs | 86 ++- .../beacon_chain/src/block_verification.rs | 143 +++- beacon_node/beacon_chain/src/builder.rs | 10 + beacon_node/client/Cargo.toml | 1 + beacon_node/client/src/builder.rs | 39 ++ beacon_node/client/src/config.rs | 2 + beacon_node/network/src/service.rs | 4 +- beacon_node/src/cli.rs | 74 ++ beacon_node/src/config.rs | 37 + beacon_node/src/lib.rs | 12 + beacon_node/store/Cargo.toml | 1 + beacon_node/store/src/metrics.rs | 16 +- book/src/SUMMARY.md | 1 + book/src/slasher.md | 127 ++++ common/directory/src/lib.rs | 20 +- common/lighthouse_metrics/src/lib.rs | 8 +- consensus/types/src/beacon_block_header.rs | 4 +- consensus/types/src/proposer_slashing.rs | 4 +- consensus/types/src/signed_beacon_block.rs | 10 +- .../types/src/signed_beacon_block_header.rs | 49 +- crypto/bls/src/generic_signature.rs | 2 +- slasher/Cargo.toml | 38 ++ slasher/src/array.rs | 631 ++++++++++++++++++ slasher/src/attestation_queue.rs | 93 +++ slasher/src/attester_record.rs | 57 ++ slasher/src/block_queue.rs | 26 + slasher/src/config.rs | 111 +++ slasher/src/database.rs | 535 +++++++++++++++ slasher/src/error.rs | 83 +++ slasher/src/lib.rs | 66 ++ slasher/src/metrics.rs | 38 ++ slasher/src/slasher.rs | 299 +++++++++ slasher/src/slasher_server.rs | 95 +++ slasher/src/test_utils.rs | 115 ++++ slasher/src/utils.rs | 16 + slasher/tests/attester_slashings.rs | 215 ++++++ slasher/tests/proposer_slashings.rs | 61 ++ slasher/tests/random.rs | 233 +++++++ slasher/tests/wrap_around.rs | 39 ++ 44 files changed, 3662 insertions(+), 87 deletions(-) create mode 100644 book/src/slasher.md create mode 100644 slasher/Cargo.toml create mode 100644 slasher/src/array.rs create mode 100644 slasher/src/attestation_queue.rs create mode 100644 slasher/src/attester_record.rs create mode 100644 slasher/src/block_queue.rs create mode 100644 slasher/src/config.rs create mode 100644 slasher/src/database.rs create mode 100644 slasher/src/error.rs create mode 100644 slasher/src/lib.rs create mode 100644 slasher/src/metrics.rs create mode 100644 slasher/src/slasher.rs create mode 100644 slasher/src/slasher_server.rs create mode 100644 slasher/src/test_utils.rs create mode 100644 slasher/src/utils.rs create mode 100644 slasher/tests/attester_slashings.rs create mode 100644 slasher/tests/proposer_slashings.rs create mode 100644 slasher/tests/random.rs create mode 100644 slasher/tests/wrap_around.rs diff --git a/Cargo.lock b/Cargo.lock index 3746434d7..5154cd25f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -536,6 +536,7 @@ dependencies = [ "serde_derive", "serde_json", "serde_yaml", + "slasher", "slog", "sloggers", "slot_clock", @@ -576,6 +577,7 @@ dependencies = [ "node_test_rig", "rand 0.7.3", "serde", + "slasher", "slog", "slog-async", "slog-term", @@ -1046,6 +1048,7 @@ dependencies = [ "serde", "serde_derive", "serde_yaml", + "slasher", "slog", "slog-async", "sloggers", @@ -3770,6 +3773,28 @@ version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8dd5a6d5999d9907cda8ed67bbd137d3af8085216c2ac62de5be860bd41f304a" +[[package]] +name = "lmdb" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b0908efb5d6496aa977d96f91413da2635a902e5e31dbef0bfb88986c248539" +dependencies = [ + "bitflags 1.2.1", + "libc", + "lmdb-sys", +] + +[[package]] +name = "lmdb-sys" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5b392838cfe8858e86fac37cf97a0e8c55cc60ba0a18365cadc33092f128ce9" +dependencies = [ + "cc", + "libc", + "pkg-config", +] + [[package]] name = "lock_api" version = "0.3.4" @@ -5886,6 +5911,38 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" +[[package]] +name = "slasher" +version = "0.1.0" +dependencies = [ + "bincode", + "byteorder", + "directory", + "eth2_ssz", + "eth2_ssz_derive", + "flate2", + "lazy_static", + "lighthouse_metrics", + "lmdb", + "lmdb-sys", + "maplit", + "parking_lot 0.11.0", + "rand 0.7.3", + "rayon", + "safe_arith", + "serde", + "serde_derive", + "slog", + "sloggers", + "slot_clock", + "task_executor", + "tempdir", + "tokio 0.2.23", + "tree_hash", + "tree_hash_derive", + "types", +] + [[package]] name = "slashing_protection" version = "0.1.0" @@ -6199,6 +6256,7 @@ version = "0.2.0" dependencies = [ "criterion", "db-key", + "directory", "eth2_ssz", "eth2_ssz_derive", "itertools 0.9.0", diff --git a/Cargo.toml b/Cargo.toml index c4e057cb4..2e30af450 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,6 +68,8 @@ members = [ "remote_signer/backend", "remote_signer/client", + "slasher", + "testing/ef_tests", "testing/eth1_test_rig", "testing/node_test_rig", diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml index aec173de7..863081898 100644 --- a/beacon_node/Cargo.toml +++ b/beacon_node/Cargo.toml @@ -43,3 +43,4 @@ clap_utils = { path = "../common/clap_utils" } hyper = "0.13.8" lighthouse_version = { path = "../common/lighthouse_version" } hex = "0.4.2" +slasher = { path = "../slasher" } diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index c80df28cd..e9339e2a7 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -61,4 +61,5 @@ derivative = "2.1.1" itertools = "0.9.0" regex = "1.3.9" exit-future = "0.2.0" +slasher = { path = "../../slasher" } eth2 = { path = "../../common/eth2" } diff --git a/beacon_node/beacon_chain/src/attestation_verification.rs b/beacon_node/beacon_chain/src/attestation_verification.rs index 1b3c97e42..b3dca216e 100644 --- a/beacon_node/beacon_chain/src/attestation_verification.rs +++ b/beacon_node/beacon_chain/src/attestation_verification.rs @@ -37,6 +37,7 @@ use crate::{ }; use bls::verify_signature_sets; use proto_array::Block as ProtoBlock; +use slog::debug; use slot_clock::SlotClock; use state_processing::{ common::get_indexed_attestation, @@ -297,6 +298,76 @@ impl SignatureVerifiedAttestation for VerifiedUnaggregat } } +/// Information about invalid attestations which might still be slashable despite being invalid. +pub enum AttestationSlashInfo { + /// The attestation is invalid, but its signature wasn't checked. + SignatureNotChecked(Attestation, TErr), + /// As for `SignatureNotChecked`, but we know the `IndexedAttestation`. + SignatureNotCheckedIndexed(IndexedAttestation, TErr), + /// The attestation's signature is invalid, so it will never be slashable. + SignatureInvalid(TErr), + /// The signature is valid but the attestation is invalid in some other way. + SignatureValid(IndexedAttestation, TErr), +} + +/// After processing an attestation normally, optionally process it further for the slasher. +/// +/// This maps an `AttestationSlashInfo` error back into a regular `Error`, performing signature +/// checks on attestations that failed verification for other reasons. +/// +/// No substantial extra work will be done if there is no slasher configured. +fn process_slash_info( + slash_info: AttestationSlashInfo, + chain: &BeaconChain, +) -> Error { + use AttestationSlashInfo::*; + + if let Some(slasher) = chain.slasher.as_ref() { + let (indexed_attestation, check_signature, err) = match slash_info { + SignatureNotChecked(attestation, err) => { + match obtain_indexed_attestation_and_committees_per_slot(chain, &attestation) { + Ok((indexed, _)) => (indexed, true, err), + Err(e) => { + debug!( + chain.log, + "Unable to obtain indexed form of attestation for slasher"; + "attestation_root" => format!("{:?}", attestation.tree_hash_root()), + "error" => format!("{:?}", e) + ); + return err; + } + } + } + SignatureNotCheckedIndexed(indexed, err) => (indexed, true, err), + SignatureInvalid(e) => return e, + SignatureValid(indexed, err) => (indexed, false, err), + }; + + if check_signature { + if let Err(e) = verify_attestation_signature(chain, &indexed_attestation) { + debug!( + chain.log, + "Signature verification for slasher failed"; + "error" => format!("{:?}", e), + ); + return err; + } + } + + // Supply to slasher. + slasher.accept_attestation(indexed_attestation); + + err + } else { + match slash_info { + SignatureNotChecked(_, e) + | SignatureNotCheckedIndexed(_, e) + | SignatureInvalid(e) + | SignatureValid(_, e) => e, + } + } +} + impl VerifiedAggregatedAttestation { /// Returns `Ok(Self)` if the `signed_aggregate` is valid to be (re)published on the gossip /// network. @@ -304,6 +375,21 @@ impl VerifiedAggregatedAttestation { signed_aggregate: SignedAggregateAndProof, chain: &BeaconChain, ) -> Result { + Self::verify_slashable(signed_aggregate, chain) + .map(|verified_aggregate| { + if let Some(slasher) = chain.slasher.as_ref() { + slasher.accept_attestation(verified_aggregate.indexed_attestation.clone()); + } + verified_aggregate + }) + .map_err(|slash_info| process_slash_info(slash_info, chain)) + } + + /// Run the checks that happen before an indexed attestation is constructed. + fn verify_early_checks( + signed_aggregate: &SignedAggregateAndProof, + chain: &BeaconChain, + ) -> Result { let attestation = &signed_aggregate.message.aggregate; // Ensure attestation is within the last ATTESTATION_PROPAGATION_SLOT_RANGE slots (within a @@ -364,37 +450,20 @@ impl VerifiedAggregatedAttestation { // Ensure that the attestation has participants. if attestation.aggregation_bits.is_zero() { - return Err(Error::EmptyAggregationBitfield); + Err(Error::EmptyAggregationBitfield) + } else { + Ok(attestation_root) } + } - let indexed_attestation = - map_attestation_committee(chain, attestation, |(committee, _)| { - // Note: this clones the signature which is known to be a relatively slow operation. - // - // Future optimizations should remove this clone. - let selection_proof = - SelectionProof::from(signed_aggregate.message.selection_proof.clone()); - - if !selection_proof - .is_aggregator(committee.committee.len(), &chain.spec) - .map_err(|e| Error::BeaconChainError(e.into()))? - { - return Err(Error::InvalidSelectionProof { aggregator_index }); - } - - // Ensure the aggregator is a member of the committee for which it is aggregating. - if !committee.committee.contains(&(aggregator_index as usize)) { - return Err(Error::AggregatorNotInCommittee { aggregator_index }); - } - - get_indexed_attestation(committee.committee, &attestation) - .map_err(|e| BeaconChainError::from(e).into()) - })?; - - // Ensure that all signatures are valid. - if !verify_signed_aggregate_signatures(chain, &signed_aggregate, &indexed_attestation)? { - return Err(Error::InvalidSignature); - } + /// Run the checks that happen after the indexed attestation and signature have been checked. + fn verify_late_checks( + signed_aggregate: &SignedAggregateAndProof, + attestation_root: Hash256, + chain: &BeaconChain, + ) -> Result<(), Error> { + let attestation = &signed_aggregate.message.aggregate; + let aggregator_index = signed_aggregate.message.aggregator_index; // Observe the valid attestation so we do not re-process it. // @@ -425,6 +494,68 @@ impl VerifiedAggregatedAttestation { }); } + Ok(()) + } + + /// Verify the attestation, producing extra information about whether it might be slashable. + pub fn verify_slashable( + signed_aggregate: SignedAggregateAndProof, + chain: &BeaconChain, + ) -> Result> { + use AttestationSlashInfo::*; + + let attestation = &signed_aggregate.message.aggregate; + let aggregator_index = signed_aggregate.message.aggregator_index; + let attestation_root = match Self::verify_early_checks(&signed_aggregate, chain) { + Ok(root) => root, + Err(e) => return Err(SignatureNotChecked(signed_aggregate.message.aggregate, e)), + }; + + let indexed_attestation = + match map_attestation_committee(chain, attestation, |(committee, _)| { + // Note: this clones the signature which is known to be a relatively slow operation. + // + // Future optimizations should remove this clone. + let selection_proof = + SelectionProof::from(signed_aggregate.message.selection_proof.clone()); + + if !selection_proof + .is_aggregator(committee.committee.len(), &chain.spec) + .map_err(|e| Error::BeaconChainError(e.into()))? + { + return Err(Error::InvalidSelectionProof { aggregator_index }); + } + + // Ensure the aggregator is a member of the committee for which it is aggregating. + if !committee.committee.contains(&(aggregator_index as usize)) { + return Err(Error::AggregatorNotInCommittee { aggregator_index }); + } + + get_indexed_attestation(committee.committee, attestation) + .map_err(|e| BeaconChainError::from(e).into()) + }) { + Ok(indexed_attestation) => indexed_attestation, + Err(e) => return Err(SignatureNotChecked(signed_aggregate.message.aggregate, e)), + }; + + // Ensure that all signatures are valid. + if let Err(e) = + verify_signed_aggregate_signatures(chain, &signed_aggregate, &indexed_attestation) + .and_then(|is_valid| { + if !is_valid { + Err(Error::InvalidSignature) + } else { + Ok(()) + } + }) + { + return Err(SignatureInvalid(e)); + } + + if let Err(e) = Self::verify_late_checks(&signed_aggregate, attestation_root, chain) { + return Err(SignatureValid(indexed_attestation, e)); + } + Ok(VerifiedAggregatedAttestation { signed_aggregate, indexed_attestation, @@ -448,16 +579,11 @@ impl VerifiedAggregatedAttestation { } impl VerifiedUnaggregatedAttestation { - /// Returns `Ok(Self)` if the `attestation` is valid to be (re)published on the gossip - /// network. - /// - /// `subnet_id` is the subnet from which we received this attestation. This function will - /// verify that it was received on the correct subnet. - pub fn verify( - attestation: Attestation, - subnet_id: Option, + /// Run the checks that happen before an indexed attestation is constructed. + pub fn verify_early_checks( + attestation: &Attestation, chain: &BeaconChain, - ) -> Result { + ) -> Result<(), Error> { let attestation_epoch = attestation.data.slot.epoch(T::EthSpec::slots_per_epoch()); // Check the attestation's epoch matches its target. @@ -491,9 +617,17 @@ impl VerifiedUnaggregatedAttestation { // Check the attestation target root is consistent with the head root. verify_attestation_target_root::(&head_block, &attestation)?; - let (indexed_attestation, committees_per_slot) = - obtain_indexed_attestation_and_committees_per_slot(chain, &attestation)?; + Ok(()) + } + /// Run the checks that apply to the indexed attestation before the signature is checked. + pub fn verify_middle_checks( + attestation: &Attestation, + indexed_attestation: &IndexedAttestation, + committees_per_slot: u64, + subnet_id: Option, + chain: &BeaconChain, + ) -> Result<(u64, SubnetId), Error> { let expected_subnet_id = SubnetId::compute_subnet_for_attestation_data::( &indexed_attestation.data, committees_per_slot, @@ -532,9 +666,15 @@ impl VerifiedUnaggregatedAttestation { }); } - // The aggregate signature of the attestation is valid. - verify_attestation_signature(chain, &indexed_attestation)?; + Ok((validator_index, expected_subnet_id)) + } + /// Run the checks that apply after the signature has been checked. + fn verify_late_checks( + attestation: &Attestation, + validator_index: u64, + chain: &BeaconChain, + ) -> Result<(), Error> { // Now that the attestation has been fully verified, store that we have received a valid // attestation from this validator. // @@ -552,6 +692,68 @@ impl VerifiedUnaggregatedAttestation { epoch: attestation.data.target.epoch, }); } + Ok(()) + } + + /// Returns `Ok(Self)` if the `attestation` is valid to be (re)published on the gossip + /// network. + /// + /// `subnet_id` is the subnet from which we received this attestation. This function will + /// verify that it was received on the correct subnet. + pub fn verify( + attestation: Attestation, + subnet_id: Option, + chain: &BeaconChain, + ) -> Result { + Self::verify_slashable(attestation, subnet_id, chain) + .map(|verified_unaggregated| { + if let Some(slasher) = chain.slasher.as_ref() { + slasher.accept_attestation(verified_unaggregated.indexed_attestation.clone()); + } + verified_unaggregated + }) + .map_err(|slash_info| process_slash_info(slash_info, chain)) + } + + /// Verify the attestation, producing extra information about whether it might be slashable. + pub fn verify_slashable( + attestation: Attestation, + subnet_id: Option, + chain: &BeaconChain, + ) -> Result> { + use AttestationSlashInfo::*; + + if let Err(e) = Self::verify_early_checks(&attestation, chain) { + return Err(SignatureNotChecked(attestation, e)); + } + + let (indexed_attestation, committees_per_slot) = + match obtain_indexed_attestation_and_committees_per_slot(chain, &attestation) { + Ok(x) => x, + Err(e) => { + return Err(SignatureNotChecked(attestation, e)); + } + }; + + let (validator_index, expected_subnet_id) = match Self::verify_middle_checks( + &attestation, + &indexed_attestation, + committees_per_slot, + subnet_id, + chain, + ) { + Ok(t) => t, + Err(e) => return Err(SignatureNotCheckedIndexed(indexed_attestation, e)), + }; + + // The aggregate signature of the attestation is valid. + if let Err(e) = verify_attestation_signature(chain, &indexed_attestation) { + return Err(SignatureInvalid(e)); + } + + if let Err(e) = Self::verify_late_checks(&attestation, validator_index, chain) { + return Err(SignatureValid(indexed_attestation, e)); + } Ok(Self { attestation, diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index e1f86b74a..b1eec7c10 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -32,12 +32,13 @@ use futures::channel::mpsc::Sender; use itertools::process_results; use operation_pool::{OperationPool, PersistedOperationPool}; use parking_lot::{Mutex, RwLock}; +use slasher::Slasher; use slog::{crit, debug, error, info, trace, warn, Logger}; use slot_clock::SlotClock; use state_processing::{ common::get_indexed_attestation, per_block_processing, per_block_processing::errors::AttestationValidationError, per_slot_processing, - BlockSignatureStrategy, SigVerifiedOp, + BlockSignatureStrategy, SigVerifiedOp, VerifyOperation, }; use std::borrow::Cow; use std::cmp::Ordering; @@ -232,6 +233,8 @@ pub struct BeaconChain { pub(crate) log: Logger, /// Arbitrary bytes included in the blocks. pub(crate) graffiti: Graffiti, + /// Optional slasher. + pub(crate) slasher: Option>>, } type BeaconBlockAndState = (BeaconBlock, BeaconState); @@ -518,10 +521,13 @@ impl BeaconChain { } /// Apply a function to the canonical head without cloning it. - pub fn with_head( + pub fn with_head( &self, - f: impl FnOnce(&BeaconSnapshot) -> Result, - ) -> Result { + f: impl FnOnce(&BeaconSnapshot) -> Result, + ) -> Result + where + E: From, + { let head_lock = self .canonical_head .try_read_for(HEAD_LOCK_TIMEOUT) @@ -1080,6 +1086,63 @@ impl BeaconChain { Ok(signed_aggregate) } + /// Move slashings collected by the slasher into the op pool for block inclusion. + fn ingest_slashings_to_op_pool(&self, state: &BeaconState) { + if let Some(slasher) = self.slasher.as_ref() { + let attester_slashings = slasher.get_attester_slashings(); + let proposer_slashings = slasher.get_proposer_slashings(); + + if !attester_slashings.is_empty() || !proposer_slashings.is_empty() { + debug!( + self.log, + "Ingesting slashings"; + "num_attester_slashings" => attester_slashings.len(), + "num_proposer_slashings" => proposer_slashings.len(), + ); + } + + for slashing in attester_slashings { + let verified_slashing = match slashing.clone().validate(state, &self.spec) { + Ok(verified) => verified, + Err(e) => { + error!( + self.log, + "Attester slashing from slasher failed verification"; + "error" => format!("{:?}", e), + "slashing" => format!("{:?}", slashing), + ); + continue; + } + }; + + if let Err(e) = self.import_attester_slashing(verified_slashing) { + error!( + self.log, + "Attester slashing from slasher is invalid"; + "error" => format!("{:?}", e), + "slashing" => format!("{:?}", slashing), + ); + } + } + + for slashing in proposer_slashings { + let verified_slashing = match slashing.clone().validate(state, &self.spec) { + Ok(verified) => verified, + Err(e) => { + error!( + self.log, + "Proposer slashing from slasher failed verification"; + "error" => format!("{:?}", e), + "slashing" => format!("{:?}", slashing), + ); + continue; + } + }; + self.import_proposer_slashing(verified_slashing); + } + } + } + /// Check that the shuffling at `block_root` is equal to one of the shufflings of `state`. /// /// The `target_epoch` argument determines which shuffling to check compatibility with, it @@ -1525,6 +1588,18 @@ impl BeaconChain { metrics::stop_timer(attestation_observation_timer); + // If a slasher is configured, provide the attestations from the block. + if let Some(slasher) = self.slasher.as_ref() { + for attestation in &signed_block.message.body.attestations { + let committee = + state.get_beacon_committee(attestation.data.slot, attestation.data.index)?; + let indexed_attestation = + get_indexed_attestation(&committee.committee, attestation) + .map_err(|e| BlockError::BeaconChainError(e.into()))?; + slasher.accept_attestation(indexed_attestation); + } + } + // If there are new validators in this block, update our pubkey cache. // // We perform this _before_ adding the block to fork choice because the pubkey cache is @@ -1735,6 +1810,7 @@ impl BeaconChain { state.latest_block_header.canonical_root() }; + self.ingest_slashings_to_op_pool(&state); let (proposer_slashings, attester_slashings) = self.op_pool.get_slashings(&state, &self.spec); @@ -1951,6 +2027,8 @@ impl BeaconChain { { self.persist_head_and_fork_choice()?; self.op_pool.prune_attestations(self.epoch()?); + self.ingest_slashings_to_op_pool(&new_head.beacon_state); + self.persist_op_pool()?; } let update_head_timer = metrics::start_timer(&metrics::UPDATE_HEAD_TIMES); diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 47a96c680..d3a5af218 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -67,7 +67,7 @@ use store::{Error as DBError, HotColdDB, HotStateSummary, KeyValueStore, StoreOp use tree_hash::TreeHash; use types::{ BeaconBlock, BeaconState, BeaconStateError, ChainSpec, CloneConfig, EthSpec, Hash256, - PublicKey, RelativeEpoch, SignedBeaconBlock, Slot, + PublicKey, RelativeEpoch, SignedBeaconBlock, SignedBeaconBlockHeader, Slot, }; /// Maximum block slot number. Block with slots bigger than this constant will NOT be processed. @@ -268,6 +268,27 @@ impl From for BlockError { } } +/// Information about invalid blocks which might still be slashable despite being invalid. +pub enum BlockSlashInfo { + /// The block is invalid, but its proposer signature wasn't checked. + SignatureNotChecked(SignedBeaconBlockHeader, TErr), + /// The block's proposer signature is invalid, so it will never be slashable. + SignatureInvalid(TErr), + /// The signature is valid but the attestation is invalid in some other way. + SignatureValid(SignedBeaconBlockHeader, TErr), +} + +impl BlockSlashInfo> { + pub fn from_early_error(header: SignedBeaconBlockHeader, e: BlockError) -> Self { + match e { + BlockError::ProposalSignatureInvalid => BlockSlashInfo::SignatureInvalid(e), + // `InvalidSignature` could indicate any signature in the block, so we want + // to recheck the proposer signature alone. + _ => BlockSlashInfo::SignatureNotChecked(header, e), + } + } +} + /// Verify all signatures (except deposit signatures) on all blocks in the `chain_segment`. If all /// signatures are valid, the `chain_segment` is mapped to a `Vec` that can /// later be transformed into a `FullyVerifiedBlock` without re-checking the signatures. If any @@ -369,11 +390,51 @@ pub struct FullyVerifiedBlock<'a, T: BeaconChainTypes> { /// Implemented on types that can be converted into a `FullyVerifiedBlock`. /// /// Used to allow functions to accept blocks at various stages of verification. -pub trait IntoFullyVerifiedBlock { +pub trait IntoFullyVerifiedBlock: Sized { fn into_fully_verified_block( self, chain: &BeaconChain, - ) -> Result, BlockError>; + ) -> Result, BlockError> { + self.into_fully_verified_block_slashable(chain) + .map(|fully_verified| { + // Supply valid block to slasher. + if let Some(slasher) = chain.slasher.as_ref() { + slasher.accept_block_header(fully_verified.block.signed_block_header()); + } + fully_verified + }) + .map_err(|slash_info| { + // Process invalid blocks to see if they are suitable for the slasher. + if let Some(slasher) = chain.slasher.as_ref() { + let (verified_header, error) = match slash_info { + BlockSlashInfo::SignatureNotChecked(header, e) => { + if verify_header_signature(chain, &header).is_ok() { + (header, e) + } else { + return e; + } + } + BlockSlashInfo::SignatureInvalid(e) => return e, + BlockSlashInfo::SignatureValid(header, e) => (header, e), + }; + + slasher.accept_block_header(verified_header); + error + } else { + match slash_info { + BlockSlashInfo::SignatureNotChecked(_, e) + | BlockSlashInfo::SignatureInvalid(e) + | BlockSlashInfo::SignatureValid(_, e) => e, + } + } + }) + } + + /// Convert the block to fully-verified form while producing data to aid checking slashability. + fn into_fully_verified_block_slashable( + self, + chain: &BeaconChain, + ) -> Result, BlockSlashInfo>>; fn block(&self) -> &SignedBeaconBlock; } @@ -506,12 +567,13 @@ impl GossipVerifiedBlock { impl IntoFullyVerifiedBlock for GossipVerifiedBlock { /// Completes verification of the wrapped `block`. - fn into_fully_verified_block( + fn into_fully_verified_block_slashable( self, chain: &BeaconChain, - ) -> Result, BlockError> { - let fully_verified = SignatureVerifiedBlock::from_gossip_verified_block(self, chain)?; - fully_verified.into_fully_verified_block(chain) + ) -> Result, BlockSlashInfo>> { + let fully_verified = + SignatureVerifiedBlock::from_gossip_verified_block_check_slashable(self, chain)?; + fully_verified.into_fully_verified_block_slashable(chain) } fn block(&self) -> &SignedBeaconBlock { @@ -558,6 +620,15 @@ impl SignatureVerifiedBlock { } } + /// As for `new` above but producing `BlockSlashInfo`. + pub fn check_slashable( + block: SignedBeaconBlock, + chain: &BeaconChain, + ) -> Result>> { + let header = block.signed_block_header(); + Self::new(block, chain).map_err(|e| BlockSlashInfo::from_early_error(header, e)) + } + /// Finishes signature verification on the provided `GossipVerifedBlock`. Does not re-verify /// the proposer signature. pub fn from_gossip_verified_block( @@ -589,18 +660,30 @@ impl SignatureVerifiedBlock { Err(BlockError::InvalidSignature) } } + + /// Same as `from_gossip_verified_block` but producing slashing-relevant data as well. + pub fn from_gossip_verified_block_check_slashable( + from: GossipVerifiedBlock, + chain: &BeaconChain, + ) -> Result>> { + let header = from.block.signed_block_header(); + Self::from_gossip_verified_block(from, chain) + .map_err(|e| BlockSlashInfo::from_early_error(header, e)) + } } impl IntoFullyVerifiedBlock for SignatureVerifiedBlock { /// Completes verification of the wrapped `block`. - fn into_fully_verified_block( + fn into_fully_verified_block_slashable( self, chain: &BeaconChain, - ) -> Result, BlockError> { + ) -> Result, BlockSlashInfo>> { + let header = self.block.signed_block_header(); let (parent, block) = if let Some(parent) = self.parent { (parent, self.block) } else { - load_parent(self.block, chain)? + load_parent(self.block, chain) + .map_err(|e| BlockSlashInfo::SignatureValid(header.clone(), e))? }; FullyVerifiedBlock::from_signature_verified_components( @@ -609,6 +692,7 @@ impl IntoFullyVerifiedBlock for SignatureVerifiedBlock &SignedBeaconBlock { @@ -619,11 +703,12 @@ impl IntoFullyVerifiedBlock for SignatureVerifiedBlock IntoFullyVerifiedBlock for SignedBeaconBlock { /// Verifies the `SignedBeaconBlock` by first transforming it into a `SignatureVerifiedBlock` /// and then using that implementation of `IntoFullyVerifiedBlock` to complete verification. - fn into_fully_verified_block( + fn into_fully_verified_block_slashable( self, chain: &BeaconChain, - ) -> Result, BlockError> { - SignatureVerifiedBlock::new(self, chain)?.into_fully_verified_block(chain) + ) -> Result, BlockSlashInfo>> { + SignatureVerifiedBlock::check_slashable(self, chain)? + .into_fully_verified_block_slashable(chain) } fn block(&self) -> &SignedBeaconBlock { @@ -1125,6 +1210,38 @@ fn get_signature_verifier<'a, E: EthSpec>( ) } +/// Verify that `header` was signed with a valid signature from its proposer. +/// +/// Return `Ok(())` if the signature is valid, and an `Err` otherwise. +fn verify_header_signature( + chain: &BeaconChain, + header: &SignedBeaconBlockHeader, +) -> Result<(), BlockError> { + let proposer_pubkey = get_validator_pubkey_cache(chain)? + .get(header.message.proposer_index as usize) + .cloned() + .ok_or_else(|| BlockError::UnknownValidator(header.message.proposer_index))?; + let (fork, genesis_validators_root) = chain + .with_head(|head| { + Ok(( + head.beacon_state.fork, + head.beacon_state.genesis_validators_root, + )) + }) + .map_err(|e: BlockError| e)?; + + if header.verify_signature::( + &proposer_pubkey, + &fork, + genesis_validators_root, + &chain.spec, + ) { + Ok(()) + } else { + Err(BlockError::ProposalSignatureInvalid) + } +} + fn expose_participation_metrics(summaries: &[EpochProcessingSummary]) { if !cfg!(feature = "participation_metrics") { return; diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 8d1fec9e7..10e5cd0fc 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -21,6 +21,7 @@ use fork_choice::ForkChoice; use futures::channel::mpsc::Sender; use operation_pool::{OperationPool, PersistedOperationPool}; use parking_lot::RwLock; +use slasher::Slasher; use slog::{crit, info, Logger}; use slot_clock::{SlotClock, TestingSlotClock}; use std::marker::PhantomData; @@ -99,6 +100,7 @@ pub struct BeaconChainBuilder { disabled_forks: Vec, log: Option, graffiti: Graffiti, + slasher: Option>>, } impl @@ -139,6 +141,7 @@ where chain_config: ChainConfig::default(), log: None, graffiti: Graffiti::default(), + slasher: None, } } @@ -174,6 +177,12 @@ where self } + /// Sets the slasher. + pub fn slasher(mut self, slasher: Arc>) -> Self { + self.slasher = Some(slasher); + self + } + /// Sets the logger. /// /// Should generally be called early in the build chain. @@ -571,6 +580,7 @@ where .ok_or_else(|| "Cannot build without a shutdown sender.".to_string())?, log: log.clone(), graffiti: self.graffiti, + slasher: self.slasher.clone(), }; let head = beacon_chain diff --git a/beacon_node/client/Cargo.toml b/beacon_node/client/Cargo.toml index 8114761aa..e9a3469e5 100644 --- a/beacon_node/client/Cargo.toml +++ b/beacon_node/client/Cargo.toml @@ -44,3 +44,4 @@ bus = "2.2.3" directory = {path = "../../common/directory"} http_api = { path = "../http_api" } http_metrics = { path = "../http_metrics" } +slasher = { path = "../../slasher" } diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index e4d856159..723b099f6 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -16,6 +16,7 @@ use eth2_libp2p::NetworkGlobals; use genesis::{interop_genesis_state, Eth1GenesisService}; use network::{NetworkConfig, NetworkMessage, NetworkService}; use parking_lot::Mutex; +use slasher::{Slasher, SlasherServer}; use slog::{debug, info, warn}; use ssz::Decode; use std::net::SocketAddr; @@ -64,6 +65,7 @@ pub struct ClientBuilder { http_api_config: http_api::Config, http_metrics_config: http_metrics::Config, websocket_listen_addr: Option, + slasher: Option>>, eth_spec_instance: T::EthSpec, } @@ -97,6 +99,7 @@ where http_api_config: <_>::default(), http_metrics_config: <_>::default(), websocket_listen_addr: None, + slasher: None, eth_spec_instance, } } @@ -113,6 +116,11 @@ where self } + pub fn slasher(mut self, slasher: Arc>) -> Self { + self.slasher = Some(slasher); + self + } + /// Initializes the `BeaconChainBuilder`. The `build_beacon_chain` method will need to be /// called later in order to actually instantiate the `BeaconChain`. pub async fn beacon_chain_builder( @@ -146,6 +154,12 @@ where .disabled_forks(disabled_forks) .graffiti(graffiti); + let builder = if let Some(slasher) = self.slasher.clone() { + builder.slasher(slasher) + } else { + builder + }; + let chain_exists = builder .store_contains_beacon_chain() .unwrap_or_else(|_| false); @@ -343,6 +357,27 @@ where self } + /// Immediately start the slasher service. + /// + /// Error if no slasher is configured. + pub fn start_slasher_server(&self) -> Result<(), String> { + let context = self + .runtime_context + .as_ref() + .ok_or_else(|| "slasher requires a runtime_context")? + .service_context("slasher_server_ctxt".into()); + let slasher = self + .slasher + .clone() + .ok_or_else(|| "slasher server requires a slasher")?; + let slot_clock = self + .slot_clock + .clone() + .ok_or_else(|| "slasher server requires a slot clock")?; + SlasherServer::run(slasher, slot_clock, &context.executor); + Ok(()) + } + /// Immediately starts the service that periodically logs information each slot. pub fn notifier(self) -> Result { let context = self @@ -442,6 +477,10 @@ where None }; + if self.slasher.is_some() { + self.start_slasher_server()?; + } + Ok(Client { beacon_chain: self.beacon_chain, network_globals: self.network_globals, diff --git a/beacon_node/client/src/config.rs b/beacon_node/client/src/config.rs index e2be8fd80..3649a3da3 100644 --- a/beacon_node/client/src/config.rs +++ b/beacon_node/client/src/config.rs @@ -63,6 +63,7 @@ pub struct Config { pub eth1: eth1::Config, pub http_api: http_api::Config, pub http_metrics: http_metrics::Config, + pub slasher: Option, } impl Default for Config { @@ -84,6 +85,7 @@ impl Default for Config { graffiti: Graffiti::default(), http_api: <_>::default(), http_metrics: <_>::default(), + slasher: None, } } } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index d517d4f10..fdb96b77a 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -5,7 +5,7 @@ use crate::{ NetworkConfig, }; use crate::{error, metrics}; -use beacon_chain::{BeaconChain, BeaconChainTypes}; +use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes}; use eth2_libp2p::{ rpc::{GoodbyeReason, RPCResponseErrorCode, RequestId}, Gossipsub, Libp2pEvent, PeerAction, PeerRequestId, PubsubMessage, Request, Response, @@ -281,7 +281,7 @@ fn spawn_service( _ = service.gossipsub_parameter_update.next() => { if let Ok(slot) = service.beacon_chain.slot() { if let Some(active_validators) = service.beacon_chain.with_head(|head| { - Ok( + Ok::<_, BeaconChainError>( head .beacon_state .get_cached_active_validator_indices(RelativeEpoch::Current) diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 6426187b8..3a1361184 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -359,6 +359,80 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .value_name("NUM_SLOTS") .takes_value(true) ) + /* + * Slasher. + */ + .arg( + Arg::with_name("slasher") + .long("slasher") + .help( + "Run a slasher alongside the beacon node. It is currently only recommended for \ + expert users because of the immaturity of the slasher UX and the extra \ + resources required." + ) + .takes_value(false) + ) + .arg( + Arg::with_name("slasher-dir") + .long("slasher-dir") + .help( + "Set the slasher's database directory." + ) + .value_name("PATH") + .takes_value(true) + .requires("slasher") + ) + .arg( + Arg::with_name("slasher-update-period") + .long("slasher-update-period") + .help( + "Configure how often the slasher runs batch processing." + ) + .value_name("SECONDS") + .requires("slasher") + .takes_value(true) + ) + .arg( + Arg::with_name("slasher-history-length") + .long("slasher-history-length") + .help( + "Configure how many epochs of history the slasher keeps. Immutable after \ + initialization." + ) + .value_name("EPOCHS") + .requires("slasher") + .takes_value(true) + ) + .arg( + Arg::with_name("slasher-max-db-size") + .long("slasher-max-db-size") + .help( + "Maximum size of the LMDB database used by the slasher." + ) + .value_name("GIGABYTES") + .requires("slasher") + .takes_value(true) + ) + .arg( + Arg::with_name("slasher-chunk-size") + .long("slasher-chunk-size") + .help( + "Number of epochs per validator per chunk stored on disk." + ) + .value_name("EPOCHS") + .requires("slasher") + .takes_value(true) + ) + .arg( + Arg::with_name("slasher-validator-chunk-size") + .long("slasher-validator-chunk-size") + .help( + "Number of validators per chunk stored on disk." + ) + .value_name("NUM_VALIDATORS") + .requires("slasher") + .takes_value(true) + ) .arg( Arg::with_name("wss-checkpoint") .long("wss-checkpoint") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index a740371f4..4f3f33fe0 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -351,6 +351,43 @@ pub fn get_config( }; } + if cli_args.is_present("slasher") { + let slasher_dir = if let Some(slasher_dir) = cli_args.value_of("slasher-dir") { + PathBuf::from(slasher_dir) + } else { + client_config.data_dir.join("slasher_db") + }; + + let mut slasher_config = slasher::Config::new(slasher_dir); + + if let Some(update_period) = clap_utils::parse_optional(cli_args, "slasher-update-period")? + { + slasher_config.update_period = update_period; + } + + if let Some(history_length) = + clap_utils::parse_optional(cli_args, "slasher-history-length")? + { + slasher_config.history_length = history_length; + } + + if let Some(max_db_size) = clap_utils::parse_optional(cli_args, "slasher-max-db-size")? { + slasher_config.max_db_size_gbs = max_db_size; + } + + if let Some(chunk_size) = clap_utils::parse_optional(cli_args, "slasher-chunk-size")? { + slasher_config.chunk_size = chunk_size; + } + + if let Some(validator_chunk_size) = + clap_utils::parse_optional(cli_args, "slasher-validator-chunk-size")? + { + slasher_config.validator_chunk_size = validator_chunk_size; + } + + client_config.slasher = Some(slasher_config); + } + Ok(client_config) } diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs index e3e4c89e5..a468871a5 100644 --- a/beacon_node/src/lib.rs +++ b/beacon_node/src/lib.rs @@ -17,8 +17,10 @@ use beacon_chain::{ }; use clap::ArgMatches; use environment::RuntimeContext; +use slasher::Slasher; use slog::{info, warn}; use std::ops::{Deref, DerefMut}; +use std::sync::Arc; use types::EthSpec; /// A type-alias to the tighten the definition of a production-intended `Client`. @@ -82,6 +84,16 @@ impl ProductionBeaconNode { .http_api_config(client_config.http_api.clone()) .disk_store(&db_path, &freezer_db_path_res?, store_config)?; + let builder = if let Some(slasher_config) = client_config.slasher.clone() { + let slasher = Arc::new( + Slasher::open(slasher_config, log.new(slog::o!("service" => "slasher"))) + .map_err(|e| format!("Slasher open error: {:?}", e))?, + ); + builder.slasher(slasher) + } else { + builder + }; + let builder = builder .beacon_chain_builder(client_genesis, client_config_1) .await?; diff --git a/beacon_node/store/Cargo.toml b/beacon_node/store/Cargo.toml index a0fa4c24e..81c949495 100644 --- a/beacon_node/store/Cargo.toml +++ b/beacon_node/store/Cargo.toml @@ -30,3 +30,4 @@ lazy_static = "1.4.0" lighthouse_metrics = { path = "../../common/lighthouse_metrics" } lru = "0.6.0" sloggers = "1.0.1" +directory = { path = "../../common/directory" } diff --git a/beacon_node/store/src/metrics.rs b/beacon_node/store/src/metrics.rs index 826712a72..72c5e6196 100644 --- a/beacon_node/store/src/metrics.rs +++ b/beacon_node/store/src/metrics.rs @@ -1,6 +1,6 @@ pub use lighthouse_metrics::{set_gauge, try_create_int_gauge, *}; -use std::fs; +use directory::size_of_dir; use std::path::Path; lazy_static! { @@ -134,17 +134,3 @@ pub fn scrape_for_metrics(db_path: &Path, freezer_db_path: &Path) { let freezer_db_size = size_of_dir(freezer_db_path); set_gauge(&FREEZER_DB_SIZE, freezer_db_size as i64); } - -fn size_of_dir(path: &Path) -> u64 { - if let Ok(iter) = fs::read_dir(path) { - iter.filter_map(std::result::Result::ok) - .map(size_of_dir_entry) - .sum() - } else { - 0 - } -} - -fn size_of_dir_entry(dir: fs::DirEntry) -> u64 { - dir.metadata().map(|m| m.len()).unwrap_or(0) -} diff --git a/book/src/SUMMARY.md b/book/src/SUMMARY.md index e934ac987..7801b032a 100644 --- a/book/src/SUMMARY.md +++ b/book/src/SUMMARY.md @@ -30,6 +30,7 @@ * [Database Configuration](./advanced_database.md) * [Local Testnets](./local-testnets.md) * [Advanced Networking](./advanced_networking.md) + * [Running a Slasher](./slasher.md) * [Contributing](./contributing.md) * [Development Environment](./setup.md) * [FAQs](./faq.md) diff --git a/book/src/slasher.md b/book/src/slasher.md new file mode 100644 index 000000000..0be9a65f6 --- /dev/null +++ b/book/src/slasher.md @@ -0,0 +1,127 @@ +# Running a Slasher + +Lighthouse includes a slasher for identifying slashable offences comitted by other validators and +including proof of those offences in blocks. + +Running a slasher is a good way to contribute to the health of the network, and doing so can earn +extra income for your validators. However it is currently only recommended for expert users because +of the immaturity of the slasher UX and the extra resources required. + +## Minimum System Requirements + +* Quad-core CPU +* 16 GB RAM +* 256 GB solid state storage (in addition to space for the beacon node DB) + +## How to Run + +The slasher runs inside the same process as the beacon node, when enabled via the `--slasher` flag: + +``` +lighthouse bn --slasher --debug-level debug +``` + +The slasher hooks into Lighthouse's block and attestation processing, and pushes messages into an +in-memory queue for regular processing. It will increase the CPU usage of the beacon node because it +verifies the signatures of otherwise invalid messages. When a slasher batch update runs, the +messages are filtered for relevancy, and all relevant messages are checked for slashings and written +to the slasher database. + +You **should** run with debug logs, so that you can see the slasher's internal machinations, and +provide logs to the devs should you encounter any bugs. + +## Configuration + +The slasher has several configuration options that control its functioning. + +### Database Directory + +* Flag: `--slasher-dir PATH` +* Argument: path to directory + +By default the slasher stores data in the `slasher_db` directory inside the beacon node's datadir, +e.g. `~/.lighthouse/{testnet}/beacon/slasher_db`. You can use this flag to change that storage +directory. + +### History Length + +* Flag: `--slasher-history-length EPOCHS` +* Argument: number of epochs +* Default: 4096 epochs + +The slasher stores data for the `history-length` most recent epochs. By default the history length +is set high in order to catch all validator misbehaviour since the last weak subjectivity +checkpoint. If you would like to reduce the resource requirements (particularly disk space), set the +history length to a lower value, although a lower history length may prevent your slasher from +finding some slashings. + +**Note:** See the `--slasher-max-db-size` section below to ensure that your disk space savings are +applied. The history length must be a multiple of the chunk size (default 16), and cannot be +changed after initialization. + +### Max Database Size + +* Flag: `--slasher-max-db-size GIGABYTES` +* Argument: maximum size of the database in gigabytes +* Default: 256 GB + +The slasher uses LMDB as its backing store, and LMDB will consume up to the maximum amount of disk +space allocated to it. By default the limit is set to accomodate the default history length and +around 150K validators but you can set it lower if running with a reduced history length. The space +required scales approximately linearly in validator count and history length, i.e. if you halve +either you can halve the space required. + +If you want a better estimate you can use this formula: + +``` +352 * V * N + (16 * V * N)/(C * K) + 15000 * N +``` + +where + +* `V` is the validator count +* `N` is the history length +* `C` is the chunk size +* `K` is the validator chunk size + +### Update Period + +* Flag: `--slasher-update-period SECONDS` +* Argument: number of seconds +* Default: 12 seconds + +Set the length of the time interval between each slasher batch update. You can check if your +slasher is keeping up with its update period by looking for a log message like this: + +``` +DEBG Completed slasher update num_blocks: 1, num_attestations: 279, time_taken: 1821ms, epoch: 20889, service: slasher +``` + +If the `time_taken` is substantially longer than the update period then it indicates your machine is +struggling under the load, and you should consider increasing the update period or lowering the +resource requirements by tweaking the history length. + +### Chunk Size and Validator Chunk Size + +* Flags: `--slasher-chunk-size EPOCHS`, `--slasher-validator-chunk-size NUM_VALIDATORS` +* Arguments: number of ecochs, number of validators +* Defaults: 16, 256 + +Adjusting these parameter should only be done in conjunction with reading in detail +about [how the slasher works][design-notes], and/or reading the source code. + +[design-notes]: https://hackmd.io/@sproul/min-max-slasher + +### Short-Range Example + +If you would like to run a lightweight slasher that just checks blocks and attestations within +the last day or so, you can use this combination of arguments: + +``` +lighthouse bn --slasher --slasher-history-length 256 --slasher-max-db-size 16 --debug-level debug +``` + +## Stability Warning + +The slasher code is still quite new, so we may update the schema of the slasher database in a +backwards-incompatible way which will require re-initialization. diff --git a/common/directory/src/lib.rs b/common/directory/src/lib.rs index 765fdabd6..1036e8289 100644 --- a/common/directory/src/lib.rs +++ b/common/directory/src/lib.rs @@ -1,6 +1,6 @@ use clap::ArgMatches; pub use eth2_testnet_config::DEFAULT_HARDCODED_TESTNET; -use std::fs::create_dir_all; +use std::fs::{self, create_dir_all}; use std::path::{Path, PathBuf}; /// Names for the default directories. @@ -58,3 +58,21 @@ pub fn parse_path_or_default_with_flag( .join(flag), ) } + +/// Get the approximate size of a directory and its contents. +/// +/// Will skip unreadable files, and files. Not 100% accurate if files are being created and deleted +/// while this function is running. +pub fn size_of_dir(path: &Path) -> u64 { + if let Ok(iter) = fs::read_dir(path) { + iter.filter_map(std::result::Result::ok) + .map(size_of_dir_entry) + .sum() + } else { + 0 + } +} + +fn size_of_dir_entry(dir: fs::DirEntry) -> u64 { + dir.metadata().map(|m| m.len()).unwrap_or(0) +} diff --git a/common/lighthouse_metrics/src/lib.rs b/common/lighthouse_metrics/src/lib.rs index 1bed7b74b..39d759d1d 100644 --- a/common/lighthouse_metrics/src/lib.rs +++ b/common/lighthouse_metrics/src/lib.rs @@ -143,7 +143,7 @@ pub fn try_create_float_gauge_vec( Ok(counter_vec) } -/// Attempts to create a `IntGaugeVec`, returning `Err` if the registry does not accept the gauge +/// Attempts to create a `IntCounterVec`, returning `Err` if the registry does not accept the gauge /// (potentially due to naming conflict). pub fn try_create_int_counter_vec( name: &str, @@ -221,6 +221,12 @@ pub fn inc_counter_vec(int_counter_vec: &Result, name: &[&str]) { } } +pub fn inc_counter_vec_by(int_counter_vec: &Result, name: &[&str], amount: i64) { + if let Some(counter) = get_int_counter(int_counter_vec, name) { + counter.inc_by(amount); + } +} + /// If `histogram_vec.is_ok()`, returns a histogram with the given `name`. pub fn get_histogram(histogram_vec: &Result, name: &[&str]) -> Option { if let Ok(histogram_vec) = histogram_vec { diff --git a/consensus/types/src/beacon_block_header.rs b/consensus/types/src/beacon_block_header.rs index 708c0e16f..82222b035 100644 --- a/consensus/types/src/beacon_block_header.rs +++ b/consensus/types/src/beacon_block_header.rs @@ -11,7 +11,9 @@ use tree_hash_derive::TreeHash; /// /// Spec v0.12.1 #[cfg_attr(feature = "arbitrary-fuzz", derive(arbitrary::Arbitrary))] -#[derive(Debug, PartialEq, Clone, Serialize, Deserialize, Encode, Decode, TreeHash, TestRandom)] +#[derive( + Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Encode, Decode, TreeHash, TestRandom, +)] pub struct BeaconBlockHeader { pub slot: Slot, #[serde(with = "serde_utils::quoted_u64")] diff --git a/consensus/types/src/proposer_slashing.rs b/consensus/types/src/proposer_slashing.rs index 0055a04ce..ff12b0611 100644 --- a/consensus/types/src/proposer_slashing.rs +++ b/consensus/types/src/proposer_slashing.rs @@ -10,7 +10,9 @@ use tree_hash_derive::TreeHash; /// /// Spec v0.12.1 #[cfg_attr(feature = "arbitrary-fuzz", derive(arbitrary::Arbitrary))] -#[derive(Debug, PartialEq, Clone, Serialize, Deserialize, Encode, Decode, TreeHash, TestRandom)] +#[derive( + Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Encode, Decode, TreeHash, TestRandom, +)] pub struct ProposerSlashing { pub signed_header_1: SignedBeaconBlockHeader, pub signed_header_2: SignedBeaconBlockHeader, diff --git a/consensus/types/src/signed_beacon_block.rs b/consensus/types/src/signed_beacon_block.rs index ac0e67fe0..cd2e85072 100644 --- a/consensus/types/src/signed_beacon_block.rs +++ b/consensus/types/src/signed_beacon_block.rs @@ -1,6 +1,6 @@ use crate::{ test_utils::TestRandom, BeaconBlock, ChainSpec, Domain, EthSpec, Fork, Hash256, PublicKey, - SignedRoot, SigningData, Slot, + SignedBeaconBlockHeader, SignedRoot, SigningData, Slot, }; use bls::Signature; use serde_derive::{Deserialize, Serialize}; @@ -82,6 +82,14 @@ impl SignedBeaconBlock { self.signature.verify(pubkey, message) } + /// Produce a signed beacon block header corresponding to this block. + pub fn signed_block_header(&self) -> SignedBeaconBlockHeader { + SignedBeaconBlockHeader { + message: self.message.block_header(), + signature: self.signature.clone(), + } + } + /// Convenience accessor for the block's slot. pub fn slot(&self) -> Slot { self.message.slot diff --git a/consensus/types/src/signed_beacon_block_header.rs b/consensus/types/src/signed_beacon_block_header.rs index 700a694f8..b35765942 100644 --- a/consensus/types/src/signed_beacon_block_header.rs +++ b/consensus/types/src/signed_beacon_block_header.rs @@ -1,21 +1,60 @@ -use crate::{test_utils::TestRandom, BeaconBlockHeader}; -use bls::Signature; - +use crate::{ + test_utils::TestRandom, BeaconBlockHeader, ChainSpec, Domain, EthSpec, Fork, Hash256, + PublicKey, Signature, SignedRoot, +}; +use derivative::Derivative; use serde_derive::{Deserialize, Serialize}; +use ssz::Encode; use ssz_derive::{Decode, Encode}; +use std::hash::{Hash, Hasher}; use test_random_derive::TestRandom; use tree_hash_derive::TreeHash; -/// An exit voluntarily submitted a validator who wishes to withdraw. +/// A signed header of a `BeaconBlock`. /// /// Spec v0.12.1 #[cfg_attr(feature = "arbitrary-fuzz", derive(arbitrary::Arbitrary))] -#[derive(Debug, PartialEq, Clone, Serialize, Deserialize, Encode, Decode, TreeHash, TestRandom)] +#[derive(Derivative, Debug, Clone, Serialize, Deserialize, Encode, Decode, TreeHash, TestRandom)] +#[derivative(PartialEq, Eq)] pub struct SignedBeaconBlockHeader { pub message: BeaconBlockHeader, pub signature: Signature, } +/// Implementation of non-crypto-secure `Hash`, for use with `HashMap` and `HashSet`. +/// +/// Guarantees `header1 == header2 -> hash(header1) == hash(header2)`. +/// +/// Used in the slasher. +impl Hash for SignedBeaconBlockHeader { + fn hash(&self, state: &mut H) { + self.message.hash(state); + self.signature.as_ssz_bytes().hash(state); + } +} + +impl SignedBeaconBlockHeader { + /// Verify that this block header was signed by `pubkey`. + pub fn verify_signature( + &self, + pubkey: &PublicKey, + fork: &Fork, + genesis_validators_root: Hash256, + spec: &ChainSpec, + ) -> bool { + let domain = spec.get_domain( + self.message.slot.epoch(E::slots_per_epoch()), + Domain::BeaconProposer, + fork, + genesis_validators_root, + ); + + let message = self.message.signing_root(domain); + + self.signature.verify(pubkey, message) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crypto/bls/src/generic_signature.rs b/crypto/bls/src/generic_signature.rs index 3f313ea1c..cce17fd2f 100644 --- a/crypto/bls/src/generic_signature.rs +++ b/crypto/bls/src/generic_signature.rs @@ -44,7 +44,7 @@ pub trait TSignature: Sized + Clone { /// /// Provides generic functionality whilst deferring all serious cryptographic operations to the /// generics. -#[derive(Clone, PartialEq)] +#[derive(Clone, PartialEq, Eq)] pub struct GenericSignature { /// The underlying point which performs *actual* cryptographic operations. point: Option, diff --git a/slasher/Cargo.toml b/slasher/Cargo.toml new file mode 100644 index 000000000..bd0bb1675 --- /dev/null +++ b/slasher/Cargo.toml @@ -0,0 +1,38 @@ +[package] +name = "slasher" +version = "0.1.0" +authors = ["Michael Sproul "] +edition = "2018" + +[dependencies] +bincode = "1.3.1" +byteorder = "1.3.4" +directory = { path = "../common/directory" } +eth2_ssz = { path = "../consensus/ssz" } +eth2_ssz_derive = { path = "../consensus/ssz_derive" } +flate2 = { version = "1.0.14", features = ["zlib"], default-features = false } +lazy_static = "1.4.0" +lighthouse_metrics = { path = "../common/lighthouse_metrics" } +lmdb = "0.8" +lmdb-sys = "0.8" +parking_lot = "0.11.0" +rand = "0.7" +safe_arith = { path = "../consensus/safe_arith" } +serde = "1.0" +serde_derive = "1.0" +slog = "2.5.2" +sloggers = "*" +slot_clock = { path = "../common/slot_clock" } +task_executor = { path = "../common/task_executor" } +tokio = { version = "0.2.21", features = ["full"] } +tree_hash = { path = "../consensus/tree_hash" } +tree_hash_derive = { path = "../consensus/tree_hash_derive" } +types = { path = "../consensus/types" } + +[dev-dependencies] +maplit = "1.0.2" +rayon = "1.3.0" +tempdir = "0.3.7" + +[features] +test_logger = [] diff --git a/slasher/src/array.rs b/slasher/src/array.rs new file mode 100644 index 000000000..6a19dbdc3 --- /dev/null +++ b/slasher/src/array.rs @@ -0,0 +1,631 @@ +use crate::metrics::{self, SLASHER_COMPRESSION_RATIO, SLASHER_NUM_CHUNKS_UPDATED}; +use crate::{AttesterRecord, AttesterSlashingStatus, Config, Error, SlasherDB}; +use flate2::bufread::{ZlibDecoder, ZlibEncoder}; +use lmdb::{RwTransaction, Transaction}; +use serde_derive::{Deserialize, Serialize}; +use std::collections::{btree_map::Entry, BTreeMap, HashSet}; +use std::convert::TryFrom; +use std::io::Read; +use std::iter::Extend; +use std::sync::Arc; +use types::{AttesterSlashing, Epoch, EthSpec, IndexedAttestation}; + +pub const MAX_DISTANCE: u16 = u16::MAX; + +/// Terminology: +/// +/// Let +/// N = config.history_length +/// C = config.chunk_size +/// K = config.validator_chunk_size +/// +/// Then +/// +/// `chunk_index` in [0..N/C) is the column of a chunk in the 2D matrix +/// `validator_chunk_index` in [0..N/K) is the row of a chunk in the 2D matrix +/// `chunk_offset` in [0..C) is the horizontal (epoch) offset of a value within a 2D chunk +/// `validator_offset` in [0..K) is the vertical (validator) offset of a value within a 2D chunk +#[derive(Debug, Serialize, Deserialize)] +pub struct Chunk { + data: Vec, +} + +impl Chunk { + pub fn get_target( + &self, + validator_index: u64, + epoch: Epoch, + config: &Config, + ) -> Result { + assert_eq!( + self.data.len(), + config.chunk_size * config.validator_chunk_size + ); + let validator_offset = config.validator_offset(validator_index); + let chunk_offset = config.chunk_offset(epoch); + let cell_index = config.cell_index(validator_offset, chunk_offset); + self.data + .get(cell_index) + .map(|distance| epoch + u64::from(*distance)) + .ok_or_else(|| Error::ChunkIndexOutOfBounds(cell_index)) + } + + pub fn set_target( + &mut self, + validator_index: u64, + epoch: Epoch, + target_epoch: Epoch, + config: &Config, + ) -> Result<(), Error> { + let distance = Self::epoch_distance(target_epoch, epoch)?; + self.set_raw_distance(validator_index, epoch, distance, config) + } + + pub fn set_raw_distance( + &mut self, + validator_index: u64, + epoch: Epoch, + target_distance: u16, + config: &Config, + ) -> Result<(), Error> { + let validator_offset = config.validator_offset(validator_index); + let chunk_offset = config.chunk_offset(epoch); + let cell_index = config.cell_index(validator_offset, chunk_offset); + + let cell = self + .data + .get_mut(cell_index) + .ok_or_else(|| Error::ChunkIndexOutOfBounds(cell_index))?; + *cell = target_distance; + Ok(()) + } + + /// Compute the distance (difference) between two epochs. + /// + /// Error if the distance is greater than or equal to `MAX_DISTANCE`. + pub fn epoch_distance(epoch: Epoch, base_epoch: Epoch) -> Result { + let distance_u64 = epoch + .as_u64() + .checked_sub(base_epoch.as_u64()) + .ok_or(Error::DistanceCalculationOverflow)?; + + let distance = u16::try_from(distance_u64).map_err(|_| Error::DistanceTooLarge)?; + if distance < MAX_DISTANCE { + Ok(distance) + } else { + Err(Error::DistanceTooLarge) + } + } +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(transparent)] +pub struct MinTargetChunk { + chunk: Chunk, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(transparent)] +pub struct MaxTargetChunk { + chunk: Chunk, +} + +pub trait TargetArrayChunk: Sized + serde::Serialize + serde::de::DeserializeOwned { + fn name() -> &'static str; + + fn empty(config: &Config) -> Self; + + fn chunk(&mut self) -> &mut Chunk; + + fn neutral_element() -> u16; + + fn check_slashable( + &self, + db: &SlasherDB, + txn: &mut RwTransaction<'_>, + validator_index: u64, + attestation: &IndexedAttestation, + config: &Config, + ) -> Result, Error>; + + fn update( + &mut self, + chunk_index: usize, + validator_index: u64, + start_epoch: Epoch, + new_target_epoch: Epoch, + current_epoch: Epoch, + config: &Config, + ) -> Result; + + fn first_start_epoch( + source_epoch: Epoch, + current_epoch: Epoch, + config: &Config, + ) -> Option; + + fn next_start_epoch(start_epoch: Epoch, config: &Config) -> Epoch; + + fn select_db(db: &SlasherDB) -> lmdb::Database; + + fn load( + db: &SlasherDB, + txn: &mut RwTransaction<'_>, + validator_chunk_index: usize, + chunk_index: usize, + config: &Config, + ) -> Result, Error> { + let disk_key = config.disk_key(validator_chunk_index, chunk_index); + let chunk_bytes = match txn.get(Self::select_db(db), &disk_key.to_be_bytes()) { + Ok(chunk_bytes) => chunk_bytes, + Err(lmdb::Error::NotFound) => return Ok(None), + Err(e) => return Err(e.into()), + }; + + let chunk = bincode::deserialize_from(ZlibDecoder::new(chunk_bytes))?; + + Ok(Some(chunk)) + } + + fn store( + &self, + db: &SlasherDB, + txn: &mut RwTransaction<'_>, + validator_chunk_index: usize, + chunk_index: usize, + config: &Config, + ) -> Result<(), Error> { + let disk_key = config.disk_key(validator_chunk_index, chunk_index); + let value = bincode::serialize(self)?; + let mut encoder = ZlibEncoder::new(&value[..], flate2::Compression::default()); + let mut compressed_value = vec![]; + encoder.read_to_end(&mut compressed_value)?; + + let compression_ratio = value.len() as f64 / compressed_value.len() as f64; + metrics::set_float_gauge(&SLASHER_COMPRESSION_RATIO, compression_ratio); + + txn.put( + Self::select_db(db), + &disk_key.to_be_bytes(), + &compressed_value, + SlasherDB::::write_flags(), + )?; + Ok(()) + } +} + +impl TargetArrayChunk for MinTargetChunk { + fn name() -> &'static str { + "min" + } + + fn empty(config: &Config) -> Self { + MinTargetChunk { + chunk: Chunk { + data: vec![ + Self::neutral_element(); + config.chunk_size * config.validator_chunk_size + ], + }, + } + } + + fn neutral_element() -> u16 { + MAX_DISTANCE + } + + fn chunk(&mut self) -> &mut Chunk { + &mut self.chunk + } + + fn check_slashable( + &self, + db: &SlasherDB, + txn: &mut RwTransaction<'_>, + validator_index: u64, + attestation: &IndexedAttestation, + config: &Config, + ) -> Result, Error> { + let min_target = + self.chunk + .get_target(validator_index, attestation.data.source.epoch, config)?; + if attestation.data.target.epoch > min_target { + let existing_attestation = + db.get_attestation_for_validator(txn, validator_index, min_target)?; + + if attestation.data.source.epoch < existing_attestation.data.source.epoch { + Ok(AttesterSlashingStatus::SurroundsExisting(Box::new( + existing_attestation, + ))) + } else { + Ok(AttesterSlashingStatus::AlreadyDoubleVoted) + } + } else { + Ok(AttesterSlashingStatus::NotSlashable) + } + } + + fn update( + &mut self, + chunk_index: usize, + validator_index: u64, + start_epoch: Epoch, + new_target_epoch: Epoch, + current_epoch: Epoch, + config: &Config, + ) -> Result { + let min_epoch = Epoch::from( + current_epoch + .as_usize() + .saturating_sub(config.history_length - 1), + ); + let mut epoch = start_epoch; + while config.chunk_index(epoch) == chunk_index && epoch >= min_epoch { + if new_target_epoch < self.chunk.get_target(validator_index, epoch, config)? { + self.chunk + .set_target(validator_index, epoch, new_target_epoch, config)?; + } else { + // We can stop. + return Ok(false); + } + epoch -= 1; + } + Ok(epoch >= min_epoch) + } + + fn first_start_epoch( + source_epoch: Epoch, + current_epoch: Epoch, + config: &Config, + ) -> Option { + if source_epoch > current_epoch - config.history_length as u64 { + assert_ne!(source_epoch, 0); + Some(source_epoch - 1) + } else { + None + } + } + + // Move to last epoch of previous chunk + fn next_start_epoch(start_epoch: Epoch, config: &Config) -> Epoch { + let chunk_size = config.chunk_size as u64; + start_epoch / chunk_size * chunk_size - 1 + } + + fn select_db(db: &SlasherDB) -> lmdb::Database { + db.min_targets_db + } +} + +impl TargetArrayChunk for MaxTargetChunk { + fn name() -> &'static str { + "max" + } + + fn empty(config: &Config) -> Self { + MaxTargetChunk { + chunk: Chunk { + data: vec![ + Self::neutral_element(); + config.chunk_size * config.validator_chunk_size + ], + }, + } + } + + fn neutral_element() -> u16 { + 0 + } + + fn chunk(&mut self) -> &mut Chunk { + &mut self.chunk + } + + fn check_slashable( + &self, + db: &SlasherDB, + txn: &mut RwTransaction<'_>, + validator_index: u64, + attestation: &IndexedAttestation, + config: &Config, + ) -> Result, Error> { + let max_target = + self.chunk + .get_target(validator_index, attestation.data.source.epoch, config)?; + if attestation.data.target.epoch < max_target { + let existing_attestation = + db.get_attestation_for_validator(txn, validator_index, max_target)?; + + if existing_attestation.data.source.epoch < attestation.data.source.epoch { + Ok(AttesterSlashingStatus::SurroundedByExisting(Box::new( + existing_attestation, + ))) + } else { + Ok(AttesterSlashingStatus::AlreadyDoubleVoted) + } + } else { + Ok(AttesterSlashingStatus::NotSlashable) + } + } + + fn update( + &mut self, + chunk_index: usize, + validator_index: u64, + start_epoch: Epoch, + new_target_epoch: Epoch, + current_epoch: Epoch, + config: &Config, + ) -> Result { + let mut epoch = start_epoch; + while config.chunk_index(epoch) == chunk_index && epoch <= current_epoch { + if new_target_epoch > self.chunk.get_target(validator_index, epoch, config)? { + self.chunk + .set_target(validator_index, epoch, new_target_epoch, config)?; + } else { + // We can stop. + return Ok(false); + } + epoch += 1; + } + // If the epoch to update now lies beyond the current chunk and is less than + // or equal to the current epoch, then continue to the next chunk to update it. + Ok(epoch <= current_epoch) + } + + fn first_start_epoch( + source_epoch: Epoch, + current_epoch: Epoch, + _config: &Config, + ) -> Option { + if source_epoch < current_epoch { + Some(source_epoch + 1) + } else { + None + } + } + + // Move to first epoch of next chunk + fn next_start_epoch(start_epoch: Epoch, config: &Config) -> Epoch { + let chunk_size = config.chunk_size as u64; + (start_epoch / chunk_size + 1) * chunk_size + } + + fn select_db(db: &SlasherDB) -> lmdb::Database { + db.max_targets_db + } +} + +pub fn get_chunk_for_update<'a, E: EthSpec, T: TargetArrayChunk>( + db: &SlasherDB, + txn: &mut RwTransaction<'_>, + updated_chunks: &'a mut BTreeMap, + validator_chunk_index: usize, + chunk_index: usize, + config: &Config, +) -> Result<&'a mut T, Error> { + Ok(match updated_chunks.entry(chunk_index) { + Entry::Occupied(occupied) => occupied.into_mut(), + Entry::Vacant(vacant) => { + let chunk = if let Some(disk_chunk) = + T::load(db, txn, validator_chunk_index, chunk_index, config)? + { + disk_chunk + } else { + T::empty(config) + }; + vacant.insert(chunk) + } + }) +} + +#[allow(clippy::too_many_arguments)] +pub fn apply_attestation_for_validator( + db: &SlasherDB, + txn: &mut RwTransaction<'_>, + updated_chunks: &mut BTreeMap, + validator_chunk_index: usize, + validator_index: u64, + attestation: &IndexedAttestation, + current_epoch: Epoch, + config: &Config, +) -> Result, Error> { + let mut chunk_index = config.chunk_index(attestation.data.source.epoch); + let mut current_chunk = get_chunk_for_update( + db, + txn, + updated_chunks, + validator_chunk_index, + chunk_index, + config, + )?; + + let slashing_status = + current_chunk.check_slashable(db, txn, validator_index, attestation, config)?; + + if slashing_status != AttesterSlashingStatus::NotSlashable { + return Ok(slashing_status); + } + + let mut start_epoch = if let Some(start_epoch) = + T::first_start_epoch(attestation.data.source.epoch, current_epoch, config) + { + start_epoch + } else { + return Ok(slashing_status); + }; + + loop { + chunk_index = config.chunk_index(start_epoch); + current_chunk = get_chunk_for_update( + db, + txn, + updated_chunks, + validator_chunk_index, + chunk_index, + config, + )?; + let keep_going = current_chunk.update( + chunk_index, + validator_index, + start_epoch, + attestation.data.target.epoch, + current_epoch, + config, + )?; + if !keep_going { + break; + } + start_epoch = T::next_start_epoch(start_epoch, config); + } + + Ok(AttesterSlashingStatus::NotSlashable) +} + +pub fn update( + db: &SlasherDB, + txn: &mut RwTransaction<'_>, + validator_chunk_index: usize, + batch: Vec, AttesterRecord)>>, + current_epoch: Epoch, + config: &Config, +) -> Result>, Error> { + // Split the batch up into horizontal segments. + // Map chunk indexes in the range `0..self.config.chunk_size` to attestations + // for those chunks. + let mut chunk_attestations = BTreeMap::new(); + for attestation in batch { + chunk_attestations + .entry(config.chunk_index(attestation.0.data.source.epoch)) + .or_insert_with(Vec::new) + .push(attestation); + } + + let mut slashings = update_array::<_, MinTargetChunk>( + db, + txn, + validator_chunk_index, + &chunk_attestations, + current_epoch, + config, + )?; + slashings.extend(update_array::<_, MaxTargetChunk>( + db, + txn, + validator_chunk_index, + &chunk_attestations, + current_epoch, + config, + )?); + + // Update all current epochs. + for validator_index in config.validator_indices_in_chunk(validator_chunk_index) { + db.update_current_epoch_for_validator(validator_index, current_epoch, txn)?; + } + + Ok(slashings) +} + +pub fn epoch_update_for_validator( + db: &SlasherDB, + txn: &mut RwTransaction<'_>, + updated_chunks: &mut BTreeMap, + validator_chunk_index: usize, + validator_index: u64, + current_epoch: Epoch, + config: &Config, +) -> Result<(), Error> { + let previous_current_epoch = + if let Some(epoch) = db.get_current_epoch_for_validator(validator_index, txn)? { + epoch + } else { + return Ok(()); + }; + + let mut epoch = previous_current_epoch; + + while epoch <= current_epoch { + let chunk_index = config.chunk_index(epoch); + let current_chunk = get_chunk_for_update( + db, + txn, + updated_chunks, + validator_chunk_index, + chunk_index, + config, + )?; + while config.chunk_index(epoch) == chunk_index && epoch <= current_epoch { + current_chunk.chunk().set_raw_distance( + validator_index, + epoch, + T::neutral_element(), + config, + )?; + epoch += 1; + } + } + + Ok(()) +} + +#[allow(clippy::type_complexity)] +pub fn update_array( + db: &SlasherDB, + txn: &mut RwTransaction<'_>, + validator_chunk_index: usize, + chunk_attestations: &BTreeMap, AttesterRecord)>>>, + current_epoch: Epoch, + config: &Config, +) -> Result>, Error> { + let mut slashings = HashSet::new(); + // Map from chunk index to updated chunk at that index. + let mut updated_chunks = BTreeMap::new(); + + // Update the arrays for the change of current epoch. + for validator_index in config.validator_indices_in_chunk(validator_chunk_index) { + epoch_update_for_validator( + db, + txn, + &mut updated_chunks, + validator_chunk_index, + validator_index, + current_epoch, + config, + )?; + } + + for attestations in chunk_attestations.values() { + for attestation in attestations { + for validator_index in + config.attesting_validators_in_chunk(&attestation.0, validator_chunk_index) + { + let slashing_status = apply_attestation_for_validator::( + db, + txn, + &mut updated_chunks, + validator_chunk_index, + validator_index, + &attestation.0, + current_epoch, + config, + )?; + if let Some(slashing) = slashing_status.into_slashing(&attestation.0) { + slashings.insert(slashing); + } + } + } + } + + // Store chunks on disk. + metrics::inc_counter_vec_by( + &SLASHER_NUM_CHUNKS_UPDATED, + &[T::name()], + updated_chunks.len() as i64, + ); + + for (chunk_index, chunk) in updated_chunks { + chunk.store(db, txn, validator_chunk_index, chunk_index, config)?; + } + + Ok(slashings) +} diff --git a/slasher/src/attestation_queue.rs b/slasher/src/attestation_queue.rs new file mode 100644 index 000000000..70ea2ea13 --- /dev/null +++ b/slasher/src/attestation_queue.rs @@ -0,0 +1,93 @@ +use crate::{AttesterRecord, Config}; +use parking_lot::Mutex; +use std::collections::BTreeSet; +use std::sync::Arc; +use types::{EthSpec, IndexedAttestation}; + +/// Staging area for attestations received from the network. +/// +/// To be added to the database in batches, for efficiency and to prevent data races. +#[derive(Debug, Default)] +pub struct AttestationQueue { + /// All attestations (unique) for storage on disk. + pub queue: Mutex>, +} + +/// Attestations grouped by validator index range. +#[derive(Debug)] +pub struct GroupedAttestations { + pub subqueues: Vec>, +} + +/// A queue of attestations for a range of validator indices. +#[derive(Debug, Default)] +pub struct AttestationBatch { + pub attestations: Vec, AttesterRecord)>>, +} + +impl AttestationBatch { + pub fn len(&self) -> usize { + self.attestations.len() + } + + pub fn is_empty(&self) -> bool { + self.attestations.is_empty() + } + + /// Group the attestations by validator index. + pub fn group_by_validator_index(self, config: &Config) -> GroupedAttestations { + let mut grouped_attestations = GroupedAttestations { subqueues: vec![] }; + + for attestation in self.attestations { + let subqueue_ids = attestation + .0 + .attesting_indices + .iter() + .map(|validator_index| config.validator_chunk_index(*validator_index)) + .collect::>(); + + if let Some(max_subqueue_id) = subqueue_ids.iter().next_back() { + if *max_subqueue_id >= grouped_attestations.subqueues.len() { + grouped_attestations + .subqueues + .resize_with(max_subqueue_id + 1, AttestationBatch::default); + } + } + + for subqueue_id in subqueue_ids { + grouped_attestations.subqueues[subqueue_id] + .attestations + .push(attestation.clone()); + } + } + + grouped_attestations + } +} + +impl AttestationQueue { + /// Add an attestation to the queue. + pub fn queue(&self, attestation: IndexedAttestation) { + let attester_record = AttesterRecord::from(attestation.clone()); + self.queue + .lock() + .attestations + .push(Arc::new((attestation, attester_record))); + } + + pub fn dequeue(&self) -> AttestationBatch { + std::mem::take(&mut self.queue.lock()) + } + + pub fn requeue(&self, batch: AttestationBatch) { + self.queue.lock().attestations.extend(batch.attestations); + } + + pub fn len(&self) -> usize { + self.queue.lock().len() + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } +} diff --git a/slasher/src/attester_record.rs b/slasher/src/attester_record.rs new file mode 100644 index 000000000..82d5dc46f --- /dev/null +++ b/slasher/src/attester_record.rs @@ -0,0 +1,57 @@ +use ssz_derive::{Decode, Encode}; +use tree_hash::TreeHash as _; +use tree_hash_derive::TreeHash; +use types::{AggregateSignature, EthSpec, Hash256, IndexedAttestation, VariableList}; + +#[derive(Debug, Clone, Copy, Encode, Decode)] +pub struct AttesterRecord { + /// The hash of the attestation data, for checking double-voting. + pub attestation_data_hash: Hash256, + /// The hash of the indexed attestation, so it can be loaded. + pub indexed_attestation_hash: Hash256, +} + +#[derive(Debug, Clone, Encode, Decode, TreeHash)] +struct IndexedAttestationHeader { + pub attesting_indices: VariableList, + pub data_root: Hash256, + pub signature: AggregateSignature, +} + +impl From> for AttesterRecord { + fn from(indexed_attestation: IndexedAttestation) -> AttesterRecord { + let attestation_data_hash = indexed_attestation.data.tree_hash_root(); + let header = IndexedAttestationHeader:: { + attesting_indices: indexed_attestation.attesting_indices, + data_root: attestation_data_hash, + signature: indexed_attestation.signature, + }; + let indexed_attestation_hash = header.tree_hash_root(); + AttesterRecord { + attestation_data_hash, + indexed_attestation_hash, + } + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::test_utils::indexed_att; + + // Check correctness of fast hashing + #[test] + fn fast_hash() { + let data = vec![ + indexed_att(vec![], 0, 0, 0), + indexed_att(vec![1, 2, 3], 12, 14, 1), + indexed_att(vec![4], 0, 5, u64::MAX), + ]; + for att in data { + assert_eq!( + att.tree_hash_root(), + AttesterRecord::from(att).indexed_attestation_hash + ); + } + } +} diff --git a/slasher/src/block_queue.rs b/slasher/src/block_queue.rs new file mode 100644 index 000000000..10086ce37 --- /dev/null +++ b/slasher/src/block_queue.rs @@ -0,0 +1,26 @@ +use parking_lot::Mutex; +use types::SignedBeaconBlockHeader; + +#[derive(Debug, Default)] +pub struct BlockQueue { + blocks: Mutex>, +} + +impl BlockQueue { + pub fn queue(&self, block_header: SignedBeaconBlockHeader) { + self.blocks.lock().push(block_header) + } + + pub fn dequeue(&self) -> Vec { + let mut blocks = self.blocks.lock(); + std::mem::replace(&mut *blocks, vec![]) + } + + pub fn len(&self) -> usize { + self.blocks.lock().len() + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } +} diff --git a/slasher/src/config.rs b/slasher/src/config.rs new file mode 100644 index 000000000..dba2e604e --- /dev/null +++ b/slasher/src/config.rs @@ -0,0 +1,111 @@ +use crate::Error; +use serde_derive::{Deserialize, Serialize}; +use std::path::PathBuf; +use types::{Epoch, EthSpec, IndexedAttestation}; + +pub const DEFAULT_CHUNK_SIZE: usize = 16; +pub const DEFAULT_VALIDATOR_CHUNK_SIZE: usize = 256; +pub const DEFAULT_HISTORY_LENGTH: usize = 4096; +pub const DEFAULT_UPDATE_PERIOD: u64 = 12; +pub const DEFAULT_MAX_DB_SIZE: usize = 256; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Config { + pub database_path: PathBuf, + pub chunk_size: usize, + pub validator_chunk_size: usize, + /// Number of epochs of history to keep. + pub history_length: usize, + /// Update frequency in seconds. + pub update_period: u64, + /// Maximum size of the LMDB database in gigabytes. + pub max_db_size_gbs: usize, +} + +impl Config { + pub fn new(database_path: PathBuf) -> Self { + Self { + database_path, + chunk_size: DEFAULT_CHUNK_SIZE, + validator_chunk_size: DEFAULT_VALIDATOR_CHUNK_SIZE, + history_length: DEFAULT_HISTORY_LENGTH, + update_period: DEFAULT_UPDATE_PERIOD, + max_db_size_gbs: DEFAULT_MAX_DB_SIZE, + } + } + + pub fn validate(&self) -> Result<(), Error> { + if self.chunk_size == 0 + || self.validator_chunk_size == 0 + || self.history_length == 0 + || self.max_db_size_gbs == 0 + { + Err(Error::ConfigInvalidZeroParameter { + config: self.clone(), + }) + } else if self.history_length % self.chunk_size != 0 { + Err(Error::ConfigInvalidChunkSize { + chunk_size: self.chunk_size, + history_length: self.history_length, + }) + } else { + Ok(()) + } + } + + pub fn is_compatible(&self, other: &Config) -> bool { + self.chunk_size == other.chunk_size + && self.validator_chunk_size == other.validator_chunk_size + && self.history_length == other.history_length + } + + pub fn chunk_index(&self, epoch: Epoch) -> usize { + (epoch.as_usize() % self.history_length) / self.chunk_size + } + + pub fn validator_chunk_index(&self, validator_index: u64) -> usize { + validator_index as usize / self.validator_chunk_size + } + + pub fn chunk_offset(&self, epoch: Epoch) -> usize { + epoch.as_usize() % self.chunk_size + } + + pub fn validator_offset(&self, validator_index: u64) -> usize { + validator_index as usize % self.validator_chunk_size + } + + /// Map the validator and epoch chunk indexes into a single value for use as a database key. + pub fn disk_key(&self, validator_chunk_index: usize, chunk_index: usize) -> usize { + let width = self.history_length / self.chunk_size; + validator_chunk_index * width + chunk_index + } + + /// Map the validator and epoch offsets into an index for `Chunk::data`. + pub fn cell_index(&self, validator_offset: usize, chunk_offset: usize) -> usize { + validator_offset * self.chunk_size + chunk_offset + } + + /// Return an iterator over all the validator indices in a validator chunk. + pub fn validator_indices_in_chunk( + &self, + validator_chunk_index: usize, + ) -> impl Iterator { + (validator_chunk_index * self.validator_chunk_size + ..(validator_chunk_index + 1) * self.validator_chunk_size) + .map(|index| index as u64) + } + + /// Iterate over the attesting indices which belong to the `validator_chunk_index` chunk. + pub fn attesting_validators_in_chunk<'a, E: EthSpec>( + &'a self, + attestation: &'a IndexedAttestation, + validator_chunk_index: usize, + ) -> impl Iterator + 'a { + attestation + .attesting_indices + .iter() + .filter(move |v| self.validator_chunk_index(**v) == validator_chunk_index) + .copied() + } +} diff --git a/slasher/src/database.rs b/slasher/src/database.rs new file mode 100644 index 000000000..8899d0547 --- /dev/null +++ b/slasher/src/database.rs @@ -0,0 +1,535 @@ +use crate::{ + utils::TxnOptional, AttesterRecord, AttesterSlashingStatus, Config, Error, + ProposerSlashingStatus, +}; +use byteorder::{BigEndian, ByteOrder}; +use lmdb::{Cursor, Database, DatabaseFlags, Environment, RwTransaction, Transaction, WriteFlags}; +use ssz::{Decode, Encode}; +use std::collections::HashSet; +use std::marker::PhantomData; +use std::sync::Arc; +use types::{ + Epoch, EthSpec, Hash256, IndexedAttestation, ProposerSlashing, SignedBeaconBlockHeader, Slot, +}; + +/// Current database schema version, to check compatibility of on-disk DB with software. +const CURRENT_SCHEMA_VERSION: u64 = 0; + +/// Metadata about the slashing database itself. +const METADATA_DB: &str = "metadata"; +/// Map from `(target_epoch, validator_index)` to `AttesterRecord`. +const ATTESTERS_DB: &str = "attesters"; +/// Map from `indexed_attestation_hash` to `IndexedAttestation`. +const INDEXED_ATTESTATION_DB: &str = "indexed_attestations"; +/// Table of minimum targets for every source epoch within range. +const MIN_TARGETS_DB: &str = "min_targets"; +/// Table of maximum targets for every source epoch within range. +const MAX_TARGETS_DB: &str = "max_targets"; +/// Map from `validator_index` to the `current_epoch` for that validator. +/// +/// Used to implement wrap-around semantics for the min and max target arrays. +const CURRENT_EPOCHS_DB: &str = "current_epochs"; +/// Map from `(slot, validator_index)` to `SignedBeaconBlockHeader`. +const PROPOSERS_DB: &str = "proposers"; + +/// The number of DBs for LMDB to use (equal to the number of DBs defined above). +const LMDB_MAX_DBS: u32 = 7; + +/// Constant key under which the schema version is stored in the `metadata_db`. +const METADATA_VERSION_KEY: &[u8] = &[0]; +/// Constant key under which the slasher configuration is stored in the `metadata_db`. +const METADATA_CONFIG_KEY: &[u8] = &[1]; + +const ATTESTER_KEY_SIZE: usize = 16; +const PROPOSER_KEY_SIZE: usize = 16; +const GIGABYTE: usize = 1 << 30; + +#[derive(Debug)] +pub struct SlasherDB { + pub(crate) env: Environment, + pub(crate) indexed_attestation_db: Database, + pub(crate) attesters_db: Database, + pub(crate) min_targets_db: Database, + pub(crate) max_targets_db: Database, + pub(crate) current_epochs_db: Database, + pub(crate) proposers_db: Database, + pub(crate) metadata_db: Database, + config: Arc, + _phantom: PhantomData, +} + +/// Database key for the `attesters` database. +/// +/// Stored as big-endian `(target_epoch, validator_index)` to enable efficient iteration +/// while pruning. +#[derive(Debug)] +pub struct AttesterKey { + data: [u8; ATTESTER_KEY_SIZE], +} + +impl AttesterKey { + pub fn new(validator_index: u64, target_epoch: Epoch) -> Self { + let mut data = [0; ATTESTER_KEY_SIZE]; + data[0..8].copy_from_slice(&target_epoch.as_u64().to_be_bytes()); + data[8..ATTESTER_KEY_SIZE].copy_from_slice(&validator_index.to_be_bytes()); + AttesterKey { data } + } + + pub fn parse(data: &[u8]) -> Result<(Epoch, u64), Error> { + if data.len() == ATTESTER_KEY_SIZE { + let target_epoch = Epoch::new(BigEndian::read_u64(&data[..8])); + let validator_index = BigEndian::read_u64(&data[8..]); + Ok((target_epoch, validator_index)) + } else { + Err(Error::AttesterKeyCorrupt { length: data.len() }) + } + } +} + +impl AsRef<[u8]> for AttesterKey { + fn as_ref(&self) -> &[u8] { + &self.data + } +} + +/// Database key for the `proposers` database. +/// +/// Stored as big-endian `(slot, validator_index)` to enable efficient iteration +/// while pruning. +#[derive(Debug)] +pub struct ProposerKey { + data: [u8; PROPOSER_KEY_SIZE], +} + +impl ProposerKey { + pub fn new(validator_index: u64, slot: Slot) -> Self { + let mut data = [0; PROPOSER_KEY_SIZE]; + data[0..8].copy_from_slice(&slot.as_u64().to_be_bytes()); + data[8..PROPOSER_KEY_SIZE].copy_from_slice(&validator_index.to_be_bytes()); + ProposerKey { data } + } + + pub fn parse(data: &[u8]) -> Result<(Slot, u64), Error> { + if data.len() == PROPOSER_KEY_SIZE { + let slot = Slot::new(BigEndian::read_u64(&data[..8])); + let validator_index = BigEndian::read_u64(&data[8..]); + Ok((slot, validator_index)) + } else { + Err(Error::ProposerKeyCorrupt { length: data.len() }) + } + } +} + +impl AsRef<[u8]> for ProposerKey { + fn as_ref(&self) -> &[u8] { + &self.data + } +} + +/// Key containing a validator index +pub struct CurrentEpochKey { + validator_index: [u8; 8], +} + +impl CurrentEpochKey { + pub fn new(validator_index: u64) -> Self { + Self { + validator_index: validator_index.to_be_bytes(), + } + } +} + +impl AsRef<[u8]> for CurrentEpochKey { + fn as_ref(&self) -> &[u8] { + &self.validator_index + } +} + +impl SlasherDB { + pub fn open(config: Arc) -> Result { + std::fs::create_dir_all(&config.database_path)?; + let env = Environment::new() + .set_max_dbs(LMDB_MAX_DBS) + .set_map_size(config.max_db_size_gbs * GIGABYTE) + .open_with_permissions(&config.database_path, 0o600)?; + let indexed_attestation_db = + env.create_db(Some(INDEXED_ATTESTATION_DB), Self::db_flags())?; + let attesters_db = env.create_db(Some(ATTESTERS_DB), Self::db_flags())?; + let min_targets_db = env.create_db(Some(MIN_TARGETS_DB), Self::db_flags())?; + let max_targets_db = env.create_db(Some(MAX_TARGETS_DB), Self::db_flags())?; + let current_epochs_db = env.create_db(Some(CURRENT_EPOCHS_DB), Self::db_flags())?; + let proposers_db = env.create_db(Some(PROPOSERS_DB), Self::db_flags())?; + let metadata_db = env.create_db(Some(METADATA_DB), Self::db_flags())?; + + let db = Self { + env, + indexed_attestation_db, + attesters_db, + min_targets_db, + max_targets_db, + current_epochs_db, + proposers_db, + metadata_db, + config, + _phantom: PhantomData, + }; + + let mut txn = db.begin_rw_txn()?; + + if let Some(schema_version) = db.load_schema_version(&mut txn)? { + if schema_version != CURRENT_SCHEMA_VERSION { + return Err(Error::IncompatibleSchemaVersion { + database_schema_version: schema_version, + software_schema_version: CURRENT_SCHEMA_VERSION, + }); + } + } + db.store_schema_version(&mut txn)?; + + if let Some(on_disk_config) = db.load_config(&mut txn)? { + if !db.config.is_compatible(&on_disk_config) { + return Err(Error::ConfigIncompatible { + on_disk_config, + config: (*db.config).clone(), + }); + } + } + db.store_config(&mut txn)?; + txn.commit()?; + + Ok(db) + } + + pub fn db_flags() -> DatabaseFlags { + DatabaseFlags::default() + } + + pub fn write_flags() -> WriteFlags { + WriteFlags::default() + } + + pub fn begin_rw_txn(&self) -> Result, Error> { + Ok(self.env.begin_rw_txn()?) + } + + pub fn load_schema_version(&self, txn: &mut RwTransaction<'_>) -> Result, Error> { + Ok(txn + .get(self.metadata_db, &METADATA_VERSION_KEY) + .optional()? + .map(bincode::deserialize) + .transpose()?) + } + + pub fn store_schema_version(&self, txn: &mut RwTransaction<'_>) -> Result<(), Error> { + txn.put( + self.metadata_db, + &METADATA_VERSION_KEY, + &bincode::serialize(&CURRENT_SCHEMA_VERSION)?, + Self::write_flags(), + )?; + Ok(()) + } + + pub fn load_config(&self, txn: &mut RwTransaction<'_>) -> Result, Error> { + Ok(txn + .get(self.metadata_db, &METADATA_CONFIG_KEY) + .optional()? + .map(bincode::deserialize) + .transpose()?) + } + + pub fn store_config(&self, txn: &mut RwTransaction<'_>) -> Result<(), Error> { + txn.put( + self.metadata_db, + &METADATA_CONFIG_KEY, + &bincode::serialize(self.config.as_ref())?, + Self::write_flags(), + )?; + Ok(()) + } + + pub fn get_current_epoch_for_validator( + &self, + validator_index: u64, + txn: &mut RwTransaction<'_>, + ) -> Result, Error> { + Ok(txn + .get( + self.current_epochs_db, + &CurrentEpochKey::new(validator_index), + ) + .optional()? + .map(Epoch::from_ssz_bytes) + .transpose()?) + } + + pub fn update_current_epoch_for_validator( + &self, + validator_index: u64, + current_epoch: Epoch, + txn: &mut RwTransaction<'_>, + ) -> Result<(), Error> { + txn.put( + self.current_epochs_db, + &CurrentEpochKey::new(validator_index), + ¤t_epoch.as_ssz_bytes(), + Self::write_flags(), + )?; + Ok(()) + } + + pub fn store_indexed_attestation( + &self, + txn: &mut RwTransaction<'_>, + indexed_attestation_hash: Hash256, + indexed_attestation: &IndexedAttestation, + ) -> Result<(), Error> { + let data = indexed_attestation.as_ssz_bytes(); + + txn.put( + self.indexed_attestation_db, + &indexed_attestation_hash.as_bytes(), + &data, + Self::write_flags(), + )?; + Ok(()) + } + + pub fn get_indexed_attestation( + &self, + txn: &mut RwTransaction<'_>, + indexed_attestation_hash: Hash256, + ) -> Result, Error> { + let bytes = txn + .get(self.indexed_attestation_db, &indexed_attestation_hash) + .optional()? + .ok_or_else(|| Error::MissingIndexedAttestation { + root: indexed_attestation_hash, + })?; + Ok(IndexedAttestation::from_ssz_bytes(bytes)?) + } + + pub fn check_and_update_attester_record( + &self, + txn: &mut RwTransaction<'_>, + validator_index: u64, + attestation: &IndexedAttestation, + record: AttesterRecord, + ) -> Result, Error> { + // See if there's an existing attestation for this attester. + if let Some(existing_record) = + self.get_attester_record(txn, validator_index, attestation.data.target.epoch)? + { + // If the existing attestation data is identical, then this attestation is not + // slashable and no update is required. + if existing_record.attestation_data_hash == record.attestation_data_hash { + return Ok(AttesterSlashingStatus::NotSlashable); + } + + // Otherwise, load the indexed attestation so we can confirm that it's slashable. + let existing_attestation = + self.get_indexed_attestation(txn, existing_record.indexed_attestation_hash)?; + if attestation.is_double_vote(&existing_attestation) { + Ok(AttesterSlashingStatus::DoubleVote(Box::new( + existing_attestation, + ))) + } else { + Err(Error::AttesterRecordInconsistentRoot) + } + } + // If no attestation exists, insert a record for this validator. + else { + txn.put( + self.attesters_db, + &AttesterKey::new(validator_index, attestation.data.target.epoch), + &record.as_ssz_bytes(), + Self::write_flags(), + )?; + Ok(AttesterSlashingStatus::NotSlashable) + } + } + + pub fn get_attestation_for_validator( + &self, + txn: &mut RwTransaction<'_>, + validator_index: u64, + target_epoch: Epoch, + ) -> Result, Error> { + let record = self + .get_attester_record(txn, validator_index, target_epoch)? + .ok_or_else(|| Error::MissingAttesterRecord { + validator_index, + target_epoch, + })?; + self.get_indexed_attestation(txn, record.indexed_attestation_hash) + } + + pub fn get_attester_record( + &self, + txn: &mut RwTransaction<'_>, + validator_index: u64, + target: Epoch, + ) -> Result, Error> { + let attester_key = AttesterKey::new(validator_index, target); + Ok(txn + .get(self.attesters_db, &attester_key) + .optional()? + .map(AttesterRecord::from_ssz_bytes) + .transpose()?) + } + + pub fn get_block_proposal( + &self, + txn: &mut RwTransaction<'_>, + proposer_index: u64, + slot: Slot, + ) -> Result, Error> { + let proposer_key = ProposerKey::new(proposer_index, slot); + Ok(txn + .get(self.proposers_db, &proposer_key) + .optional()? + .map(SignedBeaconBlockHeader::from_ssz_bytes) + .transpose()?) + } + + pub fn check_or_insert_block_proposal( + &self, + txn: &mut RwTransaction<'_>, + block_header: SignedBeaconBlockHeader, + ) -> Result { + let proposer_index = block_header.message.proposer_index; + let slot = block_header.message.slot; + + if let Some(existing_block) = self.get_block_proposal(txn, proposer_index, slot)? { + if existing_block == block_header { + Ok(ProposerSlashingStatus::NotSlashable) + } else { + Ok(ProposerSlashingStatus::DoubleVote(Box::new( + ProposerSlashing { + signed_header_1: existing_block, + signed_header_2: block_header, + }, + ))) + } + } else { + txn.put( + self.proposers_db, + &ProposerKey::new(proposer_index, slot), + &block_header.as_ssz_bytes(), + Self::write_flags(), + )?; + Ok(ProposerSlashingStatus::NotSlashable) + } + } + + pub fn prune(&self, current_epoch: Epoch) -> Result<(), Error> { + let mut txn = self.begin_rw_txn()?; + self.prune_proposers(current_epoch, &mut txn)?; + self.prune_attesters(current_epoch, &mut txn)?; + txn.commit()?; + Ok(()) + } + + fn prune_proposers( + &self, + current_epoch: Epoch, + txn: &mut RwTransaction<'_>, + ) -> Result<(), Error> { + let min_slot = current_epoch + .saturating_add(1u64) + .saturating_sub(self.config.history_length) + .start_slot(E::slots_per_epoch()); + + let mut cursor = txn.open_rw_cursor(self.proposers_db)?; + + // Position cursor at first key, bailing out if the database is empty. + if cursor + .get(None, None, lmdb_sys::MDB_FIRST) + .optional()? + .is_none() + { + return Ok(()); + } + + loop { + let key_bytes = cursor + .get(None, None, lmdb_sys::MDB_GET_CURRENT)? + .0 + .ok_or_else(|| Error::MissingProposerKey)?; + + let (slot, _) = ProposerKey::parse(key_bytes)?; + if slot < min_slot { + cursor.del(Self::write_flags())?; + + // End the loop if there is no next entry. + if cursor + .get(None, None, lmdb_sys::MDB_NEXT) + .optional()? + .is_none() + { + break; + } + } else { + break; + } + } + + Ok(()) + } + + fn prune_attesters( + &self, + current_epoch: Epoch, + txn: &mut RwTransaction<'_>, + ) -> Result<(), Error> { + let min_epoch = current_epoch + .saturating_add(1u64) + .saturating_sub(self.config.history_length as u64); + + let mut cursor = txn.open_rw_cursor(self.attesters_db)?; + + // Position cursor at first key, bailing out if the database is empty. + if cursor + .get(None, None, lmdb_sys::MDB_FIRST) + .optional()? + .is_none() + { + return Ok(()); + } + + let mut indexed_attestations_to_delete = HashSet::new(); + + loop { + let (optional_key, value) = cursor.get(None, None, lmdb_sys::MDB_GET_CURRENT)?; + let key_bytes = optional_key.ok_or_else(|| Error::MissingAttesterKey)?; + + let (target_epoch, _validator_index) = AttesterKey::parse(key_bytes)?; + + if target_epoch < min_epoch { + // Stage the indexed attestation for deletion and delete the record itself. + let attester_record = AttesterRecord::from_ssz_bytes(value)?; + indexed_attestations_to_delete.insert(attester_record.indexed_attestation_hash); + + cursor.del(Self::write_flags())?; + + // End the loop if there is no next entry. + if cursor + .get(None, None, lmdb_sys::MDB_NEXT) + .optional()? + .is_none() + { + break; + } + } else { + break; + } + } + drop(cursor); + + for indexed_attestation_hash in indexed_attestations_to_delete { + txn.del(self.indexed_attestation_db, &indexed_attestation_hash, None)?; + } + + Ok(()) + } +} diff --git a/slasher/src/error.rs b/slasher/src/error.rs new file mode 100644 index 000000000..f1c8f727e --- /dev/null +++ b/slasher/src/error.rs @@ -0,0 +1,83 @@ +use crate::Config; +use std::io; +use types::{Epoch, Hash256}; + +#[derive(Debug)] +pub enum Error { + DatabaseError(lmdb::Error), + DatabaseIOError(io::Error), + SszDecodeError(ssz::DecodeError), + BincodeError(bincode::Error), + ArithError(safe_arith::ArithError), + ChunkIndexOutOfBounds(usize), + IncompatibleSchemaVersion { + database_schema_version: u64, + software_schema_version: u64, + }, + ConfigInvalidChunkSize { + chunk_size: usize, + history_length: usize, + }, + ConfigInvalidZeroParameter { + config: Config, + }, + ConfigIncompatible { + on_disk_config: Config, + config: Config, + }, + DistanceTooLarge, + DistanceCalculationOverflow, + /// Missing an attester record that we expected to exist. + MissingAttesterRecord { + validator_index: u64, + target_epoch: Epoch, + }, + AttesterRecordCorrupt { + length: usize, + }, + AttesterKeyCorrupt { + length: usize, + }, + ProposerKeyCorrupt { + length: usize, + }, + MissingIndexedAttestation { + root: Hash256, + }, + MissingAttesterKey, + MissingProposerKey, + AttesterRecordInconsistentRoot, +} + +impl From for Error { + fn from(e: lmdb::Error) -> Self { + match e { + lmdb::Error::Other(os_error) => Error::from(io::Error::from_raw_os_error(os_error)), + _ => Error::DatabaseError(e), + } + } +} + +impl From for Error { + fn from(e: io::Error) -> Self { + Error::DatabaseIOError(e) + } +} + +impl From for Error { + fn from(e: ssz::DecodeError) -> Self { + Error::SszDecodeError(e) + } +} + +impl From for Error { + fn from(e: bincode::Error) -> Self { + Error::BincodeError(e) + } +} + +impl From for Error { + fn from(e: safe_arith::ArithError) -> Self { + Error::ArithError(e) + } +} diff --git a/slasher/src/lib.rs b/slasher/src/lib.rs new file mode 100644 index 000000000..d173f26a0 --- /dev/null +++ b/slasher/src/lib.rs @@ -0,0 +1,66 @@ +#![deny(missing_debug_implementations)] + +mod array; +mod attestation_queue; +mod attester_record; +mod block_queue; +pub mod config; +mod database; +mod error; +mod metrics; +mod slasher; +mod slasher_server; +pub mod test_utils; +mod utils; + +pub use crate::slasher::Slasher; +pub use attestation_queue::{AttestationBatch, AttestationQueue}; +pub use attester_record::AttesterRecord; +pub use block_queue::BlockQueue; +pub use config::Config; +pub use database::SlasherDB; +pub use error::Error; +pub use slasher_server::SlasherServer; + +use types::{AttesterSlashing, EthSpec, IndexedAttestation, ProposerSlashing}; + +#[derive(Debug, PartialEq)] +pub enum AttesterSlashingStatus { + NotSlashable, + /// A weird outcome that can occur when we go to lookup an attestation by its target + /// epoch for a surround slashing, but find a different attestation -- indicating that + /// the validator has already been caught double voting. + AlreadyDoubleVoted, + DoubleVote(Box>), + SurroundsExisting(Box>), + SurroundedByExisting(Box>), +} + +#[derive(Debug, PartialEq)] +pub enum ProposerSlashingStatus { + NotSlashable, + DoubleVote(Box), +} + +impl AttesterSlashingStatus { + pub fn into_slashing( + self, + new_attestation: &IndexedAttestation, + ) -> Option> { + use AttesterSlashingStatus::*; + + // The surrounding attestation must be in `attestation_1` to be valid. + match self { + NotSlashable => None, + AlreadyDoubleVoted => None, + DoubleVote(existing) | SurroundedByExisting(existing) => Some(AttesterSlashing { + attestation_1: *existing, + attestation_2: new_attestation.clone(), + }), + SurroundsExisting(existing) => Some(AttesterSlashing { + attestation_1: new_attestation.clone(), + attestation_2: *existing, + }), + } + } +} diff --git a/slasher/src/metrics.rs b/slasher/src/metrics.rs new file mode 100644 index 000000000..7f95ad8cc --- /dev/null +++ b/slasher/src/metrics.rs @@ -0,0 +1,38 @@ +use lazy_static::lazy_static; +pub use lighthouse_metrics::*; + +lazy_static! { + pub static ref SLASHER_DATABASE_SIZE: Result = try_create_int_gauge( + "slasher_database_size", + "Size of the LMDB database backing the slasher, in bytes" + ); + pub static ref SLASHER_RUN_TIME: Result = try_create_histogram( + "slasher_process_batch_time", + "Time taken to process a batch of blocks and attestations" + ); + pub static ref SLASHER_NUM_ATTESTATIONS_DROPPED: Result = try_create_int_gauge( + "slasher_num_attestations_dropped", + "Number of attestations dropped per batch" + ); + pub static ref SLASHER_NUM_ATTESTATIONS_DEFERRED: Result = try_create_int_gauge( + "slasher_num_attestations_deferred", + "Number of attestations deferred per batch" + ); + pub static ref SLASHER_NUM_ATTESTATIONS_VALID: Result = try_create_int_gauge( + "slasher_num_attestations_valid", + "Number of valid attestations per batch" + ); + pub static ref SLASHER_NUM_BLOCKS_PROCESSED: Result = try_create_int_gauge( + "slasher_num_blocks_processed", + "Number of blocks processed per batch", + ); + pub static ref SLASHER_NUM_CHUNKS_UPDATED: Result = try_create_int_counter_vec( + "slasher_num_chunks_updated", + "Number of min or max target chunks updated on disk", + &["array"], + ); + pub static ref SLASHER_COMPRESSION_RATIO: Result = try_create_float_gauge( + "slasher_compression_ratio", + "Compression ratio for min-max array chunks (higher is better)" + ); +} diff --git a/slasher/src/slasher.rs b/slasher/src/slasher.rs new file mode 100644 index 000000000..1589dc2bf --- /dev/null +++ b/slasher/src/slasher.rs @@ -0,0 +1,299 @@ +use crate::metrics::{ + self, SLASHER_NUM_ATTESTATIONS_DEFERRED, SLASHER_NUM_ATTESTATIONS_DROPPED, + SLASHER_NUM_ATTESTATIONS_VALID, SLASHER_NUM_BLOCKS_PROCESSED, +}; +use crate::{ + array, AttestationBatch, AttestationQueue, AttesterRecord, BlockQueue, Config, Error, + ProposerSlashingStatus, SlasherDB, +}; +use lmdb::{RwTransaction, Transaction}; +use parking_lot::Mutex; +use slog::{debug, error, info, Logger}; +use std::collections::HashSet; +use std::sync::Arc; +use types::{ + AttesterSlashing, Epoch, EthSpec, IndexedAttestation, ProposerSlashing, SignedBeaconBlockHeader, +}; + +#[derive(Debug)] +pub struct Slasher { + db: SlasherDB, + pub(crate) attestation_queue: AttestationQueue, + pub(crate) block_queue: BlockQueue, + attester_slashings: Mutex>>, + proposer_slashings: Mutex>, + config: Arc, + pub(crate) log: Logger, +} + +impl Slasher { + pub fn open(config: Config, log: Logger) -> Result { + config.validate()?; + let config = Arc::new(config); + let db = SlasherDB::open(config.clone())?; + let attester_slashings = Mutex::new(HashSet::new()); + let proposer_slashings = Mutex::new(HashSet::new()); + let attestation_queue = AttestationQueue::default(); + let block_queue = BlockQueue::default(); + Ok(Self { + db, + attester_slashings, + proposer_slashings, + attestation_queue, + block_queue, + config, + log, + }) + } + + /// Harvest all attester slashings found, removing them from the slasher. + pub fn get_attester_slashings(&self) -> HashSet> { + std::mem::take(&mut self.attester_slashings.lock()) + } + + /// Harvest all proposer slashings found, removing them from the slasher. + pub fn get_proposer_slashings(&self) -> HashSet { + std::mem::take(&mut self.proposer_slashings.lock()) + } + + pub fn config(&self) -> &Config { + &self.config + } + + /// Accept an attestation from the network and queue it for processing. + pub fn accept_attestation(&self, attestation: IndexedAttestation) { + self.attestation_queue.queue(attestation); + } + + /// Accept a block from the network and queue it for processing. + pub fn accept_block_header(&self, block_header: SignedBeaconBlockHeader) { + self.block_queue.queue(block_header); + } + + /// Apply queued blocks and attestations to the on-disk database, and detect slashings! + pub fn process_queued(&self, current_epoch: Epoch) -> Result<(), Error> { + let mut txn = self.db.begin_rw_txn()?; + self.process_blocks(&mut txn)?; + self.process_attestations(current_epoch, &mut txn)?; + txn.commit()?; + Ok(()) + } + + /// Apply queued blocks to the on-disk database. + pub fn process_blocks(&self, txn: &mut RwTransaction<'_>) -> Result<(), Error> { + let blocks = self.block_queue.dequeue(); + let mut slashings = vec![]; + + metrics::set_gauge(&SLASHER_NUM_BLOCKS_PROCESSED, blocks.len() as i64); + + for block in blocks { + if let ProposerSlashingStatus::DoubleVote(slashing) = + self.db.check_or_insert_block_proposal(txn, block)? + { + slashings.push(*slashing); + } + } + + if !slashings.is_empty() { + info!( + self.log, + "Found {} new proposer slashings!", + slashings.len(), + ); + self.proposer_slashings.lock().extend(slashings); + } + + Ok(()) + } + + /// Apply queued attestations to the on-disk database. + pub fn process_attestations( + &self, + current_epoch: Epoch, + txn: &mut RwTransaction<'_>, + ) -> Result<(), Error> { + let snapshot = self.attestation_queue.dequeue(); + + // Filter attestations for relevance. + let (snapshot, deferred, num_dropped) = self.validate(snapshot, current_epoch); + let num_deferred = deferred.len(); + self.attestation_queue.requeue(deferred); + + // Insert attestations into database. + debug!( + self.log, + "Storing attestations in slasher DB"; + "num_valid" => snapshot.len(), + "num_deferred" => num_deferred, + "num_dropped" => num_dropped, + ); + metrics::set_gauge(&SLASHER_NUM_ATTESTATIONS_VALID, snapshot.len() as i64); + metrics::set_gauge(&SLASHER_NUM_ATTESTATIONS_DEFERRED, num_deferred as i64); + metrics::set_gauge(&SLASHER_NUM_ATTESTATIONS_DROPPED, num_dropped as i64); + + for attestation in snapshot.attestations.iter() { + self.db.store_indexed_attestation( + txn, + attestation.1.indexed_attestation_hash, + &attestation.0, + )?; + } + + // Group attestations into batches and process them. + let grouped_attestations = snapshot.group_by_validator_index(&self.config); + for (subqueue_id, subqueue) in grouped_attestations.subqueues.into_iter().enumerate() { + self.process_batch(txn, subqueue_id, subqueue.attestations, current_epoch)?; + } + Ok(()) + } + + /// Process a batch of attestations for a range of validator indices. + fn process_batch( + &self, + txn: &mut RwTransaction<'_>, + subqueue_id: usize, + batch: Vec, AttesterRecord)>>, + current_epoch: Epoch, + ) -> Result<(), Error> { + // First, check for double votes. + for attestation in &batch { + match self.check_double_votes(txn, subqueue_id, &attestation.0, attestation.1) { + Ok(slashings) => { + if !slashings.is_empty() { + info!( + self.log, + "Found {} new double-vote slashings!", + slashings.len() + ); + } + self.attester_slashings.lock().extend(slashings); + } + Err(e) => { + error!( + self.log, + "Error checking for double votes"; + "error" => format!("{:?}", e) + ); + return Err(e); + } + } + } + + // Then check for surrounds using the min-max arrays. + match array::update( + &self.db, + txn, + subqueue_id, + batch, + current_epoch, + &self.config, + ) { + Ok(slashings) => { + if !slashings.is_empty() { + info!( + self.log, + "Found {} new surround slashings!", + slashings.len() + ); + } + self.attester_slashings.lock().extend(slashings); + } + Err(e) => { + error!( + self.log, + "Error processing array update"; + "error" => format!("{:?}", e), + ); + return Err(e); + } + } + + Ok(()) + } + + /// Check for double votes from all validators on `attestation` who match the `subqueue_id`. + fn check_double_votes( + &self, + txn: &mut RwTransaction<'_>, + subqueue_id: usize, + attestation: &IndexedAttestation, + attester_record: AttesterRecord, + ) -> Result>, Error> { + let mut slashings = HashSet::new(); + + for validator_index in self + .config + .attesting_validators_in_chunk(attestation, subqueue_id) + { + let slashing_status = self.db.check_and_update_attester_record( + txn, + validator_index, + &attestation, + attester_record, + )?; + + if let Some(slashing) = slashing_status.into_slashing(attestation) { + debug!( + self.log, + "Found double-vote slashing"; + "validator_index" => validator_index, + "epoch" => slashing.attestation_1.data.target.epoch, + ); + slashings.insert(slashing); + } + } + + Ok(slashings) + } + + /// Validate the attestations in `batch` for ingestion during `current_epoch`. + /// + /// Drop any attestations that are too old to ever be relevant, and return any attestations + /// that might be valid in the future. + /// + /// Returns `(valid, deferred, num_dropped)`. + fn validate( + &self, + batch: AttestationBatch, + current_epoch: Epoch, + ) -> (AttestationBatch, AttestationBatch, usize) { + let mut keep = Vec::with_capacity(batch.len()); + let mut defer = vec![]; + let mut drop_count = 0; + + for tuple in batch.attestations.into_iter() { + let attestation = &tuple.0; + let target_epoch = attestation.data.target.epoch; + let source_epoch = attestation.data.source.epoch; + + if source_epoch > target_epoch + || source_epoch + self.config.history_length as u64 <= current_epoch + { + drop_count += 1; + continue; + } + + // Check that the attestation's target epoch is acceptable, and defer it + // if it's not. + if target_epoch > current_epoch { + defer.push(tuple); + } else { + // Otherwise the attestation is OK to process. + keep.push(tuple); + } + } + + ( + AttestationBatch { attestations: keep }, + AttestationBatch { + attestations: defer, + }, + drop_count, + ) + } + + /// Prune unnecessary attestations and blocks from the on-disk database. + pub fn prune_database(&self, current_epoch: Epoch) -> Result<(), Error> { + self.db.prune(current_epoch) + } +} diff --git a/slasher/src/slasher_server.rs b/slasher/src/slasher_server.rs new file mode 100644 index 000000000..b542a8023 --- /dev/null +++ b/slasher/src/slasher_server.rs @@ -0,0 +1,95 @@ +use crate::metrics::{self, SLASHER_DATABASE_SIZE, SLASHER_RUN_TIME}; +use crate::Slasher; +use directory::size_of_dir; +use slog::{debug, error, info, trace}; +use slot_clock::SlotClock; +use std::sync::mpsc::{sync_channel, TrySendError}; +use std::sync::Arc; +use task_executor::TaskExecutor; +use tokio::stream::StreamExt; +use tokio::time::{interval_at, Duration, Instant}; +use types::EthSpec; + +#[derive(Debug)] +pub struct SlasherServer; + +impl SlasherServer { + pub fn run( + slasher: Arc>, + slot_clock: C, + executor: &TaskExecutor, + ) { + info!(slasher.log, "Starting slasher to detect misbehaviour"); + + // Buffer just a single message in the channel. If the receiver is still processing, we + // don't need to burden them with more work (we can wait). + let (sender, receiver) = sync_channel(1); + let log = slasher.log.clone(); + let update_period = slasher.config().update_period; + + executor.spawn( + async move { + // NOTE: could align each run to some fixed point in each slot, see: + // https://github.com/sigp/lighthouse/issues/1861 + let slot_clock = Arc::new(slot_clock); + let mut interval = interval_at(Instant::now(), Duration::from_secs(update_period)); + while interval.next().await.is_some() { + if let Some(current_slot) = slot_clock.clone().now() { + let current_epoch = current_slot.epoch(E::slots_per_epoch()); + if let Err(TrySendError::Disconnected(_)) = sender.try_send(current_epoch) { + break; + } + } else { + trace!(log, "Slasher has nothing to do: we are pre-genesis"); + } + } + }, + "slasher_server", + ); + + executor.spawn_blocking( + move || { + while let Ok(current_epoch) = receiver.recv() { + let t = Instant::now(); + let num_attestations = slasher.attestation_queue.len(); + let num_blocks = slasher.block_queue.len(); + + let batch_timer = metrics::start_timer(&SLASHER_RUN_TIME); + if let Err(e) = slasher.process_queued(current_epoch) { + error!( + slasher.log, + "Error during scheduled slasher processing"; + "epoch" => current_epoch, + "error" => format!("{:?}", e) + ); + } + drop(batch_timer); + + // Prune the database, even in the case where batch processing failed. + // If the LMDB database is full then pruning could help to free it up. + if let Err(e) = slasher.prune_database(current_epoch) { + error!( + slasher.log, + "Error during slasher database pruning"; + "epoch" => current_epoch, + "error" => format!("{:?}", e), + ); + continue; + } + debug!( + slasher.log, + "Completed slasher update"; + "epoch" => current_epoch, + "time_taken" => format!("{}ms", t.elapsed().as_millis()), + "num_attestations" => num_attestations, + "num_blocks" => num_blocks, + ); + + let database_size = size_of_dir(&slasher.config().database_path); + metrics::set_gauge(&SLASHER_DATABASE_SIZE, database_size as i64); + } + }, + "slasher_server_process_queued", + ); + } +} diff --git a/slasher/src/test_utils.rs b/slasher/src/test_utils.rs new file mode 100644 index 000000000..bd3b06c52 --- /dev/null +++ b/slasher/src/test_utils.rs @@ -0,0 +1,115 @@ +use slog::Logger; +use sloggers::Build; +use std::collections::HashSet; +use std::iter::FromIterator; +use types::{ + AggregateSignature, AttestationData, AttesterSlashing, BeaconBlockHeader, Checkpoint, Epoch, + Hash256, IndexedAttestation, MainnetEthSpec, Signature, SignedBeaconBlockHeader, Slot, +}; + +pub type E = MainnetEthSpec; + +pub fn logger() -> Logger { + if cfg!(feature = "test_logger") { + sloggers::terminal::TerminalLoggerBuilder::new() + .level(sloggers::types::Severity::Trace) + .build() + .unwrap() + } else { + sloggers::null::NullLoggerBuilder.build().unwrap() + } +} + +pub fn indexed_att( + attesting_indices: impl AsRef<[u64]>, + source_epoch: u64, + target_epoch: u64, + target_root: u64, +) -> IndexedAttestation { + IndexedAttestation { + attesting_indices: attesting_indices.as_ref().to_vec().into(), + data: AttestationData { + slot: Slot::new(0), + index: 0, + beacon_block_root: Hash256::zero(), + source: Checkpoint { + epoch: Epoch::new(source_epoch), + root: Hash256::from_low_u64_be(0), + }, + target: Checkpoint { + epoch: Epoch::new(target_epoch), + root: Hash256::from_low_u64_be(target_root), + }, + }, + signature: AggregateSignature::empty(), + } +} + +pub fn att_slashing( + attestation_1: &IndexedAttestation, + attestation_2: &IndexedAttestation, +) -> AttesterSlashing { + AttesterSlashing { + attestation_1: attestation_1.clone(), + attestation_2: attestation_2.clone(), + } +} + +pub fn hashset_intersection( + attestation_1_indices: &[u64], + attestation_2_indices: &[u64], +) -> HashSet { + &HashSet::from_iter(attestation_1_indices.iter().copied()) + & &HashSet::from_iter(attestation_2_indices.iter().copied()) +} + +pub fn slashed_validators_from_slashings(slashings: &HashSet>) -> HashSet { + slashings + .iter() + .flat_map(|slashing| { + let att1 = &slashing.attestation_1; + let att2 = &slashing.attestation_2; + assert!( + att1.is_double_vote(att2) || att1.is_surround_vote(att2), + "invalid slashing: {:#?}", + slashing + ); + hashset_intersection(&att1.attesting_indices, &att2.attesting_indices) + }) + .collect() +} + +pub fn slashed_validators_from_attestations( + attestations: &[IndexedAttestation], +) -> HashSet { + let mut slashed_validators = HashSet::new(); + // O(n^2) code, watch out. + for att1 in attestations { + for att2 in attestations { + if att1 == att2 { + continue; + } + + if att1.is_double_vote(att2) || att1.is_surround_vote(att2) { + slashed_validators.extend(hashset_intersection( + &att1.attesting_indices, + &att2.attesting_indices, + )); + } + } + } + slashed_validators +} + +pub fn block(slot: u64, proposer_index: u64, block_root: u64) -> SignedBeaconBlockHeader { + SignedBeaconBlockHeader { + message: BeaconBlockHeader { + slot: Slot::new(slot), + proposer_index, + parent_root: Hash256::zero(), + state_root: Hash256::zero(), + body_root: Hash256::from_low_u64_be(block_root), + }, + signature: Signature::empty(), + } +} diff --git a/slasher/src/utils.rs b/slasher/src/utils.rs new file mode 100644 index 000000000..b9df9b5b4 --- /dev/null +++ b/slasher/src/utils.rs @@ -0,0 +1,16 @@ +use crate::Error; + +/// Mix-in trait for loading values from LMDB that may or may not exist. +pub trait TxnOptional { + fn optional(self) -> Result, E>; +} + +impl TxnOptional for Result { + fn optional(self) -> Result, Error> { + match self { + Ok(x) => Ok(Some(x)), + Err(lmdb::Error::NotFound) => Ok(None), + Err(e) => Err(e.into()), + } + } +} diff --git a/slasher/tests/attester_slashings.rs b/slasher/tests/attester_slashings.rs new file mode 100644 index 000000000..a0a26a96d --- /dev/null +++ b/slasher/tests/attester_slashings.rs @@ -0,0 +1,215 @@ +use maplit::hashset; +use rayon::prelude::*; +use slasher::{ + config::DEFAULT_CHUNK_SIZE, + test_utils::{att_slashing, indexed_att, logger, slashed_validators_from_slashings, E}, + Config, Slasher, +}; +use std::collections::HashSet; +use tempdir::TempDir; +use types::{AttesterSlashing, Epoch, IndexedAttestation}; + +#[test] +fn double_vote_single_val() { + let v = vec![99]; + let att1 = indexed_att(&v, 0, 1, 0); + let att2 = indexed_att(&v, 0, 1, 1); + let slashings = hashset![att_slashing(&att1, &att2)]; + let attestations = vec![att1, att2]; + slasher_test_indiv(&attestations, &slashings, 1); + slasher_test_indiv(&attestations, &slashings, 1000); +} + +#[test] +fn double_vote_multi_vals() { + let v = vec![0, 1, 2]; + let att1 = indexed_att(&v, 0, 1, 0); + let att2 = indexed_att(&v, 0, 1, 1); + let slashings = hashset![att_slashing(&att1, &att2)]; + let attestations = vec![att1, att2]; + slasher_test_indiv(&attestations, &slashings, 1); + slasher_test_indiv(&attestations, &slashings, 1000); +} + +// A subset of validators double vote. +#[test] +fn double_vote_some_vals() { + let v1 = vec![0, 1, 2, 3, 4, 5, 6]; + let v2 = vec![0, 2, 4, 6]; + let att1 = indexed_att(&v1, 0, 1, 0); + let att2 = indexed_att(&v2, 0, 1, 1); + let slashings = hashset![att_slashing(&att1, &att2)]; + let attestations = vec![att1, att2]; + slasher_test_indiv(&attestations, &slashings, 1); + slasher_test_indiv(&attestations, &slashings, 1000); +} + +// A subset of validators double vote, others vote twice for the same thing. +#[test] +fn double_vote_some_vals_repeat() { + let v1 = vec![0, 1, 2, 3, 4, 5, 6]; + let v2 = vec![0, 2, 4, 6]; + let v3 = vec![1, 3, 5]; + let att1 = indexed_att(&v1, 0, 1, 0); + let att2 = indexed_att(&v2, 0, 1, 1); + let att3 = indexed_att(&v3, 0, 1, 0); + let slashings = hashset![att_slashing(&att1, &att2)]; + let attestations = vec![att1, att2, att3]; + slasher_test_indiv(&attestations, &slashings, 1); + slasher_test_indiv(&attestations, &slashings, 1000); +} + +// Nobody double votes, nobody gets slashed. +#[test] +fn no_double_vote_same_target() { + let v1 = vec![0, 1, 2, 3, 4, 5, 6]; + let v2 = vec![0, 1, 2, 3, 4, 5, 7, 8]; + let att1 = indexed_att(&v1, 0, 1, 0); + let att2 = indexed_att(&v2, 0, 1, 0); + let attestations = vec![att1, att2]; + slasher_test_indiv(&attestations, &hashset! {}, 1); + slasher_test_indiv(&attestations, &hashset! {}, 1000); +} + +// Two groups votes for different things, no slashings. +#[test] +fn no_double_vote_distinct_vals() { + let v1 = vec![0, 1, 2, 3]; + let v2 = vec![4, 5, 6, 7]; + let att1 = indexed_att(&v1, 0, 1, 0); + let att2 = indexed_att(&v2, 0, 1, 1); + let attestations = vec![att1, att2]; + slasher_test_indiv(&attestations, &hashset! {}, 1); + slasher_test_indiv(&attestations, &hashset! {}, 1000); +} + +#[test] +fn no_double_vote_repeated() { + let v = vec![0, 1, 2, 3, 4]; + let att1 = indexed_att(&v, 0, 1, 0); + let att2 = att1.clone(); + let attestations = vec![att1, att2]; + slasher_test_indiv(&attestations, &hashset! {}, 1); + slasher_test_batch(&attestations, &hashset! {}, 1); + parallel_slasher_test(&attestations, hashset! {}, 1); +} + +#[test] +fn surrounds_existing_single_val_single_chunk() { + let v = vec![0]; + let att1 = indexed_att(&v, 1, 2, 0); + let att2 = indexed_att(&v, 0, 3, 0); + let slashings = hashset![att_slashing(&att2, &att1)]; + slasher_test_indiv(&[att1, att2], &slashings, 3); +} + +#[test] +fn surrounds_existing_multi_vals_single_chunk() { + let validators = vec![0, 16, 1024, 300_000, 300_001]; + let att1 = indexed_att(validators.clone(), 1, 2, 0); + let att2 = indexed_att(validators.clone(), 0, 3, 0); + let slashings = hashset![att_slashing(&att2, &att1)]; + slasher_test_indiv(&[att1, att2], &slashings, 3); +} + +#[test] +fn surrounds_existing_many_chunks() { + let v = vec![0]; + let chunk_size = DEFAULT_CHUNK_SIZE as u64; + let att1 = indexed_att(&v, 3 * chunk_size, 3 * chunk_size + 1, 0); + let att2 = indexed_att(&v, 0, 3 * chunk_size + 2, 0); + let slashings = hashset![att_slashing(&att2, &att1)]; + let attestations = vec![att1, att2]; + slasher_test_indiv(&attestations, &slashings, 4 * chunk_size); +} + +#[test] +fn surrounded_by_single_val_single_chunk() { + let v = vec![0]; + let att1 = indexed_att(&v, 0, 15, 0); + let att2 = indexed_att(&v, 1, 14, 0); + let slashings = hashset![att_slashing(&att1, &att2)]; + let attestations = vec![att1, att2]; + slasher_test_indiv(&attestations, &slashings, 15); +} + +#[test] +fn surrounded_by_single_val_multi_chunk() { + let v = vec![0]; + let chunk_size = DEFAULT_CHUNK_SIZE as u64; + let att1 = indexed_att(&v, 0, 3 * chunk_size, 0); + let att2 = indexed_att(&v, chunk_size, chunk_size + 1, 0); + let slashings = hashset![att_slashing(&att1, &att2)]; + let attestations = vec![att1, att2]; + slasher_test_indiv(&attestations, &slashings, 3 * chunk_size); + slasher_test_indiv(&attestations, &slashings, 4 * chunk_size); +} + +// Process each attestation individually, and confirm that the slashings produced are as expected. +fn slasher_test_indiv( + attestations: &[IndexedAttestation], + expected: &HashSet>, + current_epoch: u64, +) { + slasher_test(attestations, expected, current_epoch, |_| true); +} + +// Process all attestations in one batch. +fn slasher_test_batch( + attestations: &[IndexedAttestation], + expected: &HashSet>, + current_epoch: u64, +) { + slasher_test(attestations, expected, current_epoch, |_| false); +} + +fn slasher_test( + attestations: &[IndexedAttestation], + expected: &HashSet>, + current_epoch: u64, + should_process_after: impl Fn(usize) -> bool, +) { + let tempdir = TempDir::new("slasher").unwrap(); + let config = Config::new(tempdir.path().into()); + let slasher = Slasher::open(config, logger()).unwrap(); + let current_epoch = Epoch::new(current_epoch); + + for (i, attestation) in attestations.iter().enumerate() { + slasher.accept_attestation(attestation.clone()); + + if should_process_after(i) { + slasher.process_queued(current_epoch).unwrap(); + } + } + slasher.process_queued(current_epoch).unwrap(); + + let slashings = slasher.get_attester_slashings(); + + assert_eq!(&slashings, expected); + + // Pruning should not error. + slasher.prune_database(current_epoch).unwrap(); +} + +fn parallel_slasher_test( + attestations: &[IndexedAttestation], + expected_slashed_validators: HashSet, + current_epoch: u64, +) { + let tempdir = TempDir::new("slasher").unwrap(); + let config = Config::new(tempdir.path().into()); + let slasher = Slasher::open(config, logger()).unwrap(); + let current_epoch = Epoch::new(current_epoch); + + attestations + .into_par_iter() + .try_for_each(|attestation| { + slasher.accept_attestation(attestation.clone()); + slasher.process_queued(current_epoch) + }) + .expect("parallel processing shouldn't race"); + + let slashings = slasher.get_attester_slashings(); + let slashed_validators = slashed_validators_from_slashings(&slashings); + assert_eq!(slashed_validators, expected_slashed_validators); +} diff --git a/slasher/tests/proposer_slashings.rs b/slasher/tests/proposer_slashings.rs new file mode 100644 index 000000000..2f303ad35 --- /dev/null +++ b/slasher/tests/proposer_slashings.rs @@ -0,0 +1,61 @@ +use slasher::{ + test_utils::{block as test_block, logger, E}, + Config, Slasher, +}; +use tempdir::TempDir; +use types::{Epoch, EthSpec}; + +#[test] +fn empty_pruning() { + let tempdir = TempDir::new("slasher").unwrap(); + let config = Config::new(tempdir.path().into()); + let slasher = Slasher::::open(config.clone(), logger()).unwrap(); + slasher.prune_database(Epoch::new(0)).unwrap(); +} + +#[test] +fn block_pruning() { + let slots_per_epoch = E::slots_per_epoch(); + + let tempdir = TempDir::new("slasher").unwrap(); + let mut config = Config::new(tempdir.path().into()); + config.chunk_size = 2; + config.history_length = 2; + + let slasher = Slasher::::open(config.clone(), logger()).unwrap(); + let current_epoch = Epoch::from(2 * config.history_length); + + // Pruning the empty database should be safe. + slasher.prune_database(Epoch::new(0)).unwrap(); + slasher.prune_database(current_epoch).unwrap(); + + // Add blocks in excess of the history length and prune them away. + let proposer_index = 100_000; // high to check sorting by slot + for slot in 1..=current_epoch.as_u64() * slots_per_epoch { + slasher.accept_block_header(test_block(slot, proposer_index, 0)); + } + slasher.process_queued(current_epoch).unwrap(); + slasher.prune_database(current_epoch).unwrap(); + + // Add more conflicting blocks, and check that only the ones within the non-pruned + // section are detected as slashable. + for slot in 1..=current_epoch.as_u64() * slots_per_epoch { + slasher.accept_block_header(test_block(slot, proposer_index, 1)); + } + slasher.process_queued(current_epoch).unwrap(); + + let proposer_slashings = slasher.get_proposer_slashings(); + + // Check number of proposer slashings, accounting for single block in current epoch. + assert_eq!( + proposer_slashings.len(), + (config.history_length - 1) * slots_per_epoch as usize + 1 + ); + // Check epochs of all slashings are from within range. + assert!(proposer_slashings.iter().all(|slashing| slashing + .signed_header_1 + .message + .slot + .epoch(slots_per_epoch) + > current_epoch - config.history_length as u64)); +} diff --git a/slasher/tests/random.rs b/slasher/tests/random.rs new file mode 100644 index 000000000..251cbaa68 --- /dev/null +++ b/slasher/tests/random.rs @@ -0,0 +1,233 @@ +use rand::prelude::*; +use rand::{rngs::StdRng, thread_rng, Rng, SeedableRng}; +use slasher::{ + test_utils::{ + block, indexed_att, logger, slashed_validators_from_attestations, + slashed_validators_from_slashings, E, + }, + Config, Slasher, +}; +use std::cmp::max; +use tempdir::TempDir; +use types::{Epoch, EthSpec}; + +#[derive(Debug)] +struct TestConfig { + num_validators: usize, + max_attestations: usize, + check_slashings: bool, + add_blocks: bool, +} + +impl Default for TestConfig { + fn default() -> Self { + Self { + num_validators: 4, + max_attestations: 50, + check_slashings: false, + add_blocks: false, + } + } +} + +fn random_test(seed: u64, test_config: TestConfig) { + let check_slashings = test_config.check_slashings; + let num_validators = test_config.num_validators; + let max_attestations = test_config.max_attestations; + + println!("Running with seed {}", seed); + let mut rng = StdRng::seed_from_u64(seed); + + let tempdir = TempDir::new("slasher").unwrap(); + + let mut config = Config::new(tempdir.path().into()); + config.validator_chunk_size = 1 << rng.gen_range(1, 4); + + let chunk_size_exponent = rng.gen_range(1, 4); + config.chunk_size = 1 << chunk_size_exponent; + config.history_length = 1 << rng.gen_range(chunk_size_exponent, chunk_size_exponent + 3); + + let slasher = Slasher::::open(config.clone(), logger()).unwrap(); + + let validators = (0..num_validators as u64).collect::>(); + + let num_attestations = rng.gen_range(2, max_attestations + 1); + + let mut current_epoch = Epoch::new(0); + let mut attestations = vec![]; + + for _ in 0..num_attestations { + let num_attesters = rng.gen_range(1, num_validators); + let mut attesting_indices = validators + .choose_multiple(&mut rng, num_attesters) + .copied() + .collect::>(); + attesting_indices.sort(); + + // If checking slashings, generate valid attestations in range. + let (source, target) = if check_slashings { + let source = rng.gen_range( + current_epoch + .as_u64() + .saturating_sub(config.history_length as u64 - 1), + current_epoch.as_u64() + 1, + ); + let target = rng.gen_range(source, current_epoch.as_u64() + 1); + (source, target) + } else { + let source = rng.gen_range(0, max(3 * current_epoch.as_u64(), 1)); + let target = rng.gen_range(source, max(3 * current_epoch.as_u64(), source + 1)); + (source, target) + }; + let target_root = rng.gen_range(0, 3); + let attestation = indexed_att(&attesting_indices, source, target, target_root); + + if check_slashings { + attestations.push(attestation.clone()); + } + + // Supply to slasher + slasher.accept_attestation(attestation); + + // Maybe add a random block too + if test_config.add_blocks && rng.gen_bool(0.1) { + let slot = rng.gen_range(0, 1 + 3 * current_epoch.as_u64() * E::slots_per_epoch() / 2); + let proposer = rng.gen_range(0, num_validators as u64); + let block_root = rng.gen_range(0, 2); + slasher.accept_block_header(block(slot, proposer, block_root)); + } + + // Maybe process + if rng.gen_bool(0.1) { + slasher.process_queued(current_epoch).unwrap(); + + // Maybe prune + if rng.gen_bool(0.1) { + slasher.prune_database(current_epoch).unwrap(); + } + } + + // Maybe advance to the next epoch + if rng.gen_bool(0.5) { + if check_slashings { + slasher.process_queued(current_epoch).unwrap(); + } + current_epoch += 1; + } + } + + if !check_slashings { + return; + } + + slasher.process_queued(current_epoch).unwrap(); + + let slashings = slasher.get_attester_slashings(); + + let slashed_validators = slashed_validators_from_slashings(&slashings); + let expected_slashed_validators = slashed_validators_from_attestations(&attestations); + assert_eq!(slashed_validators, expected_slashed_validators); +} + +// Fuzz-like test that runs forever on different seeds looking for crashes. +#[test] +#[ignore] +fn no_crash() { + let mut rng = thread_rng(); + loop { + random_test(rng.gen(), TestConfig::default()); + } +} + +// Fuzz-like test that runs forever on different seeds looking for crashes. +#[test] +#[ignore] +fn no_crash_with_blocks() { + let mut rng = thread_rng(); + loop { + random_test( + rng.gen(), + TestConfig { + add_blocks: true, + ..TestConfig::default() + }, + ); + } +} + +// Fuzz-like test that runs forever on different seeds looking for missed slashings. +#[test] +#[ignore] +fn check_slashings() { + let mut rng = thread_rng(); + loop { + random_test( + rng.gen(), + TestConfig { + check_slashings: true, + ..TestConfig::default() + }, + ); + } +} + +#[test] +fn check_slashings_example1() { + random_test( + 1, + TestConfig { + check_slashings: true, + ..TestConfig::default() + }, + ); +} + +#[test] +fn check_slashings_example2() { + random_test( + 2, + TestConfig { + check_slashings: true, + max_attestations: 3, + ..TestConfig::default() + }, + ); +} + +#[test] +fn check_slashings_example3() { + random_test( + 3, + TestConfig { + check_slashings: true, + max_attestations: 100, + ..TestConfig::default() + }, + ); +} + +#[test] +fn no_crash_example1() { + random_test(1, TestConfig::default()); +} + +#[test] +fn no_crash_example2() { + random_test(2, TestConfig::default()); +} + +#[test] +fn no_crash_example3() { + random_test(3, TestConfig::default()); +} + +#[test] +fn no_crash_blocks_example1() { + random_test( + 1, + TestConfig { + add_blocks: true, + ..TestConfig::default() + }, + ); +} diff --git a/slasher/tests/wrap_around.rs b/slasher/tests/wrap_around.rs new file mode 100644 index 000000000..0ed3860dc --- /dev/null +++ b/slasher/tests/wrap_around.rs @@ -0,0 +1,39 @@ +use slasher::{ + test_utils::{indexed_att, logger}, + Config, Slasher, +}; +use tempdir::TempDir; +use types::Epoch; + +#[test] +fn attestation_pruning_empty_wrap_around() { + let tempdir = TempDir::new("slasher").unwrap(); + let mut config = Config::new(tempdir.path().into()); + config.validator_chunk_size = 1; + config.chunk_size = 16; + config.history_length = 16; + + let slasher = Slasher::open(config.clone(), logger()).unwrap(); + + let v = vec![0]; + let history_length = config.history_length as u64; + + let mut current_epoch = Epoch::new(history_length - 1); + + slasher.accept_attestation(indexed_att(v.clone(), 0, history_length - 1, 0)); + slasher.process_queued(current_epoch).unwrap(); + slasher.prune_database(current_epoch).unwrap(); + + // Delete the previous attestation + current_epoch = Epoch::new(2 * history_length + 2); + slasher.prune_database(current_epoch).unwrap(); + + // Add an attestation that would be surrounded with the modulo considered + slasher.accept_attestation(indexed_att( + v.clone(), + 2 * history_length - 3, + 2 * history_length - 2, + 1, + )); + slasher.process_queued(current_epoch).unwrap(); +}