Tweak slasher DB schema and pruning (#1948)

## Issue Addressed

Resolves #1890

## Proposed Changes

Change the slasher database schema to key indexed attestations by `(target_epoch, indexed_attestation_root)` instead of just `indexed_attestation_root`. This allows more straightforward pruning (a linear scan) that is also "re-entrant". By re-entrant, we mean that a pruning pass that gets stuck because of a `MapFull` error can attempt to commit midway and be resumed later without issue. The previous pruning strategy for indexed attestations did not have this property. There was also a flaw in the previous pruning that could leave "zombie" indexed attestations in the database (ones not referenced by any attester record), which could build up and contribute to bloat (although in practice I think they occur quite infrequently).

## Additional Info

During testing I noticed that a `MapFull` error can still occur during the commit of the transaction itself, which is irritating, but not unbearable. This PR should at least reduce the frequency with which users need to manually resize their DB, and if the `MapFull` on commit rears its ugly head too often we could use a dynamic strategy (temporarily increase the size of the map until the transaction commits).

The extra bytes for the epoch make the database a bit heavier, so the size estimate docs have been updated to reflect this. This is also a breaking schema change, so anyone using a v0 database from a few hours ago will need to drop it and update 😅
This commit is contained in:
Michael Sproul 2020-11-23 21:33:51 +00:00
parent 5828ff1204
commit 7d644103c6
7 changed files with 203 additions and 38 deletions

View File

@ -371,8 +371,10 @@ pub fn get_config<E: EthSpec>(
slasher_config.history_length = history_length;
}
if let Some(max_db_size) = clap_utils::parse_optional(cli_args, "slasher-max-db-size")? {
slasher_config.max_db_size_gbs = max_db_size;
if let Some(max_db_size_gbs) =
clap_utils::parse_optional::<usize>(cli_args, "slasher-max-db-size")?
{
slasher_config.max_db_size_mbs = max_db_size_gbs * 1024;
}
if let Some(chunk_size) = clap_utils::parse_optional(cli_args, "slasher-chunk-size")? {

View File

@ -74,7 +74,7 @@ either you can halve the space required.
If you want a better estimate you can use this formula:
```
352 * V * N + (16 * V * N)/(C * K) + 15000 * N
360 * V * N + (16 * V * N)/(C * K) + 15000 * N
```
where

View File

@ -7,7 +7,7 @@ pub const DEFAULT_CHUNK_SIZE: usize = 16;
pub const DEFAULT_VALIDATOR_CHUNK_SIZE: usize = 256;
pub const DEFAULT_HISTORY_LENGTH: usize = 4096;
pub const DEFAULT_UPDATE_PERIOD: u64 = 12;
pub const DEFAULT_MAX_DB_SIZE: usize = 256;
pub const DEFAULT_MAX_DB_SIZE: usize = 256 * 1024; // 256 GiB
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Config {
@ -18,8 +18,8 @@ pub struct Config {
pub history_length: usize,
/// Update frequency in seconds.
pub update_period: u64,
/// Maximum size of the LMDB database in gigabytes.
pub max_db_size_gbs: usize,
/// Maximum size of the LMDB database in megabytes.
pub max_db_size_mbs: usize,
}
impl Config {
@ -30,7 +30,7 @@ impl Config {
validator_chunk_size: DEFAULT_VALIDATOR_CHUNK_SIZE,
history_length: DEFAULT_HISTORY_LENGTH,
update_period: DEFAULT_UPDATE_PERIOD,
max_db_size_gbs: DEFAULT_MAX_DB_SIZE,
max_db_size_mbs: DEFAULT_MAX_DB_SIZE,
}
}
@ -38,7 +38,7 @@ impl Config {
if self.chunk_size == 0
|| self.validator_chunk_size == 0
|| self.history_length == 0
|| self.max_db_size_gbs == 0
|| self.max_db_size_mbs == 0
{
Err(Error::ConfigInvalidZeroParameter {
config: self.clone(),

View File

@ -1,11 +1,10 @@
use crate::{
utils::TxnOptional, AttesterRecord, AttesterSlashingStatus, Config, Error,
ProposerSlashingStatus,
utils::{TxnMapFull, TxnOptional},
AttesterRecord, AttesterSlashingStatus, Config, Error, ProposerSlashingStatus,
};
use byteorder::{BigEndian, ByteOrder};
use lmdb::{Cursor, Database, DatabaseFlags, Environment, RwTransaction, Transaction, WriteFlags};
use ssz::{Decode, Encode};
use std::collections::HashSet;
use std::marker::PhantomData;
use std::sync::Arc;
use types::{
@ -13,13 +12,13 @@ use types::{
};
/// Current database schema version, to check compatibility of on-disk DB with software.
const CURRENT_SCHEMA_VERSION: u64 = 0;
const CURRENT_SCHEMA_VERSION: u64 = 1;
/// Metadata about the slashing database itself.
const METADATA_DB: &str = "metadata";
/// Map from `(target_epoch, validator_index)` to `AttesterRecord`.
const ATTESTERS_DB: &str = "attesters";
/// Map from `indexed_attestation_hash` to `IndexedAttestation`.
/// Map from `(target_epoch, indexed_attestation_hash)` to `IndexedAttestation`.
const INDEXED_ATTESTATION_DB: &str = "indexed_attestations";
/// Table of minimum targets for every source epoch within range.
const MIN_TARGETS_DB: &str = "min_targets";
@ -42,7 +41,9 @@ const METADATA_CONFIG_KEY: &[u8] = &[1];
const ATTESTER_KEY_SIZE: usize = 16;
const PROPOSER_KEY_SIZE: usize = 16;
const GIGABYTE: usize = 1 << 30;
const CURRENT_EPOCH_KEY_SIZE: usize = 8;
const INDEXED_ATTESTATION_KEY_SIZE: usize = 40;
const MEGABYTE: usize = 1 << 20;
#[derive(Debug)]
pub struct SlasherDB<E: EthSpec> {
@ -128,7 +129,7 @@ impl AsRef<[u8]> for ProposerKey {
/// Key containing a validator index
pub struct CurrentEpochKey {
validator_index: [u8; 8],
validator_index: [u8; CURRENT_EPOCH_KEY_SIZE],
}
impl CurrentEpochKey {
@ -145,12 +146,44 @@ impl AsRef<[u8]> for CurrentEpochKey {
}
}
/// Key containing an epoch and an indexed attestation hash.
pub struct IndexedAttestationKey {
target_and_root: [u8; INDEXED_ATTESTATION_KEY_SIZE],
}
impl IndexedAttestationKey {
pub fn new(target_epoch: Epoch, indexed_attestation_root: Hash256) -> Self {
let mut data = [0; INDEXED_ATTESTATION_KEY_SIZE];
data[0..8].copy_from_slice(&target_epoch.as_u64().to_be_bytes());
data[8..INDEXED_ATTESTATION_KEY_SIZE].copy_from_slice(indexed_attestation_root.as_bytes());
Self {
target_and_root: data,
}
}
pub fn parse(data: &[u8]) -> Result<(Epoch, Hash256), Error> {
if data.len() == INDEXED_ATTESTATION_KEY_SIZE {
let target_epoch = Epoch::new(BigEndian::read_u64(&data[..8]));
let indexed_attestation_root = Hash256::from_slice(&data[8..]);
Ok((target_epoch, indexed_attestation_root))
} else {
Err(Error::IndexedAttestationKeyCorrupt { length: data.len() })
}
}
}
impl AsRef<[u8]> for IndexedAttestationKey {
fn as_ref(&self) -> &[u8] {
&self.target_and_root
}
}
impl<E: EthSpec> SlasherDB<E> {
pub fn open(config: Arc<Config>) -> Result<Self, Error> {
std::fs::create_dir_all(&config.database_path)?;
let env = Environment::new()
.set_max_dbs(LMDB_MAX_DBS)
.set_map_size(config.max_db_size_gbs * GIGABYTE)
.set_map_size(config.max_db_size_mbs * MEGABYTE)
.open_with_permissions(&config.database_path, 0o600)?;
let indexed_attestation_db =
env.create_db(Some(INDEXED_ATTESTATION_DB), Self::db_flags())?;
@ -284,11 +317,15 @@ impl<E: EthSpec> SlasherDB<E> {
indexed_attestation_hash: Hash256,
indexed_attestation: &IndexedAttestation<E>,
) -> Result<(), Error> {
let key = IndexedAttestationKey::new(
indexed_attestation.data.target.epoch,
indexed_attestation_hash,
);
let data = indexed_attestation.as_ssz_bytes();
txn.put(
self.indexed_attestation_db,
&indexed_attestation_hash.as_bytes(),
&key,
&data,
Self::write_flags(),
)?;
@ -298,10 +335,12 @@ impl<E: EthSpec> SlasherDB<E> {
pub fn get_indexed_attestation(
&self,
txn: &mut RwTransaction<'_>,
target_epoch: Epoch,
indexed_attestation_hash: Hash256,
) -> Result<IndexedAttestation<E>, Error> {
let key = IndexedAttestationKey::new(target_epoch, indexed_attestation_hash);
let bytes = txn
.get(self.indexed_attestation_db, &indexed_attestation_hash)
.get(self.indexed_attestation_db, &key)
.optional()?
.ok_or_else(|| Error::MissingIndexedAttestation {
root: indexed_attestation_hash,
@ -317,8 +356,9 @@ impl<E: EthSpec> SlasherDB<E> {
record: AttesterRecord,
) -> Result<AttesterSlashingStatus<E>, Error> {
// See if there's an existing attestation for this attester.
let target_epoch = attestation.data.target.epoch;
if let Some(existing_record) =
self.get_attester_record(txn, validator_index, attestation.data.target.epoch)?
self.get_attester_record(txn, validator_index, target_epoch)?
{
// If the existing attestation data is identical, then this attestation is not
// slashable and no update is required.
@ -327,8 +367,11 @@ impl<E: EthSpec> SlasherDB<E> {
}
// Otherwise, load the indexed attestation so we can confirm that it's slashable.
let existing_attestation =
self.get_indexed_attestation(txn, existing_record.indexed_attestation_hash)?;
let existing_attestation = self.get_indexed_attestation(
txn,
target_epoch,
existing_record.indexed_attestation_hash,
)?;
if attestation.is_double_vote(&existing_attestation) {
Ok(AttesterSlashingStatus::DoubleVote(Box::new(
existing_attestation,
@ -341,7 +384,7 @@ impl<E: EthSpec> SlasherDB<E> {
else {
txn.put(
self.attesters_db,
&AttesterKey::new(validator_index, attestation.data.target.epoch),
&AttesterKey::new(validator_index, target_epoch),
&record.as_ssz_bytes(),
Self::write_flags(),
)?;
@ -361,7 +404,7 @@ impl<E: EthSpec> SlasherDB<E> {
validator_index,
target_epoch,
})?;
self.get_indexed_attestation(txn, record.indexed_attestation_hash)
self.get_indexed_attestation(txn, target_epoch, record.indexed_attestation_hash)
}
pub fn get_attester_record(
@ -422,14 +465,28 @@ impl<E: EthSpec> SlasherDB<E> {
}
}
/// Attempt to prune the database, deleting old blocks and attestations.
pub fn prune(&self, current_epoch: Epoch) -> Result<(), Error> {
let mut txn = self.begin_rw_txn()?;
self.prune_proposers(current_epoch, &mut txn)?;
self.prune_attesters(current_epoch, &mut txn)?;
self.try_prune(current_epoch, &mut txn).allow_map_full()?;
txn.commit()?;
Ok(())
}
/// Run the proposer, attester and indexed-attestation pruning passes, in that order, inside
/// `txn`, stopping at the first pass that returns an error.
///
/// This lives apart from `prune` so that `allow_map_full` can be applied to the combined
/// result of all passes.
pub fn try_prune(
    &self,
    current_epoch: Epoch,
    txn: &mut RwTransaction<'_>,
) -> Result<(), Error> {
    self.prune_proposers(current_epoch, txn)?;
    self.prune_attesters(current_epoch, txn)?;
    self.prune_indexed_attestations(current_epoch, txn)
}
fn prune_proposers(
&self,
current_epoch: Epoch,
@ -497,19 +554,15 @@ impl<E: EthSpec> SlasherDB<E> {
return Ok(());
}
let mut indexed_attestations_to_delete = HashSet::new();
loop {
let (optional_key, value) = cursor.get(None, None, lmdb_sys::MDB_GET_CURRENT)?;
let key_bytes = optional_key.ok_or_else(|| Error::MissingAttesterKey)?;
let key_bytes = cursor
.get(None, None, lmdb_sys::MDB_GET_CURRENT)?
.0
.ok_or_else(|| Error::MissingAttesterKey)?;
let (target_epoch, _validator_index) = AttesterKey::parse(key_bytes)?;
let (target_epoch, _) = AttesterKey::parse(key_bytes)?;
if target_epoch < min_epoch {
// Stage the indexed attestation for deletion and delete the record itself.
let attester_record = AttesterRecord::from_ssz_bytes(value)?;
indexed_attestations_to_delete.insert(attester_record.indexed_attestation_hash);
cursor.del(Self::write_flags())?;
// End the loop if there is no next entry.
@ -524,10 +577,51 @@ impl<E: EthSpec> SlasherDB<E> {
break;
}
}
drop(cursor);
for indexed_attestation_hash in indexed_attestations_to_delete {
txn.del(self.indexed_attestation_db, &indexed_attestation_hash, None)?;
Ok(())
}
/// Delete indexed attestations whose target epoch is older than the retained history.
///
/// The cutoff is `min_epoch = current_epoch + 1 - history_length` (saturating). Entries are
/// visited from the first key of the indexed attestation database onwards; each entry with
/// `target_epoch < min_epoch` is deleted, and the scan stops at the first entry that is not
/// older than the cutoff or when the database is exhausted.
///
/// Returns `Error::MissingIndexedAttestationKey` if the cursor yields an entry without a key,
/// and propagates key-parsing and LMDB errors.
fn prune_indexed_attestations(
    &self,
    current_epoch: Epoch,
    txn: &mut RwTransaction<'_>,
) -> Result<(), Error> {
    let min_epoch = current_epoch
        .saturating_add(1u64)
        .saturating_sub(self.config.history_length as u64);

    let mut cursor = txn.open_rw_cursor(self.indexed_attestation_db)?;

    // Position cursor at first key, bailing out if the database is empty.
    if cursor
        .get(None, None, lmdb_sys::MDB_FIRST)
        .optional()?
        .is_none()
    {
        return Ok(());
    }

    loop {
        // Report a missing key with the indexed-attestation-specific variant rather than
        // the attester one, so the error identifies the database that was being scanned.
        let key_bytes = cursor
            .get(None, None, lmdb_sys::MDB_GET_CURRENT)?
            .0
            .ok_or(Error::MissingIndexedAttestationKey)?;

        let (target_epoch, _) = IndexedAttestationKey::parse(key_bytes)?;

        if target_epoch < min_epoch {
            cursor.del(Self::write_flags())?;

            // End the loop if there is no next entry.
            if cursor
                .get(None, None, lmdb_sys::MDB_NEXT)
                .optional()?
                .is_none()
            {
                break;
            }
        } else {
            break;
        }
    }

    Ok(())

View File

@ -41,11 +41,15 @@ pub enum Error {
ProposerKeyCorrupt {
length: usize,
},
IndexedAttestationKeyCorrupt {
length: usize,
},
MissingIndexedAttestation {
root: Hash256,
},
MissingAttesterKey,
MissingProposerKey,
MissingIndexedAttestationKey,
AttesterRecordInconsistentRoot,
}

View File

@ -14,3 +14,18 @@ impl<T> TxnOptional<T, Error> for Result<T, lmdb::Error> {
}
}
}
/// Extension trait that turns a `MapFull` failure into an absent (`None`) result instead of
/// an error.
pub trait TxnMapFull<T, E> {
    fn allow_map_full(self) -> Result<Option<T>, E>;
}

impl<T> TxnMapFull<T, Error> for Result<T, Error> {
    /// Map `Ok(x)` to `Ok(Some(x))` and an LMDB `MapFull` database error to `Ok(None)`; every
    /// other error is passed through unchanged.
    fn allow_map_full(self) -> Result<Option<T>, Error> {
        match self.map(Some) {
            Err(Error::DatabaseError(lmdb::Error::MapFull)) => Ok(None),
            other => other,
        }
    }
}

View File

@ -1,6 +1,6 @@
use slasher::{
test_utils::{indexed_att, logger},
Config, Slasher,
Config, Error, Slasher,
};
use tempdir::TempDir;
use types::Epoch;
@ -37,3 +37,53 @@ fn attestation_pruning_empty_wrap_around() {
));
slasher.process_queued(current_epoch).unwrap();
}
// Test that pruning can recover from a `MapFull` error
#[test]
fn pruning_with_map_full() {
    let tempdir = TempDir::new("slasher").unwrap();

    // Use a 1 MB map so that it fills up after a modest number of epochs.
    let mut config = Config::new(tempdir.path().into());
    config.validator_chunk_size = 1;
    config.chunk_size = 16;
    config.history_length = 1024;
    config.max_db_size_mbs = 1;

    let slasher = Slasher::open(config.clone(), logger()).unwrap();

    let v = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12];

    // Attestation by all validators in `v` with source `epoch - 1` and target `epoch`.
    let attestation_at =
        |epoch: Epoch| indexed_att(v.clone(), (epoch - 1).as_u64(), epoch.as_u64(), 0);

    let mut current_epoch = Epoch::new(0);

    // Phase 1: add one attestation per epoch until processing reports `MapFull`.
    loop {
        slasher.accept_attestation(attestation_at(current_epoch));
        match slasher.process_queued(current_epoch) {
            Err(Error::DatabaseError(lmdb::Error::MapFull)) => break,
            _ => current_epoch += 1,
        }
    }

    // Phase 2: prune and retry, advancing the epoch on each `MapFull`, until processing
    // succeeds. Any other error fails the test.
    loop {
        slasher.prune_database(current_epoch).unwrap();

        slasher.accept_attestation(attestation_at(current_epoch));
        match slasher.process_queued(current_epoch) {
            Ok(()) => break,
            Err(Error::DatabaseError(lmdb::Error::MapFull)) => {
                current_epoch += 1;
            }
            Err(e) => panic!("{:?}", e),
        }
    }
}