Address bugs (#639)

* Change validator/duties endpoint

* Add time-based skip slot limiting

* Add new error type missed in previous commit
This commit is contained in:
Paul Hauner 2019-11-27 18:37:09 +11:00 committed by Age Manning
parent 97aa8b75b8
commit 2bbac2ed18
4 changed files with 67 additions and 46 deletions

View File

@ -9,7 +9,7 @@ use lmd_ghost::LmdGhost;
use operation_pool::DepositInsertStatus; use operation_pool::DepositInsertStatus;
use operation_pool::{OperationPool, PersistedOperationPool}; use operation_pool::{OperationPool, PersistedOperationPool};
use parking_lot::RwLock; use parking_lot::RwLock;
use slog::{crit, debug, error, info, trace, warn, Logger}; use slog::{debug, error, info, trace, warn, Logger};
use slot_clock::SlotClock; use slot_clock::SlotClock;
use ssz::Encode; use ssz::Encode;
use state_processing::per_block_processing::{ use state_processing::per_block_processing::{
@ -25,6 +25,7 @@ use state_processing::{
use std::fs; use std::fs;
use std::io::prelude::*; use std::io::prelude::*;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant};
use store::iter::{ use store::iter::{
BlockRootsIterator, ReverseBlockRootIterator, ReverseStateRootIterator, StateRootsIterator, BlockRootsIterator, ReverseBlockRootIterator, ReverseStateRootIterator, StateRootsIterator,
}; };
@ -44,9 +45,6 @@ pub const GRAFFITI: &str = "sigp/lighthouse-0.0.0-prerelease";
/// Only useful for testing. /// Only useful for testing.
const WRITE_BLOCK_PROCESSING_SSZ: bool = cfg!(feature = "write_ssz_files"); const WRITE_BLOCK_PROCESSING_SSZ: bool = cfg!(feature = "write_ssz_files");
const BLOCK_SKIPPING_LOGGING_THRESHOLD: u64 = 3;
const BLOCK_SKIPPING_FAILURE_THRESHOLD: u64 = 128;
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
pub enum BlockProcessingOutcome { pub enum BlockProcessingOutcome {
/// Block was valid and imported into the block graph. /// Block was valid and imported into the block graph.
@ -343,34 +341,34 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
if slot == head_state.slot { if slot == head_state.slot {
Ok(head_state) Ok(head_state)
} else if slot > head_state.slot { } else if slot > head_state.slot {
// It is presently very resource intensive (lots of hashing) to skip slots. if slot > head_state.slot + T::EthSpec::slots_per_epoch() {
//
// We log warnings or simply fail if there are too many skip slots. This is a
// protection against DoS attacks.
if slot > head_state.slot + BLOCK_SKIPPING_FAILURE_THRESHOLD {
crit!(
self.log,
"Refusing to skip more than {} blocks", BLOCK_SKIPPING_LOGGING_THRESHOLD;
"head_slot" => head_state.slot,
"request_slot" => slot
);
return Err(Error::StateSkipTooLarge {
head_slot: head_state.slot,
requested_slot: slot,
});
} else if slot > head_state.slot + BLOCK_SKIPPING_LOGGING_THRESHOLD {
warn!( warn!(
self.log, self.log,
"Skipping more than {} blocks", BLOCK_SKIPPING_LOGGING_THRESHOLD; "Skipping more than an epoch";
"head_slot" => head_state.slot, "head_slot" => head_state.slot,
"request_slot" => slot "request_slot" => slot
) )
} }
let start_slot = head_state.slot;
let task_start = Instant::now();
let max_task_runtime = Duration::from_millis(self.spec.milliseconds_per_slot);
let head_state_slot = head_state.slot; let head_state_slot = head_state.slot;
let mut state = head_state; let mut state = head_state;
while state.slot < slot { while state.slot < slot {
// Do not allow and forward state skip that takes longer than the maximum task duration.
//
// This is a protection against nodes doing too much work when they're not synced
// to a chain.
if task_start + max_task_runtime < Instant::now() {
return Err(Error::StateSkipTooLarge {
start_slot,
requested_slot: slot,
max_task_runtime,
});
}
match per_slot_processing(&mut state, &self.spec) { match per_slot_processing(&mut state, &self.spec) {
Ok(()) => (), Ok(()) => (),
Err(e) => { Err(e) => {

View File

@ -4,6 +4,7 @@ use ssz_types::Error as SszTypesError;
use state_processing::per_block_processing::errors::AttestationValidationError; use state_processing::per_block_processing::errors::AttestationValidationError;
use state_processing::BlockProcessingError; use state_processing::BlockProcessingError;
use state_processing::SlotProcessingError; use state_processing::SlotProcessingError;
use std::time::Duration;
use types::*; use types::*;
macro_rules! easy_from_to { macro_rules! easy_from_to {
@ -40,8 +41,9 @@ pub enum BeaconChainError {
}, },
AttestationValidationError(AttestationValidationError), AttestationValidationError(AttestationValidationError),
StateSkipTooLarge { StateSkipTooLarge {
head_slot: Slot, start_slot: Slot,
requested_slot: Slot, requested_slot: Slot,
max_task_runtime: Duration,
}, },
/// Returned when an internal check fails, indicating corrupt data. /// Returned when an internal check fails, indicating corrupt data.
InvariantViolated(String), InvariantViolated(String),

View File

@ -93,31 +93,31 @@ fn return_validator_duties<T: BeaconChainTypes>(
epoch: Epoch, epoch: Epoch,
validator_pubkeys: Vec<PublicKey>, validator_pubkeys: Vec<PublicKey>,
) -> Result<Vec<ValidatorDuty>, ApiError> { ) -> Result<Vec<ValidatorDuty>, ApiError> {
let head_state = beacon_chain.head().beacon_state; let slots_per_epoch = T::EthSpec::slots_per_epoch();
let head_epoch = head_state.current_epoch(); let head_epoch = beacon_chain.head().beacon_state.current_epoch();
let relative_epoch = RelativeEpoch::from_epoch(head_epoch, epoch); let mut state = if RelativeEpoch::from_epoch(head_epoch, epoch).is_ok() {
let mut state = if relative_epoch.is_err() { beacon_chain.head().beacon_state
head_state
} else { } else {
match beacon_chain.state_at_slot(epoch.start_slot(T::EthSpec::slots_per_epoch())) { let slot = if epoch > head_epoch {
Ok(state) => state, // Move to the first slot of the epoch prior to the request.
Err(e) => { //
return Err(ApiError::ServerError(format!( // Taking advantage of saturating epoch subtraction.
"Unable to load state for epoch {}: {:?}", (epoch - 1).start_slot(slots_per_epoch)
epoch, e } else {
))) // Move to the end of the epoch following the target.
} //
} // Taking advantage of saturating epoch subtraction.
(epoch + 2).start_slot(slots_per_epoch) - 1
}; };
let relative_epoch = relative_epoch.or_else(|_| { beacon_chain.state_at_slot(slot).map_err(|e| {
RelativeEpoch::from_epoch(state.current_epoch(), epoch).map_err(|_| { ApiError::ServerError(format!("Unable to load state for epoch {}: {:?}", epoch, e))
ApiError::BadRequest(String::from( })?
"Epoch must be within one epoch of the current epoch", };
))
}) let relative_epoch = RelativeEpoch::from_epoch(state.current_epoch(), epoch)
})?; .map_err(|_| ApiError::ServerError(String::from("Loaded state is in the wrong epoch")))?;
state state
.build_committee_cache(relative_epoch, &beacon_chain.spec) .build_committee_cache(relative_epoch, &beacon_chain.spec)

View File

@ -205,7 +205,7 @@ fn validator_duties() {
.beacon_chain() .beacon_chain()
.expect("client should have beacon chain"); .expect("client should have beacon chain");
let epoch = Epoch::new(0); let mut epoch = Epoch::new(0);
let validators = beacon_chain let validators = beacon_chain
.head() .head()
@ -220,7 +220,26 @@ fn validator_duties() {
.block_on(remote_node.http.validator().get_duties(epoch, &validators)) .block_on(remote_node.http.validator().get_duties(epoch, &validators))
.expect("should fetch duties from http api"); .expect("should fetch duties from http api");
// 1. Check at the current epoch.
check_duties(
duties,
epoch,
validators.clone(),
beacon_chain.clone(),
spec,
);
epoch += 4;
let duties = env
.runtime()
.block_on(remote_node.http.validator().get_duties(epoch, &validators))
.expect("should fetch duties from http api");
// 2. Check with a long skip forward.
check_duties(duties, epoch, validators, beacon_chain, spec); check_duties(duties, epoch, validators, beacon_chain, spec);
// TODO: test an epoch in the past. Blocked because the `LocalBeaconNode` cannot produce a
// chain, yet.
} }
fn check_duties<T: BeaconChainTypes>( fn check_duties<T: BeaconChainTypes>(
@ -236,7 +255,9 @@ fn check_duties<T: BeaconChainTypes>(
"there should be a duty for each validator" "there should be a duty for each validator"
); );
let state = beacon_chain.head().beacon_state.clone(); let state = beacon_chain
.state_at_slot(epoch.start_slot(T::EthSpec::slots_per_epoch()))
.expect("should get state at slot");
validators validators
.iter() .iter()