De-duplicate attestations in the slasher (#2767)

## Issue Addressed

Closes https://github.com/sigp/lighthouse/issues/2112
Closes https://github.com/sigp/lighthouse/issues/1861

## Proposed Changes

Collect attestations by validator index in the slasher, and use the magic of reference counting to automatically discard redundant attestations. This results in us storing only 1-2% of the attestations observed when subscribed to all subnets, which carries over to a 50-100x reduction in data stored 🎉 
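For illustration, here is a minimal sketch of the de-duplication idea in plain Rust. The type and field names (`Record`, `Batch`, `data_hash`) are simplified stand-ins, not the slasher's actual API: each record is wrapped in an `Arc`, the batch keeps only `Weak` handles plus a map keyed by `(validator_index, attestation_data_hash)` that always holds the record covering the most validators, and any record squeezed out of every map slot loses its last strong reference and is dropped for free.

```rust
use std::collections::BTreeMap;
use std::sync::{Arc, Weak};

/// Simplified stand-in for an indexed attester record (hypothetical fields).
struct Record {
    attesting_indices: Vec<u64>,
    data_hash: u64, // stand-in for the attestation data hash
}

#[derive(Default)]
struct Batch {
    /// Best record seen for each (validator index, attestation data) pair.
    attesters: BTreeMap<(u64, u64), Arc<Record>>,
    /// Weak handle to every queued record; redundant ones die with their last `Arc`.
    all: Vec<Weak<Record>>,
}

impl Batch {
    fn queue(&mut self, record: Arc<Record>) {
        self.all.push(Arc::downgrade(&record));
        for &validator in &record.attesting_indices {
            self.attesters
                .entry((validator, record.data_hash))
                .and_modify(|existing| {
                    // Prefer the record covering more validators (i.e. a larger aggregate).
                    if existing.attesting_indices.len() < record.attesting_indices.len() {
                        *existing = record.clone();
                    }
                })
                .or_insert_with(|| record.clone());
        }
    }

    /// Only records still reachable through the map are worth writing to disk.
    fn to_store(&self) -> Vec<Arc<Record>> {
        self.all.iter().filter_map(Weak::upgrade).collect()
    }
}

fn main() {
    let mut batch = Batch::default();
    // An unaggregated attestation from validator 7...
    batch.queue(Arc::new(Record { attesting_indices: vec![7], data_hash: 1 }));
    // ...is made redundant by an aggregate covering validators 5, 6 and 7.
    batch.queue(Arc::new(Record { attesting_indices: vec![5, 6, 7], data_hash: 1 }));
    assert_eq!(batch.to_store().len(), 1); // only the aggregate is stored
}
```

This replace-with-the-larger-record step is what lets aggregates displace the unaggregated attestations seen in the same batch, which is why only 1-2% of observed attestations end up on disk.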

## Additional Info

There's some nuance to the configuration of the `slot-offset`. It has a profound effect on the effectiveness of de-duplication; see the docs added to the book for an explanation: 5442e695e5/book/src/slasher.md (slot-offset)
Commit df02639b71 by Michael Sproul, 2021-11-08 00:01:09 +00:00 (parent fadb8b2b2b)
13 changed files with 252 additions and 93 deletions

View File

@@ -451,6 +451,18 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
.requires("slasher")
.takes_value(true)
)
.arg(
Arg::with_name("slasher-slot-offset")
.long("slasher-slot-offset")
.help(
"Set the delay from the start of the slot at which the slasher should ingest \
attestations. Only effective if the slasher-update-period is a multiple of the \
slot duration."
)
.value_name("SECONDS")
.requires("slasher")
.takes_value(true)
)
.arg(
Arg::with_name("slasher-history-length")
.long("slasher-history-length")

View File

@@ -448,6 +448,19 @@ pub fn get_config<E: EthSpec>(
slasher_config.update_period = update_period;
}
if let Some(slot_offset) =
clap_utils::parse_optional::<f64>(cli_args, "slasher-slot-offset")?
{
if slot_offset.is_finite() {
slasher_config.slot_offset = slot_offset;
} else {
return Err(format!(
"invalid float for slasher-slot-offset: {}",
slot_offset
));
}
}
if let Some(history_length) =
clap_utils::parse_optional(cli_args, "slasher-history-length")?
{

View File

@@ -102,6 +102,31 @@ If the `time_taken` is substantially longer than the update period then it indic
struggling under the load, and you should consider increasing the update period or lowering the
resource requirements by tweaking the history length.
The update period should almost always be set to a multiple of the slot duration (12
seconds), or in rare cases a divisor (e.g. 4 seconds).
### Slot Offset
* Flag: `--slasher-slot-offset SECONDS`
* Argument: number of seconds (decimal allowed)
* Default: 10.5 seconds
Set the offset from the start of the slot at which slasher processing should run. The default
value of 10.5 seconds is chosen so that de-duplication can be maximally effective. The slasher
will de-duplicate attestations from the same batch by storing only the attestations necessary
to cover all seen validators. In other words, it will store aggregated attestations rather than
unaggregated attestations if given the opportunity.
Aggregated attestations are published 8 seconds into the slot, so the default allows 2.5 seconds for
them to arrive, and 1.5 seconds for them to be processed before a potential block proposal at the
start of the next slot. If the batch processing time on your machine is significantly longer than
1.5 seconds then you may want to lengthen the update period to 24 seconds, or decrease the slot
offset to a value in the range 8.5-10.5s (lower values may result in more data being stored).
The slasher will run every `update-period` seconds after the first `slot_start + slot-offset`, which
means the `slot-offset` will be ineffective if the `update-period` is not a multiple (or divisor) of
the slot duration.
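(Editorial aside, not part of the book page: the n-th run happens at `slot_start + slot-offset + n * update-period`, so its position within a slot is that sum modulo the 12-second slot duration. A throwaway Rust snippet makes the drift visible.)

```rust
// Where in the slot each slasher run lands, for a few (update-period, slot-offset) choices.
fn main() {
    let slot_duration = 12.0_f64;
    for &(update_period, slot_offset) in &[(12.0_f64, 10.5_f64), (24.0, 10.5), (13.0, 10.5)] {
        let positions: Vec<f64> = (0..5)
            .map(|n| (slot_offset + n as f64 * update_period) % slot_duration)
            .collect();
        println!("period {}s, offset {}s -> {:?}s into the slot", update_period, slot_offset, positions);
    }
    // period 12s -> always 10.5s into the slot (offset fully effective)
    // period 24s -> 10.5s into every other slot
    // period 13s -> 10.5, 11.5, 0.5, 1.5, 2.5, ... the offset drifts and is ineffective
}
```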
### Chunk Size and Validator Chunk Size
* Flags: `--slasher-chunk-size EPOCHS`, `--slasher-validator-chunk-size NUM_VALIDATORS`

View File

@@ -795,6 +795,25 @@ fn slasher_update_period_flag() {
});
}
#[test]
fn slasher_slot_offset() {
// TODO: check that the offset is actually stored, once the config is un-hacked
// See: https://github.com/sigp/lighthouse/pull/2767#discussion_r741610402
CommandLineTest::new()
.flag("slasher", None)
.flag("slasher-max-db-size", Some("16"))
.flag("slasher-slot-offset", Some("11.25"))
.run();
}
#[test]
#[should_panic]
fn slasher_slot_offset_nan() {
CommandLineTest::new()
.flag("slasher", None)
.flag("slasher-max-db-size", Some("16"))
.flag("slasher-slot-offset", Some("NaN"))
.run();
}
#[test]
fn slasher_history_length_flag() {
CommandLineTest::new()
.flag("slasher", None)

View File

@@ -55,11 +55,18 @@ impl<T: BeaconChainTypes> SlasherService<T> {
// don't need to burden them with more work (we can wait).
let (notif_sender, notif_receiver) = sync_channel(1);
let update_period = slasher.config().update_period;
let slot_offset = slasher.config().slot_offset;
let beacon_chain = self.beacon_chain.clone();
let network_sender = self.network_sender.clone();
executor.spawn(
Self::run_notifier(beacon_chain.clone(), update_period, notif_sender, log),
Self::run_notifier(
beacon_chain.clone(),
update_period,
slot_offset,
notif_sender,
log,
),
"slasher_server_notifier",
);
@@ -75,12 +82,19 @@ impl<T: BeaconChainTypes> SlasherService<T> {
async fn run_notifier(
beacon_chain: Arc<BeaconChain<T>>,
update_period: u64,
slot_offset: f64,
notif_sender: SyncSender<Epoch>,
log: Logger,
) {
// NOTE: could align each run to some fixed point in each slot, see:
// https://github.com/sigp/lighthouse/issues/1861
let mut interval = interval_at(Instant::now(), Duration::from_secs(update_period));
let slot_offset = Duration::from_secs_f64(slot_offset);
let start_instant =
if let Some(duration_to_next_slot) = beacon_chain.slot_clock.duration_to_next_slot() {
Instant::now() + duration_to_next_slot + slot_offset
} else {
error!(log, "Error aligning slasher to slot clock");
Instant::now()
};
let mut interval = interval_at(start_instant, Duration::from_secs(update_period));
loop {
interval.tick().await;

View File

@@ -1,5 +1,5 @@
use crate::metrics::{self, SLASHER_COMPRESSION_RATIO, SLASHER_NUM_CHUNKS_UPDATED};
use crate::{AttesterRecord, AttesterSlashingStatus, Config, Error, SlasherDB};
use crate::{AttesterSlashingStatus, Config, Error, IndexedAttesterRecord, SlasherDB};
use flate2::bufread::{ZlibDecoder, ZlibEncoder};
use lmdb::{RwTransaction, Transaction};
use serde_derive::{Deserialize, Serialize};
@@ -486,7 +486,7 @@ pub fn update<E: EthSpec>(
db: &SlasherDB<E>,
txn: &mut RwTransaction<'_>,
validator_chunk_index: usize,
batch: Vec<Arc<(IndexedAttestation<E>, AttesterRecord)>>,
batch: Vec<Arc<IndexedAttesterRecord<E>>>,
current_epoch: Epoch,
config: &Config,
) -> Result<HashSet<AttesterSlashing<E>>, Error> {
@@ -496,7 +496,7 @@ pub fn update<E: EthSpec>(
let mut chunk_attestations = BTreeMap::new();
for attestation in batch {
chunk_attestations
.entry(config.chunk_index(attestation.0.data.source.epoch))
.entry(config.chunk_index(attestation.indexed.data.source.epoch))
.or_insert_with(Vec::new)
.push(attestation);
}
@@ -573,7 +573,7 @@ pub fn update_array<E: EthSpec, T: TargetArrayChunk>(
db: &SlasherDB<E>,
txn: &mut RwTransaction<'_>,
validator_chunk_index: usize,
chunk_attestations: &BTreeMap<usize, Vec<Arc<(IndexedAttestation<E>, AttesterRecord)>>>,
chunk_attestations: &BTreeMap<usize, Vec<Arc<IndexedAttesterRecord<E>>>>,
current_epoch: Epoch,
config: &Config,
) -> Result<HashSet<AttesterSlashing<E>>, Error> {
@@ -597,7 +597,7 @@ pub fn update_array<E: EthSpec, T: TargetArrayChunk>(
for attestations in chunk_attestations.values() {
for attestation in attestations {
for validator_index in
config.attesting_validators_in_chunk(&attestation.0, validator_chunk_index)
config.attesting_validators_in_chunk(&attestation.indexed, validator_chunk_index)
{
let slashing_status = apply_attestation_for_validator::<E, T>(
db,
@@ -605,11 +605,11 @@ pub fn update_array<E: EthSpec, T: TargetArrayChunk>(
&mut updated_chunks,
validator_chunk_index,
validator_index,
&attestation.0,
&attestation.indexed,
current_epoch,
config,
)?;
if let Some(slashing) = slashing_status.into_slashing(&attestation.0) {
if let Some(slashing) = slashing_status.into_slashing(&attestation.indexed) {
slashings.insert(slashing);
}
}

View File

@@ -1,64 +1,85 @@
use crate::{AttesterRecord, Config};
use crate::{AttesterRecord, Config, IndexedAttesterRecord};
use parking_lot::Mutex;
use std::collections::BTreeSet;
use std::sync::Arc;
use types::{EthSpec, IndexedAttestation};
use std::collections::BTreeMap;
use std::sync::{Arc, Weak};
use types::{EthSpec, Hash256, IndexedAttestation};
/// Staging area for attestations received from the network.
///
/// To be added to the database in batches, for efficiency and to prevent data races.
/// Attestations are not grouped by validator index at this stage so that they can be easily
/// filtered for timeliness.
#[derive(Debug, Default)]
pub struct AttestationQueue<E: EthSpec> {
/// All attestations (unique) for storage on disk.
pub queue: Mutex<AttestationBatch<E>>,
pub queue: Mutex<SimpleBatch<E>>,
}
pub type SimpleBatch<E> = Vec<Arc<IndexedAttesterRecord<E>>>;
/// Attestations dequeued from the queue and in preparation for processing.
///
/// This struct is responsible for mapping validator indices to attestations and performing
/// de-duplication to remove redundant attestations.
#[derive(Debug, Default)]
pub struct AttestationBatch<E: EthSpec> {
/// Map from (`validator_index`, `attestation_data_hash`) to indexed attester record.
///
/// This mapping is used for de-duplication, see:
///
/// https://github.com/sigp/lighthouse/issues/2112
pub attesters: BTreeMap<(u64, Hash256), Arc<IndexedAttesterRecord<E>>>,
/// Vec of all unique indexed attester records.
///
/// The weak references account for the fact that some records might prove useless after
/// de-duplication.
pub attestations: Vec<Weak<IndexedAttesterRecord<E>>>,
}
/// Attestations grouped by validator index range.
#[derive(Debug)]
pub struct GroupedAttestations<E: EthSpec> {
pub subqueues: Vec<AttestationBatch<E>>,
}
/// A queue of attestations for a range of validator indices.
#[derive(Debug, Default)]
pub struct AttestationBatch<E: EthSpec> {
pub attestations: Vec<Arc<(IndexedAttestation<E>, AttesterRecord)>>,
pub subqueues: Vec<SimpleBatch<E>>,
}
impl<E: EthSpec> AttestationBatch<E> {
pub fn len(&self) -> usize {
self.attestations.len()
/// Add an attestation to the queue.
pub fn queue(&mut self, indexed_record: Arc<IndexedAttesterRecord<E>>) {
self.attestations.push(Arc::downgrade(&indexed_record));
let attestation_data_hash = indexed_record.record.attestation_data_hash;
for &validator_index in &indexed_record.indexed.attesting_indices {
self.attesters
.entry((validator_index, attestation_data_hash))
.and_modify(|existing_entry| {
// If the new record is for the same attestation data but with more bits set
// then replace the existing record so that we might avoid storing the
// smaller indexed attestation. Single-bit attestations will usually be removed
// completely by this process, and aggregates will only be retained if they
// are not redundant with respect to a larger aggregate seen in the same batch.
if existing_entry.indexed.attesting_indices.len()
< indexed_record.indexed.attesting_indices.len()
{
*existing_entry = indexed_record.clone();
}
})
.or_insert_with(|| indexed_record.clone());
}
}
pub fn is_empty(&self) -> bool {
self.attestations.is_empty()
}
/// Group the attestations by validator index.
pub fn group_by_validator_index(self, config: &Config) -> GroupedAttestations<E> {
/// Group the attestations by validator chunk index.
pub fn group_by_validator_chunk_index(self, config: &Config) -> GroupedAttestations<E> {
let mut grouped_attestations = GroupedAttestations { subqueues: vec![] };
for attestation in self.attestations {
let subqueue_ids = attestation
.0
.attesting_indices
.iter()
.map(|validator_index| config.validator_chunk_index(*validator_index))
.collect::<BTreeSet<_>>();
for ((validator_index, _), indexed_record) in self.attesters {
let subqueue_id = config.validator_chunk_index(validator_index);
if let Some(max_subqueue_id) = subqueue_ids.iter().next_back() {
if *max_subqueue_id >= grouped_attestations.subqueues.len() {
grouped_attestations
.subqueues
.resize_with(max_subqueue_id + 1, AttestationBatch::default);
}
if subqueue_id >= grouped_attestations.subqueues.len() {
grouped_attestations
.subqueues
.resize_with(subqueue_id + 1, SimpleBatch::default);
}
for subqueue_id in subqueue_ids {
grouped_attestations.subqueues[subqueue_id]
.attestations
.push(attestation.clone());
}
grouped_attestations.subqueues[subqueue_id].push(indexed_record);
}
grouped_attestations
@@ -66,21 +87,18 @@ impl<E: EthSpec> AttestationBatch<E> {
}
impl<E: EthSpec> AttestationQueue<E> {
/// Add an attestation to the queue.
pub fn queue(&self, attestation: IndexedAttestation<E>) {
let attester_record = AttesterRecord::from(attestation.clone());
self.queue
.lock()
.attestations
.push(Arc::new((attestation, attester_record)));
let indexed_record = IndexedAttesterRecord::new(attestation, attester_record);
self.queue.lock().push(indexed_record);
}
pub fn dequeue(&self) -> AttestationBatch<E> {
pub fn dequeue(&self) -> SimpleBatch<E> {
std::mem::take(&mut self.queue.lock())
}
pub fn requeue(&self, batch: AttestationBatch<E>) {
self.queue.lock().attestations.extend(batch.attestations);
pub fn requeue(&self, batch: SimpleBatch<E>) {
self.queue.lock().extend(batch);
}
pub fn len(&self) -> usize {

View File

@@ -1,4 +1,5 @@
use ssz_derive::{Decode, Encode};
use std::sync::Arc;
use tree_hash::TreeHash as _;
use tree_hash_derive::TreeHash;
use types::{AggregateSignature, EthSpec, Hash256, IndexedAttestation, VariableList};
@@ -11,6 +12,21 @@ pub struct AttesterRecord {
pub indexed_attestation_hash: Hash256,
}
/// Bundling of an `IndexedAttestation` with an `AttesterRecord`.
///
/// This struct gets `Arc`d and passed around between each stage of queueing and processing.
#[derive(Debug)]
pub struct IndexedAttesterRecord<E: EthSpec> {
pub indexed: IndexedAttestation<E>,
pub record: AttesterRecord,
}
impl<E: EthSpec> IndexedAttesterRecord<E> {
pub fn new(indexed: IndexedAttestation<E>, record: AttesterRecord) -> Arc<Self> {
Arc::new(IndexedAttesterRecord { indexed, record })
}
}
#[derive(Debug, Clone, Encode, Decode, TreeHash)]
struct IndexedAttestationHeader<T: EthSpec> {
pub attesting_indices: VariableList<u64, T::MaxValidatorsPerCommittee>,

View File

@@ -7,6 +7,7 @@ pub const DEFAULT_CHUNK_SIZE: usize = 16;
pub const DEFAULT_VALIDATOR_CHUNK_SIZE: usize = 256;
pub const DEFAULT_HISTORY_LENGTH: usize = 4096;
pub const DEFAULT_UPDATE_PERIOD: u64 = 12;
pub const DEFAULT_SLOT_OFFSET: f64 = 10.5;
pub const DEFAULT_MAX_DB_SIZE: usize = 256 * 1024; // 256 GiB
pub const DEFAULT_BROADCAST: bool = false;
@@ -26,12 +27,19 @@ pub struct Config {
pub history_length: usize,
/// Update frequency in seconds.
pub update_period: u64,
/// Offset from the start of the slot to begin processing.
#[serde(skip, default = "default_slot_offset")]
pub slot_offset: f64,
/// Maximum size of the LMDB database in megabytes.
pub max_db_size_mbs: usize,
/// Whether to broadcast slashings found to the network.
pub broadcast: bool,
}
fn default_slot_offset() -> f64 {
DEFAULT_SLOT_OFFSET
}
impl Config {
pub fn new(database_path: PathBuf) -> Self {
Self {
@@ -40,6 +48,7 @@ impl Config {
validator_chunk_size: DEFAULT_VALIDATOR_CHUNK_SIZE,
history_length: DEFAULT_HISTORY_LENGTH,
update_period: DEFAULT_UPDATE_PERIOD,
slot_offset: DEFAULT_SLOT_OFFSET,
max_db_size_mbs: DEFAULT_MAX_DB_SIZE,
broadcast: DEFAULT_BROADCAST,
}

View File

@@ -15,8 +15,8 @@ pub mod test_utils;
mod utils;
pub use crate::slasher::Slasher;
pub use attestation_queue::{AttestationBatch, AttestationQueue};
pub use attester_record::AttesterRecord;
pub use attestation_queue::{AttestationBatch, AttestationQueue, SimpleBatch};
pub use attester_record::{AttesterRecord, IndexedAttesterRecord};
pub use block_queue::BlockQueue;
pub use config::Config;
pub use database::SlasherDB;

View File

@@ -22,6 +22,11 @@ lazy_static! {
"slasher_num_attestations_valid",
"Number of valid attestations per batch"
);
pub static ref SLASHER_NUM_ATTESTATIONS_STORED_PER_BATCH: Result<IntGauge> =
try_create_int_gauge(
"slasher_num_attestations_stored_per_batch",
"Number of attestations stored per batch"
);
pub static ref SLASHER_NUM_BLOCKS_PROCESSED: Result<IntGauge> = try_create_int_gauge(
"slasher_num_blocks_processed",
"Number of blocks processed per batch",

View File

@@ -1,4 +1,8 @@
use crate::{database::CURRENT_SCHEMA_VERSION, Config, Error, SlasherDB};
use crate::{
config::{DEFAULT_BROADCAST, DEFAULT_SLOT_OFFSET},
database::CURRENT_SCHEMA_VERSION,
Config, Error, SlasherDB,
};
use lmdb::RwTransaction;
use serde_derive::{Deserialize, Serialize};
use std::path::PathBuf;
@@ -25,8 +29,9 @@ impl Into<ConfigV2> for ConfigV1 {
validator_chunk_size: self.validator_chunk_size,
history_length: self.history_length,
update_period: self.update_period,
slot_offset: DEFAULT_SLOT_OFFSET,
max_db_size_mbs: self.max_db_size_mbs,
broadcast: false,
broadcast: DEFAULT_BROADCAST,
}
}
}

View File

@@ -1,11 +1,12 @@
use crate::batch_stats::{AttestationStats, BatchStats, BlockStats};
use crate::metrics::{
self, SLASHER_NUM_ATTESTATIONS_DEFERRED, SLASHER_NUM_ATTESTATIONS_DROPPED,
SLASHER_NUM_ATTESTATIONS_VALID, SLASHER_NUM_BLOCKS_PROCESSED,
SLASHER_NUM_ATTESTATIONS_STORED_PER_BATCH, SLASHER_NUM_ATTESTATIONS_VALID,
SLASHER_NUM_BLOCKS_PROCESSED,
};
use crate::{
array, AttestationBatch, AttestationQueue, AttesterRecord, BlockQueue, Config, Error,
ProposerSlashingStatus, SlasherDB,
ProposerSlashingStatus, SimpleBatch, SlasherDB,
};
use lmdb::{RwTransaction, Transaction};
use parking_lot::Mutex;
@@ -132,33 +133,56 @@ impl<E: EthSpec> Slasher<E> {
// Filter attestations for relevance.
let (snapshot, deferred, num_dropped) = self.validate(snapshot, current_epoch);
let num_valid = snapshot.len();
let num_deferred = deferred.len();
self.attestation_queue.requeue(deferred);
// Insert attestations into database.
debug!(
self.log,
"Storing attestations in slasher DB";
"num_valid" => snapshot.len(),
"Pre-processing attestations for slasher";
"num_valid" => num_valid,
"num_deferred" => num_deferred,
"num_dropped" => num_dropped,
);
metrics::set_gauge(&SLASHER_NUM_ATTESTATIONS_VALID, snapshot.len() as i64);
metrics::set_gauge(&SLASHER_NUM_ATTESTATIONS_VALID, num_valid as i64);
metrics::set_gauge(&SLASHER_NUM_ATTESTATIONS_DEFERRED, num_deferred as i64);
metrics::set_gauge(&SLASHER_NUM_ATTESTATIONS_DROPPED, num_dropped as i64);
for attestation in snapshot.attestations.iter() {
self.db.store_indexed_attestation(
txn,
attestation.1.indexed_attestation_hash,
&attestation.0,
)?;
// De-duplicate attestations and sort by validator index.
let mut batch = AttestationBatch::default();
for indexed_record in snapshot {
batch.queue(indexed_record);
}
// Group attestations into batches and process them.
let grouped_attestations = snapshot.group_by_validator_index(&self.config);
// Insert relevant attestations into database.
let mut num_stored = 0;
for weak_record in &batch.attestations {
if let Some(indexed_record) = weak_record.upgrade() {
self.db.store_indexed_attestation(
txn,
indexed_record.record.indexed_attestation_hash,
&indexed_record.indexed,
)?;
num_stored += 1;
}
}
debug!(
self.log,
"Stored attestations in slasher DB";
"num_stored" => num_stored,
"num_valid" => num_valid,
);
metrics::set_gauge(
&SLASHER_NUM_ATTESTATIONS_STORED_PER_BATCH,
num_stored as i64,
);
// Group attestations into chunked batches and process them.
let grouped_attestations = batch.group_by_validator_chunk_index(&self.config);
for (subqueue_id, subqueue) in grouped_attestations.subqueues.into_iter().enumerate() {
self.process_batch(txn, subqueue_id, subqueue.attestations, current_epoch)?;
self.process_batch(txn, subqueue_id, subqueue, current_epoch)?;
}
Ok(AttestationStats { num_processed })
}
@@ -168,12 +192,17 @@ impl<E: EthSpec> Slasher<E> {
&self,
txn: &mut RwTransaction<'_>,
subqueue_id: usize,
batch: Vec<Arc<(IndexedAttestation<E>, AttesterRecord)>>,
batch: SimpleBatch<E>,
current_epoch: Epoch,
) -> Result<(), Error> {
// First, check for double votes.
for attestation in &batch {
match self.check_double_votes(txn, subqueue_id, &attestation.0, attestation.1) {
match self.check_double_votes(
txn,
subqueue_id,
&attestation.indexed,
attestation.record,
) {
Ok(slashings) => {
if !slashings.is_empty() {
info!(
@@ -270,15 +299,15 @@ impl<E: EthSpec> Slasher<E> {
/// Returns `(valid, deferred, num_dropped)`.
fn validate(
&self,
batch: AttestationBatch<E>,
batch: SimpleBatch<E>,
current_epoch: Epoch,
) -> (AttestationBatch<E>, AttestationBatch<E>, usize) {
) -> (SimpleBatch<E>, SimpleBatch<E>, usize) {
let mut keep = Vec::with_capacity(batch.len());
let mut defer = vec![];
let mut drop_count = 0;
for tuple in batch.attestations.into_iter() {
let attestation = &tuple.0;
for indexed_record in batch {
let attestation = &indexed_record.indexed;
let target_epoch = attestation.data.target.epoch;
let source_epoch = attestation.data.source.epoch;
@@ -292,20 +321,14 @@ impl<E: EthSpec> Slasher<E> {
// Check that the attestation's target epoch is acceptable, and defer it
// if it's not.
if target_epoch > current_epoch {
defer.push(tuple);
defer.push(indexed_record);
} else {
// Otherwise the attestation is OK to process.
keep.push(tuple);
keep.push(indexed_record);
}
}
(
AttestationBatch { attestations: keep },
AttestationBatch {
attestations: defer,
},
drop_count,
)
(keep, defer, drop_count)
}
/// Prune unnecessary attestations and blocks from the on-disk database.