Merge remote-tracking branch 'origin/unstable' into capella-merge

This commit is contained in:
Michael Sproul 2023-02-14 12:07:27 +11:00
commit 18c8cab4da
No known key found for this signature in database
GPG Key ID: 77B1309D2E54E914
51 changed files with 1961 additions and 362 deletions

View File

@ -49,7 +49,7 @@ jobs:
VERSION: ${{ env.VERSION }} VERSION: ${{ env.VERSION }}
VERSION_SUFFIX: ${{ env.VERSION_SUFFIX }} VERSION_SUFFIX: ${{ env.VERSION_SUFFIX }}
build-docker-single-arch: build-docker-single-arch:
name: build-docker-${{ matrix.binary }} name: build-docker-${{ matrix.binary }}${{ matrix.features.version_suffix }}
runs-on: ubuntu-22.04 runs-on: ubuntu-22.04
strategy: strategy:
matrix: matrix:
@ -57,6 +57,10 @@ jobs:
aarch64-portable, aarch64-portable,
x86_64, x86_64,
x86_64-portable] x86_64-portable]
features: [
{version_suffix: "", env: "gnosis,slasher-lmdb,slasher-mdbx,jemalloc"},
{version_suffix: "-dev", env: "gnosis,slasher-lmdb,slasher-mdbx,jemalloc,spec-minimal"}
]
include: include:
- profile: maxperf - profile: maxperf
@ -66,7 +70,9 @@ jobs:
DOCKER_CLI_EXPERIMENTAL: enabled DOCKER_CLI_EXPERIMENTAL: enabled
VERSION: ${{ needs.extract-version.outputs.VERSION }} VERSION: ${{ needs.extract-version.outputs.VERSION }}
VERSION_SUFFIX: ${{ needs.extract-version.outputs.VERSION_SUFFIX }} VERSION_SUFFIX: ${{ needs.extract-version.outputs.VERSION_SUFFIX }}
CROSS_FEATURES: null FEATURE_SUFFIX: ${{ matrix.features.version_suffix }}
FEATURES: ${{ matrix.features.env }}
CROSS_FEATURES: ${{ matrix.features.env }}
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v3
- name: Update Rust - name: Update Rust
@ -77,7 +83,7 @@ jobs:
- name: Cross build Lighthouse binary - name: Cross build Lighthouse binary
run: | run: |
cargo install cross cargo install cross
env CROSS_PROFILE=${{ matrix.profile }} make build-${{ matrix.binary }} env CROSS_PROFILE=${{ matrix.profile }} CROSS_FEATURES=${{ matrix.features.env }} make build-${{ matrix.binary }}
- name: Move cross-built binary into Docker scope (if ARM) - name: Move cross-built binary into Docker scope (if ARM)
if: startsWith(matrix.binary, 'aarch64') if: startsWith(matrix.binary, 'aarch64')
run: | run: |
@ -105,7 +111,8 @@ jobs:
docker buildx build \ docker buildx build \
--platform=linux/${SHORT_ARCH} \ --platform=linux/${SHORT_ARCH} \
--file ./Dockerfile.cross . \ --file ./Dockerfile.cross . \
--tag ${IMAGE_NAME}:${VERSION}-${SHORT_ARCH}${VERSION_SUFFIX}${MODERNITY_SUFFIX} \ --tag ${IMAGE_NAME}:${VERSION}-${SHORT_ARCH}${VERSION_SUFFIX}${MODERNITY_SUFFIX}${FEATURE_SUFFIX} \
--build-arg FEATURES=${FEATURES} \
--provenance=false \ --provenance=false \
--push --push
build-docker-multiarch: build-docker-multiarch:

View File

@ -134,11 +134,17 @@ jobs:
- name: Build Lighthouse for Windows portable - name: Build Lighthouse for Windows portable
if: matrix.arch == 'x86_64-windows-portable' if: matrix.arch == 'x86_64-windows-portable'
run: cargo install --path lighthouse --force --locked --features portable,gnosis --profile ${{ matrix.profile }} # NOTE: profile set to release until this rustc issue is fixed:
#
# https://github.com/rust-lang/rust/issues/107781
#
# tracked at: https://github.com/sigp/lighthouse/issues/3964
run: cargo install --path lighthouse --force --locked --features portable,gnosis --profile release
- name: Build Lighthouse for Windows modern - name: Build Lighthouse for Windows modern
if: matrix.arch == 'x86_64-windows' if: matrix.arch == 'x86_64-windows'
run: cargo install --path lighthouse --force --locked --features modern,gnosis --profile ${{ matrix.profile }} # NOTE: profile set to release (see above)
run: cargo install --path lighthouse --force --locked --features modern,gnosis --profile release
- name: Configure GPG and create artifacts - name: Configure GPG and create artifacts
if: startsWith(matrix.arch, 'x86_64-windows') != true if: startsWith(matrix.arch, 'x86_64-windows') != true

532
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -92,7 +92,6 @@ resolver = "2"
[patch] [patch]
[patch.crates-io] [patch.crates-io]
fixed-hash = { git = "https://github.com/paritytech/parity-common", rev="df638ab0885293d21d656dc300d39236b69ce57d" }
warp = { git = "https://github.com/macladson/warp", rev="7e75acc368229a46a236a8c991bf251fe7fe50ef" } warp = { git = "https://github.com/macladson/warp", rev="7e75acc368229a46a236a8c991bf251fe7fe50ef" }
eth2_ssz = { path = "consensus/ssz" } eth2_ssz = { path = "consensus/ssz" }
eth2_ssz_derive = { path = "consensus/ssz_derive" } eth2_ssz_derive = { path = "consensus/ssz_derive" }

View File

@ -193,7 +193,7 @@ arbitrary-fuzz:
# Runs cargo audit (Audit Cargo.lock files for crates with security vulnerabilities reported to the RustSec Advisory Database) # Runs cargo audit (Audit Cargo.lock files for crates with security vulnerabilities reported to the RustSec Advisory Database)
audit: audit:
cargo install --force cargo-audit cargo install --force cargo-audit
cargo audit --ignore RUSTSEC-2020-0071 --ignore RUSTSEC-2020-0159 cargo audit --ignore RUSTSEC-2020-0071
# Runs `cargo vendor` to make sure dependencies can be vendored for packaging, reproducibility and archival purpose. # Runs `cargo vendor` to make sure dependencies can be vendored for packaging, reproducibility and archival purpose.
vendor: vendor:

View File

@ -0,0 +1,195 @@
use crate::{BeaconChain, BeaconChainError, BeaconChainTypes};
use eth2::lighthouse::attestation_rewards::{IdealAttestationRewards, TotalAttestationRewards};
use eth2::lighthouse::StandardAttestationRewards;
use participation_cache::ParticipationCache;
use safe_arith::SafeArith;
use slog::{debug, Logger};
use state_processing::{
common::altair::BaseRewardPerIncrement,
per_epoch_processing::altair::{participation_cache, rewards_and_penalties::get_flag_weight},
};
use std::collections::HashMap;
use store::consts::altair::{
PARTICIPATION_FLAG_WEIGHTS, TIMELY_HEAD_FLAG_INDEX, TIMELY_SOURCE_FLAG_INDEX,
TIMELY_TARGET_FLAG_INDEX,
};
use types::consts::altair::WEIGHT_DENOMINATOR;
use types::{Epoch, EthSpec};
use eth2::types::ValidatorId;
impl<T: BeaconChainTypes> BeaconChain<T> {
    /// Compute per-validator attestation rewards for `epoch`, together with the
    /// "ideal" rewards table, for the standard attestation-rewards API response.
    ///
    /// If `validators` is empty, rewards are computed for every eligible validator;
    /// otherwise only for the supplied indices / pubkeys.
    pub fn compute_attestation_rewards(
        &self,
        epoch: Epoch,
        validators: Vec<ValidatorId>,
        log: Logger,
    ) -> Result<StandardAttestationRewards, BeaconChainError> {
        debug!(log, "computing attestation rewards"; "epoch" => epoch, "validator_count" => validators.len());

        // Get state. Rewards for attestations made in `epoch` are settled during
        // processing of the following epoch, so load the state at the end slot of
        // `epoch + 1`.
        let spec = &self.spec;
        let state_slot = (epoch + 1).end_slot(T::EthSpec::slots_per_epoch());
        let state_root = self
            .state_root_at_slot(state_slot)?
            .ok_or(BeaconChainError::NoStateForSlot(state_slot))?;
        let mut state = self
            .get_state(&state_root, Some(state_slot))?
            .ok_or(BeaconChainError::MissingBeaconState(state_root))?;

        // Calculate ideal_rewards: for each participation flag and each possible
        // effective balance (0..=32 increments — presumably whole ETH steps on the
        // mainnet preset; TODO confirm for presets with a different
        // effective_balance_increment), record the reward a perfectly-performing
        // attester would earn and the penalty for missing that flag.
        let participation_cache = ParticipationCache::new(&state, spec)?;
        let previous_epoch = state.previous_epoch();
        let mut ideal_rewards_hashmap = HashMap::new();

        for flag_index in 0..PARTICIPATION_FLAG_WEIGHTS.len() {
            let weight = get_flag_weight(flag_index)
                .map_err(|_| BeaconChainError::AttestationRewardsError)?;

            let unslashed_participating_indices = participation_cache
                .get_unslashed_participating_indices(flag_index, previous_epoch)?;

            let unslashed_participating_balance =
                unslashed_participating_indices
                    .total_balance()
                    .map_err(|_| BeaconChainError::AttestationRewardsError)?;

            let unslashed_participating_increments =
                unslashed_participating_balance.safe_div(spec.effective_balance_increment)?;

            let total_active_balance = participation_cache.current_epoch_total_active_balance();

            let active_increments =
                total_active_balance.safe_div(spec.effective_balance_increment)?;

            let base_reward_per_increment =
                BaseRewardPerIncrement::new(total_active_balance, spec)?;

            for effective_balance_eth in 0..=32 {
                let effective_balance =
                    effective_balance_eth.safe_mul(spec.effective_balance_increment)?;
                let base_reward =
                    effective_balance_eth.safe_mul(base_reward_per_increment.as_u64())?;

                // The penalty is the full flag-weighted base reward, negated.
                let penalty = -(base_reward.safe_mul(weight)?.safe_div(WEIGHT_DENOMINATOR)? as i64);

                let reward_numerator = base_reward
                    .safe_mul(weight)?
                    .safe_mul(unslashed_participating_increments)?;

                let ideal_reward = reward_numerator
                    .safe_div(active_increments)?
                    .safe_div(WEIGHT_DENOMINATOR)?;
                // During an inactivity leak no attestation rewards are paid out,
                // only penalties, so the ideal reward collapses to 0.
                if !state.is_in_inactivity_leak(previous_epoch, spec) {
                    ideal_rewards_hashmap
                        .insert((flag_index, effective_balance), (ideal_reward, penalty));
                } else {
                    ideal_rewards_hashmap.insert((flag_index, effective_balance), (0, penalty));
                }
            }
        }

        // Calculate total_rewards for the requested validators, resolving pubkeys
        // to indices. An empty request list means "all eligible validators".
        let mut total_rewards: Vec<TotalAttestationRewards> = Vec::new();

        let validators = if validators.is_empty() {
            participation_cache.eligible_validator_indices().to_vec()
        } else {
            validators
                .into_iter()
                .map(|validator| match validator {
                    ValidatorId::Index(i) => Ok(i as usize),
                    ValidatorId::PublicKey(pubkey) => state
                        .get_validator_index(&pubkey)?
                        .ok_or(BeaconChainError::ValidatorPubkeyUnknown(pubkey)),
                })
                .collect::<Result<Vec<_>, _>>()?
        };

        for validator_index in &validators {
            // Ineligible validators get all-zero rewards rather than an error.
            let eligible = state.is_eligible_validator(previous_epoch, *validator_index)?;
            let mut head_reward = 0u64;
            let mut target_reward = 0i64;
            let mut source_reward = 0i64;

            if eligible {
                let effective_balance = state.get_effective_balance(*validator_index)?;

                for flag_index in 0..PARTICIPATION_FLAG_WEIGHTS.len() {
                    // Look up the precomputed (reward, penalty) pair for this
                    // flag / effective-balance combination.
                    let (ideal_reward, penalty) = ideal_rewards_hashmap
                        .get(&(flag_index, effective_balance))
                        .ok_or(BeaconChainError::AttestationRewardsError)?;
                    let voted_correctly = participation_cache
                        .get_unslashed_participating_indices(flag_index, previous_epoch)
                        .map_err(|_| BeaconChainError::AttestationRewardsError)?
                        .contains(*validator_index)
                        .map_err(|_| BeaconChainError::AttestationRewardsError)?;
                    if voted_correctly {
                        if flag_index == TIMELY_HEAD_FLAG_INDEX {
                            head_reward += ideal_reward;
                        } else if flag_index == TIMELY_TARGET_FLAG_INDEX {
                            target_reward += *ideal_reward as i64;
                        } else if flag_index == TIMELY_SOURCE_FLAG_INDEX {
                            source_reward += *ideal_reward as i64;
                        }
                    // A missed head vote carries no penalty (reward is simply 0);
                    // missed target/source votes apply the negative penalty.
                    } else if flag_index == TIMELY_HEAD_FLAG_INDEX {
                        head_reward = 0;
                    } else if flag_index == TIMELY_TARGET_FLAG_INDEX {
                        target_reward = *penalty;
                    } else if flag_index == TIMELY_SOURCE_FLAG_INDEX {
                        source_reward = *penalty;
                    }
                }
            }
            total_rewards.push(TotalAttestationRewards {
                validator_index: *validator_index as u64,
                head: head_reward,
                target: target_reward,
                source: source_reward,
            });
        }

        // Convert hashmap to vector: fold the per-flag entries into a single
        // `IdealAttestationRewards` per effective balance, then sort ascending by
        // effective balance for a stable API response.
        let mut ideal_rewards: Vec<IdealAttestationRewards> = ideal_rewards_hashmap
            .iter()
            .map(
                |((flag_index, effective_balance), (ideal_reward, _penalty))| {
                    (flag_index, effective_balance, ideal_reward)
                },
            )
            .fold(
                HashMap::new(),
                |mut acc, (flag_index, &effective_balance, ideal_reward)| {
                    let entry = acc
                        .entry(effective_balance)
                        .or_insert(IdealAttestationRewards {
                            effective_balance,
                            head: 0,
                            target: 0,
                            source: 0,
                        });
                    match *flag_index {
                        TIMELY_SOURCE_FLAG_INDEX => entry.source += ideal_reward,
                        TIMELY_TARGET_FLAG_INDEX => entry.target += ideal_reward,
                        TIMELY_HEAD_FLAG_INDEX => entry.head += ideal_reward,
                        _ => {}
                    }
                    acc
                },
            )
            .into_values()
            .collect::<Vec<IdealAttestationRewards>>();
        ideal_rewards.sort_by(|a, b| a.effective_balance.cmp(&b.effective_balance));

        Ok(StandardAttestationRewards {
            ideal_rewards,
            total_rewards,
        })
    }
}

View File

@ -0,0 +1,237 @@
use crate::{BeaconChain, BeaconChainError, BeaconChainTypes};
use eth2::lighthouse::StandardBlockReward;
use operation_pool::RewardCache;
use safe_arith::SafeArith;
use slog::error;
use state_processing::{
common::{
altair, get_attestation_participation_flag_indices, get_attesting_indices_from_state,
},
per_block_processing::{
altair::sync_committee::compute_sync_aggregate_rewards, get_slashable_indices,
},
};
use store::{
consts::altair::{PARTICIPATION_FLAG_WEIGHTS, PROPOSER_WEIGHT, WEIGHT_DENOMINATOR},
RelativeEpoch,
};
use types::{BeaconBlockRef, BeaconState, BeaconStateError, ExecPayload, Hash256};
type BeaconBlockSubRewardValue = u64;
impl<T: BeaconChainTypes> BeaconChain<T> {
    /// Compute the proposer's reward for `block`, broken down into sync-aggregate,
    /// proposer-slashing, attester-slashing and attestation components, for the
    /// standard block-rewards API response.
    ///
    /// `state` must be the state at the block's slot (checked below); committee
    /// caches are (re)built here as needed.
    pub fn compute_beacon_block_reward<Payload: ExecPayload<T::EthSpec>>(
        &self,
        block: BeaconBlockRef<'_, T::EthSpec, Payload>,
        block_root: Hash256,
        state: &mut BeaconState<T::EthSpec>,
    ) -> Result<StandardBlockReward, BeaconChainError> {
        if block.slot() != state.slot() {
            return Err(BeaconChainError::BlockRewardSlotError);
        }

        // Attestation-processing below needs both epochs' committee caches.
        state.build_committee_cache(RelativeEpoch::Previous, &self.spec)?;
        state.build_committee_cache(RelativeEpoch::Current, &self.spec)?;

        let proposer_index = block.proposer_index();

        let sync_aggregate_reward =
            self.compute_beacon_block_sync_aggregate_reward(block, state)?;

        let proposer_slashing_reward = self
            .compute_beacon_block_proposer_slashing_reward(block, state)
            .map_err(|e| {
                error!(
                    self.log,
                    "Error calculating proposer slashing reward";
                    "error" => ?e
                );
                BeaconChainError::BlockRewardError
            })?;

        let attester_slashing_reward = self
            .compute_beacon_block_attester_slashing_reward(block, state)
            .map_err(|e| {
                error!(
                    self.log,
                    "Error calculating attester slashing reward";
                    "error" => ?e
                );
                BeaconChainError::BlockRewardError
            })?;

        // Base (phase0) states use a different reward computation than
        // Altair-and-later states.
        let block_attestation_reward = if let BeaconState::Base(_) = state {
            self.compute_beacon_block_attestation_reward_base(block, block_root, state)
                .map_err(|e| {
                    error!(
                        self.log,
                        "Error calculating base block attestation reward";
                        "error" => ?e
                    );
                    BeaconChainError::BlockRewardAttestationError
                })?
        } else {
            self.compute_beacon_block_attestation_reward_altair(block, state)
                .map_err(|e| {
                    error!(
                        self.log,
                        "Error calculating altair block attestation reward";
                        "error" => ?e
                    );
                    BeaconChainError::BlockRewardAttestationError
                })?
        };

        let total_reward = sync_aggregate_reward
            .safe_add(proposer_slashing_reward)?
            .safe_add(attester_slashing_reward)?
            .safe_add(block_attestation_reward)?;

        Ok(StandardBlockReward {
            proposer_index,
            total: total_reward,
            attestations: block_attestation_reward,
            sync_aggregate: sync_aggregate_reward,
            proposer_slashings: proposer_slashing_reward,
            attester_slashings: attester_slashing_reward,
        })
    }

    /// Reward for the block's sync aggregate: per-bit proposer reward multiplied
    /// by the number of set sync-committee bits. Pre-Altair blocks have no sync
    /// aggregate and yield 0.
    fn compute_beacon_block_sync_aggregate_reward<Payload: ExecPayload<T::EthSpec>>(
        &self,
        block: BeaconBlockRef<'_, T::EthSpec, Payload>,
        state: &BeaconState<T::EthSpec>,
    ) -> Result<BeaconBlockSubRewardValue, BeaconChainError> {
        if let Ok(sync_aggregate) = block.body().sync_aggregate() {
            let (_, proposer_reward_per_bit) = compute_sync_aggregate_rewards(state, &self.spec)
                .map_err(|_| BeaconChainError::BlockRewardSyncError)?;
            Ok(sync_aggregate.sync_committee_bits.num_set_bits() as u64 * proposer_reward_per_bit)
        } else {
            Ok(0)
        }
    }

    /// Whistleblower reward for each proposer slashing included in the block:
    /// slashed proposer's effective balance / whistleblower_reward_quotient.
    fn compute_beacon_block_proposer_slashing_reward<Payload: ExecPayload<T::EthSpec>>(
        &self,
        block: BeaconBlockRef<'_, T::EthSpec, Payload>,
        state: &BeaconState<T::EthSpec>,
    ) -> Result<BeaconBlockSubRewardValue, BeaconChainError> {
        let mut proposer_slashing_reward = 0;

        let proposer_slashings = block.body().proposer_slashings();

        for proposer_slashing in proposer_slashings {
            proposer_slashing_reward.safe_add_assign(
                state
                    .get_validator(proposer_slashing.proposer_index() as usize)?
                    .effective_balance
                    .safe_div(self.spec.whistleblower_reward_quotient)?,
            )?;
        }

        Ok(proposer_slashing_reward)
    }

    /// Whistleblower reward for each validator slashable under the block's
    /// attester slashings, summed over all slashings.
    fn compute_beacon_block_attester_slashing_reward<Payload: ExecPayload<T::EthSpec>>(
        &self,
        block: BeaconBlockRef<'_, T::EthSpec, Payload>,
        state: &BeaconState<T::EthSpec>,
    ) -> Result<BeaconBlockSubRewardValue, BeaconChainError> {
        let mut attester_slashing_reward = 0;

        let attester_slashings = block.body().attester_slashings();

        for attester_slashing in attester_slashings {
            for attester_index in get_slashable_indices(state, attester_slashing)? {
                attester_slashing_reward.safe_add_assign(
                    state
                        .get_validator(attester_index as usize)?
                        .effective_balance
                        .safe_div(self.spec.whistleblower_reward_quotient)?,
                )?;
            }
        }

        Ok(attester_slashing_reward)
    }

    fn compute_beacon_block_attestation_reward_base<Payload: ExecPayload<T::EthSpec>>(
        &self,
        block: BeaconBlockRef<'_, T::EthSpec, Payload>,
        block_root: Hash256,
        state: &BeaconState<T::EthSpec>,
    ) -> Result<BeaconBlockSubRewardValue, BeaconChainError> {
        // Call compute_block_reward in the base case
        // Since base does not have sync aggregate, we only grab attestation portion of the returned
        // value
        let mut reward_cache = RewardCache::default();
        let block_attestation_reward = self
            .compute_block_reward(block, block_root, state, &mut reward_cache, true)?
            .attestation_rewards
            .total;

        Ok(block_attestation_reward)
    }

    /// Altair-and-later attestation reward: replays the spec's attestation
    /// processing, accumulating the proposer reward numerator for every
    /// newly-set participation flag, then divides by the proposer reward
    /// denominator.
    ///
    /// NOTE(review): this mutates the state's epoch participation flags as a
    /// side effect of the replay — callers appear to pass a disposable state;
    /// confirm before reusing `state` afterwards.
    fn compute_beacon_block_attestation_reward_altair<Payload: ExecPayload<T::EthSpec>>(
        &self,
        block: BeaconBlockRef<'_, T::EthSpec, Payload>,
        state: &mut BeaconState<T::EthSpec>,
    ) -> Result<BeaconBlockSubRewardValue, BeaconChainError> {
        let total_active_balance = state.get_total_active_balance()?;
        let base_reward_per_increment =
            altair::BaseRewardPerIncrement::new(total_active_balance, &self.spec)?;

        let mut total_proposer_reward = 0;

        // Denominator from the spec: (W_sum - W_p) * W_sum / W_p.
        let proposer_reward_denominator = WEIGHT_DENOMINATOR
            .safe_sub(PROPOSER_WEIGHT)?
            .safe_mul(WEIGHT_DENOMINATOR)?
            .safe_div(PROPOSER_WEIGHT)?;

        for attestation in block.body().attestations() {
            let data = &attestation.data;
            let inclusion_delay = state.slot().safe_sub(data.slot)?.as_u64();
            let participation_flag_indices = get_attestation_participation_flag_indices(
                state,
                data,
                inclusion_delay,
                &self.spec,
            )?;

            let attesting_indices = get_attesting_indices_from_state(state, attestation)?;

            let mut proposer_reward_numerator = 0;
            for index in attesting_indices {
                let index = index as usize;
                for (flag_index, &weight) in PARTICIPATION_FLAG_WEIGHTS.iter().enumerate() {
                    let epoch_participation =
                        state.get_epoch_participation_mut(data.target.epoch)?;
                    let validator_participation = epoch_participation
                        .get_mut(index)
                        .ok_or(BeaconStateError::ParticipationOutOfBounds(index))?;

                    // Only flags newly set by this attestation contribute to the
                    // proposer's reward (duplicates earn nothing).
                    if participation_flag_indices.contains(&flag_index)
                        && !validator_participation.has_flag(flag_index)?
                    {
                        validator_participation.add_flag(flag_index)?;
                        proposer_reward_numerator.safe_add_assign(
                            altair::get_base_reward(
                                state,
                                index,
                                base_reward_per_increment,
                                &self.spec,
                            )?
                            .safe_mul(weight)?,
                        )?;
                    }
                }
            }
            total_proposer_reward.safe_add_assign(
                proposer_reward_numerator.safe_div(proposer_reward_denominator)?,
            )?;
        }

        Ok(total_proposer_reward)
    }
}

View File

@ -8,7 +8,7 @@ use crate::beacon_proposer_cache::compute_proposer_duties_from_head;
use crate::beacon_proposer_cache::BeaconProposerCache; use crate::beacon_proposer_cache::BeaconProposerCache;
use crate::block_times_cache::BlockTimesCache; use crate::block_times_cache::BlockTimesCache;
use crate::block_verification::{ use crate::block_verification::{
check_block_is_finalized_descendant, check_block_relevancy, get_block_root, check_block_is_finalized_checkpoint_or_descendant, check_block_relevancy, get_block_root,
signature_verify_chain_segment, BlockError, ExecutionPendingBlock, GossipVerifiedBlock, signature_verify_chain_segment, BlockError, ExecutionPendingBlock, GossipVerifiedBlock,
IntoExecutionPendingBlock, PayloadVerificationOutcome, POS_PANDA_BANNER, IntoExecutionPendingBlock, PayloadVerificationOutcome, POS_PANDA_BANNER,
}; };
@ -2795,7 +2795,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// is so we don't have to think about lock ordering with respect to the fork choice lock. // is so we don't have to think about lock ordering with respect to the fork choice lock.
// There are a bunch of places where we lock both fork choice and the pubkey cache and it // There are a bunch of places where we lock both fork choice and the pubkey cache and it
// would be difficult to check that they all lock fork choice first. // would be difficult to check that they all lock fork choice first.
let mut kv_store_ops = self let mut ops = self
.validator_pubkey_cache .validator_pubkey_cache
.try_write_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT) .try_write_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(Error::ValidatorPubkeyCacheLockTimeout)? .ok_or(Error::ValidatorPubkeyCacheLockTimeout)?
@ -2817,7 +2817,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let mut fork_choice = self.canonical_head.fork_choice_write_lock(); let mut fork_choice = self.canonical_head.fork_choice_write_lock();
// Do not import a block that doesn't descend from the finalized root. // Do not import a block that doesn't descend from the finalized root.
check_block_is_finalized_descendant(self, &fork_choice, &signed_block)?; check_block_is_finalized_checkpoint_or_descendant(self, &fork_choice, &signed_block)?;
// Register the new block with the fork choice service. // Register the new block with the fork choice service.
{ {
@ -2897,9 +2897,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// ---------------------------- BLOCK PROBABLY ATTESTABLE ---------------------------------- // ---------------------------- BLOCK PROBABLY ATTESTABLE ----------------------------------
// Most blocks are now capable of being attested to thanks to the `early_attester_cache` // Most blocks are now capable of being attested to thanks to the `early_attester_cache`
// cache above. Resume non-essential processing. // cache above. Resume non-essential processing.
//
// It is important NOT to return errors here before the database commit, because the block
// has already been added to fork choice and the database would be left in an inconsistent
// state if we returned early without committing. In other words, an error here would
// corrupt the node's database permanently.
// ----------------------------------------------------------------------------------------- // -----------------------------------------------------------------------------------------
self.import_block_update_shuffling_cache(block_root, &mut state)?; self.import_block_update_shuffling_cache(block_root, &mut state);
self.import_block_observe_attestations( self.import_block_observe_attestations(
block, block,
&state, &state,
@ -2922,17 +2927,16 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// If the write fails, revert fork choice to the version from disk, else we can // If the write fails, revert fork choice to the version from disk, else we can
// end up with blocks in fork choice that are missing from disk. // end up with blocks in fork choice that are missing from disk.
// See https://github.com/sigp/lighthouse/issues/2028 // See https://github.com/sigp/lighthouse/issues/2028
let mut ops: Vec<_> = confirmed_state_roots ops.extend(
confirmed_state_roots
.into_iter() .into_iter()
.map(StoreOp::DeleteStateTemporaryFlag) .map(StoreOp::DeleteStateTemporaryFlag),
.collect(); );
ops.push(StoreOp::PutBlock(block_root, signed_block.clone())); ops.push(StoreOp::PutBlock(block_root, signed_block.clone()));
ops.push(StoreOp::PutState(block.state_root(), &state)); ops.push(StoreOp::PutState(block.state_root(), &state));
let txn_lock = self.store.hot_db.begin_rw_transaction(); let txn_lock = self.store.hot_db.begin_rw_transaction();
kv_store_ops.extend(self.store.convert_to_kv_batch(ops)?); if let Err(e) = self.store.do_atomically(ops) {
if let Err(e) = self.store.hot_db.do_atomically(kv_store_ops) {
error!( error!(
self.log, self.log,
"Database write failed!"; "Database write failed!";
@ -3361,13 +3365,27 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
} }
} }
// For the current and next epoch of this state, ensure we have the shuffling from this
// block in our cache.
fn import_block_update_shuffling_cache( fn import_block_update_shuffling_cache(
&self, &self,
block_root: Hash256, block_root: Hash256,
state: &mut BeaconState<T::EthSpec>, state: &mut BeaconState<T::EthSpec>,
) {
if let Err(e) = self.import_block_update_shuffling_cache_fallible(block_root, state) {
warn!(
self.log,
"Failed to prime shuffling cache";
"error" => ?e
);
}
}
fn import_block_update_shuffling_cache_fallible(
&self,
block_root: Hash256,
state: &mut BeaconState<T::EthSpec>,
) -> Result<(), BlockError<T::EthSpec>> { ) -> Result<(), BlockError<T::EthSpec>> {
// For the current and next epoch of this state, ensure we have the shuffling from this
// block in our cache.
for relative_epoch in [RelativeEpoch::Current, RelativeEpoch::Next] { for relative_epoch in [RelativeEpoch::Current, RelativeEpoch::Next] {
let shuffling_id = AttestationShufflingId::new(block_root, state, relative_epoch)?; let shuffling_id = AttestationShufflingId::new(block_root, state, relative_epoch)?;

View File

@ -745,7 +745,7 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
// Do not process a block that doesn't descend from the finalized root. // Do not process a block that doesn't descend from the finalized root.
// //
// We check this *before* we load the parent so that we can return a more detailed error. // We check this *before* we load the parent so that we can return a more detailed error.
check_block_is_finalized_descendant( check_block_is_finalized_checkpoint_or_descendant(
chain, chain,
&chain.canonical_head.fork_choice_write_lock(), &chain.canonical_head.fork_choice_write_lock(),
&block, &block,
@ -1565,12 +1565,12 @@ fn check_block_against_finalized_slot<T: BeaconChainTypes>(
/// ## Warning /// ## Warning
/// ///
/// Taking a lock on the `chain.canonical_head.fork_choice` might cause a deadlock here. /// Taking a lock on the `chain.canonical_head.fork_choice` might cause a deadlock here.
pub fn check_block_is_finalized_descendant<T: BeaconChainTypes>( pub fn check_block_is_finalized_checkpoint_or_descendant<T: BeaconChainTypes>(
chain: &BeaconChain<T>, chain: &BeaconChain<T>,
fork_choice: &BeaconForkChoice<T>, fork_choice: &BeaconForkChoice<T>,
block: &Arc<SignedBeaconBlock<T::EthSpec>>, block: &Arc<SignedBeaconBlock<T::EthSpec>>,
) -> Result<(), BlockError<T::EthSpec>> { ) -> Result<(), BlockError<T::EthSpec>> {
if fork_choice.is_descendant_of_finalized(block.parent_root()) { if fork_choice.is_finalized_checkpoint_or_descendant(block.parent_root()) {
Ok(()) Ok(())
} else { } else {
// If fork choice does *not* consider the parent to be a descendant of the finalized block, // If fork choice does *not* consider the parent to be a descendant of the finalized block,

View File

@ -51,7 +51,6 @@ pub enum BeaconChainError {
}, },
SlotClockDidNotStart, SlotClockDidNotStart,
NoStateForSlot(Slot), NoStateForSlot(Slot),
UnableToFindTargetRoot(Slot),
BeaconStateError(BeaconStateError), BeaconStateError(BeaconStateError),
DBInconsistent(String), DBInconsistent(String),
DBError(store::Error), DBError(store::Error),
@ -157,10 +156,12 @@ pub enum BeaconChainError {
ExecutionForkChoiceUpdateInvalid { ExecutionForkChoiceUpdateInvalid {
status: PayloadStatus, status: PayloadStatus,
}, },
BlockRewardError,
BlockRewardSlotError, BlockRewardSlotError,
BlockRewardAttestationError, BlockRewardAttestationError,
BlockRewardSyncError, BlockRewardSyncError,
SyncCommitteeRewardsSyncError, SyncCommitteeRewardsSyncError,
AttestationRewardsError,
HeadMissingFromForkChoice(Hash256), HeadMissingFromForkChoice(Hash256),
FinalizedBlockMissingFromForkChoice(Hash256), FinalizedBlockMissingFromForkChoice(Hash256),
HeadBlockMissingFromForkChoice(Hash256), HeadBlockMissingFromForkChoice(Hash256),

View File

@ -1,6 +1,8 @@
#![recursion_limit = "128"] // For lazy-static #![recursion_limit = "128"] // For lazy-static
pub mod attestation_rewards;
pub mod attestation_verification; pub mod attestation_verification;
mod attester_cache; mod attester_cache;
pub mod beacon_block_reward;
mod beacon_chain; mod beacon_chain;
mod beacon_fork_choice_store; mod beacon_fork_choice_store;
pub mod beacon_proposer_cache; pub mod beacon_proposer_cache;

View File

@ -4,7 +4,7 @@ use ssz::{Decode, Encode};
use std::collections::HashMap; use std::collections::HashMap;
use std::convert::TryInto; use std::convert::TryInto;
use std::marker::PhantomData; use std::marker::PhantomData;
use store::{DBColumn, Error as StoreError, KeyValueStore, KeyValueStoreOp, StoreItem}; use store::{DBColumn, Error as StoreError, StoreItem, StoreOp};
use types::{BeaconState, Hash256, PublicKey, PublicKeyBytes}; use types::{BeaconState, Hash256, PublicKey, PublicKeyBytes};
/// Provides a mapping of `validator_index -> validator_publickey`. /// Provides a mapping of `validator_index -> validator_publickey`.
@ -38,7 +38,7 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
}; };
let store_ops = cache.import_new_pubkeys(state)?; let store_ops = cache.import_new_pubkeys(state)?;
store.hot_db.do_atomically(store_ops)?; store.do_atomically(store_ops)?;
Ok(cache) Ok(cache)
} }
@ -79,7 +79,7 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
pub fn import_new_pubkeys( pub fn import_new_pubkeys(
&mut self, &mut self,
state: &BeaconState<T::EthSpec>, state: &BeaconState<T::EthSpec>,
) -> Result<Vec<KeyValueStoreOp>, BeaconChainError> { ) -> Result<Vec<StoreOp<'static, T::EthSpec>>, BeaconChainError> {
if state.validators().len() > self.pubkeys.len() { if state.validators().len() > self.pubkeys.len() {
self.import( self.import(
state.validators()[self.pubkeys.len()..] state.validators()[self.pubkeys.len()..]
@ -92,7 +92,10 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
} }
/// Adds zero or more validators to `self`. /// Adds zero or more validators to `self`.
fn import<I>(&mut self, validator_keys: I) -> Result<Vec<KeyValueStoreOp>, BeaconChainError> fn import<I>(
&mut self,
validator_keys: I,
) -> Result<Vec<StoreOp<'static, T::EthSpec>>, BeaconChainError>
where where
I: Iterator<Item = PublicKeyBytes> + ExactSizeIterator, I: Iterator<Item = PublicKeyBytes> + ExactSizeIterator,
{ {
@ -112,7 +115,9 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
// It will be committed atomically when the block that introduced it is written to disk. // It will be committed atomically when the block that introduced it is written to disk.
// Notably it is NOT written while the write lock on the cache is held. // Notably it is NOT written while the write lock on the cache is held.
// See: https://github.com/sigp/lighthouse/issues/2327 // See: https://github.com/sigp/lighthouse/issues/2327
store_ops.push(DatabasePubkey(pubkey).as_kv_store_op(DatabasePubkey::key_for_index(i))); store_ops.push(StoreOp::KeyValueOp(
DatabasePubkey(pubkey).as_kv_store_op(DatabasePubkey::key_for_index(i)),
));
self.pubkeys.push( self.pubkeys.push(
(&pubkey) (&pubkey)
@ -294,7 +299,7 @@ mod test {
let ops = cache let ops = cache
.import_new_pubkeys(&state) .import_new_pubkeys(&state)
.expect("should import pubkeys"); .expect("should import pubkeys");
store.hot_db.do_atomically(ops).unwrap(); store.do_atomically(ops).unwrap();
check_cache_get(&cache, &keypairs[..]); check_cache_get(&cache, &keypairs[..]);
drop(cache); drop(cache);

View File

@ -364,7 +364,7 @@ impl Engine {
Ok(result) Ok(result)
} }
Err(error) => { Err(error) => {
error!( warn!(
self.log, self.log,
"Execution engine call failed"; "Execution engine call failed";
"error" => ?error, "error" => ?error,

View File

@ -1821,10 +1821,10 @@ impl<T: EthSpec> ExecutionLayer<T> {
&metrics::EXECUTION_LAYER_BUILDER_REVEAL_PAYLOAD_OUTCOME, &metrics::EXECUTION_LAYER_BUILDER_REVEAL_PAYLOAD_OUTCOME,
&[metrics::FAILURE], &[metrics::FAILURE],
); );
error!( warn!(
self.log(), self.log(),
"Builder failed to reveal payload"; "Builder failed to reveal payload";
"info" => "this relay failure may cause a missed proposal", "info" => "this is common behaviour for some builders and may not indicate an issue",
"error" => ?e, "error" => ?e,
"relay_response_ms" => duration.as_millis(), "relay_response_ms" => duration.as_millis(),
"block_root" => ?block_root, "block_root" => ?block_root,

View File

@ -15,6 +15,7 @@ mod database;
mod metrics; mod metrics;
mod proposer_duties; mod proposer_duties;
mod publish_blocks; mod publish_blocks;
mod standard_block_rewards;
mod state_id; mod state_id;
mod sync_committee_rewards; mod sync_committee_rewards;
mod sync_committees; mod sync_committees;
@ -1806,6 +1807,27 @@ pub fn serve<T: BeaconChainTypes>(
}, },
); );
let beacon_rewards_path = eth_v1
.and(warp::path("beacon"))
.and(warp::path("rewards"))
.and(chain_filter.clone());
// GET beacon/rewards/blocks/{block_id}
let get_beacon_rewards_blocks = beacon_rewards_path
.clone()
.and(warp::path("blocks"))
.and(block_id_or_err)
.and(warp::path::end())
.and_then(|chain: Arc<BeaconChain<T>>, block_id: BlockId| {
blocking_json_task(move || {
let (rewards, execution_optimistic) =
standard_block_rewards::compute_beacon_block_rewards(chain, block_id)?;
Ok(rewards)
.map(api_types::GenericResponse::from)
.map(|resp| resp.add_execution_optimistic(execution_optimistic))
})
});
/* /*
* beacon/rewards * beacon/rewards
*/ */
@ -1815,6 +1837,58 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("rewards")) .and(warp::path("rewards"))
.and(chain_filter.clone()); .and(chain_filter.clone());
// POST beacon/rewards/attestations/{epoch}
let post_beacon_rewards_attestations = beacon_rewards_path
.clone()
.and(warp::path("attestations"))
.and(warp::path::param::<Epoch>())
.and(warp::path::end())
.and(warp::body::json())
.and(log_filter.clone())
.and_then(
|chain: Arc<BeaconChain<T>>,
epoch: Epoch,
validators: Vec<ValidatorId>,
log: Logger| {
blocking_json_task(move || {
let attestation_rewards = chain
.compute_attestation_rewards(epoch, validators, log)
.map_err(|e| match e {
BeaconChainError::MissingBeaconState(root) => {
warp_utils::reject::custom_not_found(format!(
"missing state {root:?}",
))
}
BeaconChainError::NoStateForSlot(slot) => {
warp_utils::reject::custom_not_found(format!(
"missing state at slot {slot}"
))
}
BeaconChainError::BeaconStateError(
BeaconStateError::UnknownValidator(validator_index),
) => warp_utils::reject::custom_bad_request(format!(
"validator is unknown: {validator_index}"
)),
BeaconChainError::ValidatorPubkeyUnknown(pubkey) => {
warp_utils::reject::custom_bad_request(format!(
"validator pubkey is unknown: {pubkey:?}"
))
}
e => warp_utils::reject::custom_server_error(format!(
"unexpected error: {:?}",
e
)),
})?;
let execution_optimistic =
chain.is_optimistic_or_invalid_head().unwrap_or_default();
Ok(attestation_rewards)
.map(api_types::GenericResponse::from)
.map(|resp| resp.add_execution_optimistic(execution_optimistic))
})
},
);
// POST beacon/rewards/sync_committee/{block_id} // POST beacon/rewards/sync_committee/{block_id}
let post_beacon_rewards_sync_committee = beacon_rewards_path let post_beacon_rewards_sync_committee = beacon_rewards_path
.clone() .clone()
@ -2887,7 +2961,7 @@ pub fn serve<T: BeaconChainTypes>(
.await .await
.map(|resp| warp::reply::json(&resp)) .map(|resp| warp::reply::json(&resp))
.map_err(|e| { .map_err(|e| {
error!( warn!(
log, log,
"Relay error when registering validator(s)"; "Relay error when registering validator(s)";
"num_registrations" => filtered_registration_data.len(), "num_registrations" => filtered_registration_data.len(),
@ -3488,6 +3562,7 @@ pub fn serve<T: BeaconChainTypes>(
.or(get_beacon_pool_voluntary_exits.boxed()) .or(get_beacon_pool_voluntary_exits.boxed())
.or(get_beacon_pool_bls_to_execution_changes.boxed()) .or(get_beacon_pool_bls_to_execution_changes.boxed())
.or(get_beacon_deposit_snapshot.boxed()) .or(get_beacon_deposit_snapshot.boxed())
.or(get_beacon_rewards_blocks.boxed())
.or(get_config_fork_schedule.boxed()) .or(get_config_fork_schedule.boxed())
.or(get_config_spec.boxed()) .or(get_config_spec.boxed())
.or(get_config_deposit_contract.boxed()) .or(get_config_deposit_contract.boxed())
@ -3540,6 +3615,7 @@ pub fn serve<T: BeaconChainTypes>(
.or(post_beacon_pool_voluntary_exits.boxed()) .or(post_beacon_pool_voluntary_exits.boxed())
.or(post_beacon_pool_sync_committees.boxed()) .or(post_beacon_pool_sync_committees.boxed())
.or(post_beacon_pool_bls_to_execution_changes.boxed()) .or(post_beacon_pool_bls_to_execution_changes.boxed())
.or(post_beacon_rewards_attestations.boxed())
.or(post_beacon_rewards_sync_committee.boxed()) .or(post_beacon_rewards_sync_committee.boxed())
.or(post_validator_duties_attester.boxed()) .or(post_validator_duties_attester.boxed())
.or(post_validator_duties_sync.boxed()) .or(post_validator_duties_sync.boxed())

View File

@ -0,0 +1,27 @@
use crate::sync_committee_rewards::get_state_before_applying_block;
use crate::BlockId;
use crate::ExecutionOptimistic;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2::lighthouse::StandardBlockReward;
use std::sync::Arc;
use warp_utils::reject::beacon_chain_error;
/// Compute the rewards for a beacon block in the standard (beacon-API spec) format.
///
/// The difference between `block_rewards` and `beacon_block_rewards` is that the latter
/// returns the block reward format that satisfies the beacon-api specs.
///
/// Returns the rewards together with the execution-optimistic status of the block.
pub fn compute_beacon_block_rewards<T: BeaconChainTypes>(
    chain: Arc<BeaconChain<T>>,
    block_id: BlockId,
) -> Result<(StandardBlockReward, ExecutionOptimistic), warp::Rejection> {
    // Resolve the block ID to a blinded block; also yields whether execution is optimistic.
    let (block, execution_optimistic) = block_id.blinded_block(&chain)?;
    let block_ref = block.message();
    let block_root = block.canonical_root();
    // Reward computation needs the state the block was applied to (its pre-state).
    let mut state = get_state_before_applying_block(chain.clone(), &block)?;
    let rewards = chain
        .compute_beacon_block_reward(block_ref, block_root, &mut state)
        .map_err(beacon_chain_error)?;
    Ok((rewards, execution_optimistic))
}

View File

@ -47,7 +47,7 @@ pub fn compute_sync_committee_rewards<T: BeaconChainTypes>(
Ok((data, execution_optimistic)) Ok((data, execution_optimistic))
} }
fn get_state_before_applying_block<T: BeaconChainTypes>( pub fn get_state_before_applying_block<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>, chain: Arc<BeaconChain<T>>,
block: &SignedBlindedBeaconBlock<T::EthSpec>, block: &SignedBlindedBeaconBlock<T::EthSpec>,
) -> Result<BeaconState<T::EthSpec>, warp::reject::Rejection> { ) -> Result<BeaconState<T::EthSpec>, warp::reject::Rejection> {

View File

@ -1,3 +1,4 @@
use crate::rpc::config::OutboundRateLimiterConfig;
use crate::types::GossipKind; use crate::types::GossipKind;
use crate::{Enr, PeerIdSerialized}; use crate::{Enr, PeerIdSerialized};
use directory::{ use directory::{
@ -133,6 +134,9 @@ pub struct Config {
/// Whether light client protocols should be enabled. /// Whether light client protocols should be enabled.
pub enable_light_client_server: bool, pub enable_light_client_server: bool,
/// Configuration for the outbound rate limiter (requests made by this node).
pub outbound_rate_limiter_config: Option<OutboundRateLimiterConfig>,
} }
impl Default for Config { impl Default for Config {
@ -211,6 +215,7 @@ impl Default for Config {
topics: Vec::new(), topics: Vec::new(),
metrics_enabled: false, metrics_enabled: false,
enable_light_client_server: false, enable_light_client_server: false,
outbound_rate_limiter_config: None,
} }
} }
} }

View File

@ -0,0 +1,173 @@
use std::{
fmt::{Debug, Display},
str::FromStr,
time::Duration,
};
use super::{methods, rate_limiter::Quota, Protocol};
use serde_derive::{Deserialize, Serialize};
/// Auxiliary struct to aid on configuration parsing.
///
/// A protocol's quota is specified as `protocol_name:tokens/time_in_seconds`.
#[derive(Debug, PartialEq, Eq)]
struct ProtocolQuota {
    /// The RPC protocol this quota applies to.
    protocol: Protocol,
    /// Token-bucket quota (max tokens / replenish interval) for the protocol.
    quota: Quota,
}
impl Display for ProtocolQuota {
    /// Render as `protocol_name:tokens/time_in_seconds` — the same shape
    /// accepted by the corresponding `FromStr` implementation.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = self.protocol.as_ref();
        let tokens = self.quota.max_tokens;
        let secs = self.quota.replenish_all_every.as_secs();
        write!(f, "{name}:{tokens}/{secs}")
    }
}
impl FromStr for ProtocolQuota {
    type Err = &'static str;

    /// Parse a quota definition of the form `protocol_name:tokens/time_in_seconds`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Split off the protocol name first.
        let (proto_part, quota_part) = s
            .split_once(':')
            .ok_or("Missing ':' from quota definition.")?;
        let protocol = proto_part
            .parse()
            .map_err(|_parse_err| "Wrong protocol representation in quota")?;
        // The remainder must be "tokens/seconds".
        let (tokens_part, secs_part) = quota_part
            .split_once('/')
            .ok_or("Quota should be defined as \"n/t\" (t in seconds). Missing '/' from quota.")?;
        let max_tokens = tokens_part
            .parse()
            .map_err(|_| "Failed to parse tokens from quota.")?;
        let secs: u64 = secs_part
            .parse()
            .map_err(|_| "Failed to parse time in seconds from quota.")?;
        Ok(Self {
            protocol,
            quota: Quota {
                replenish_all_every: Duration::from_secs(secs),
                max_tokens,
            },
        })
    }
}
/// Configurations for the rate limiter applied to outbound requests (made by the node itself).
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct OutboundRateLimiterConfig {
    /// Quota for outbound Ping requests.
    pub(super) ping_quota: Quota,
    /// Quota for outbound MetaData requests.
    pub(super) meta_data_quota: Quota,
    /// Quota for outbound Status requests.
    pub(super) status_quota: Quota,
    /// Quota for outbound Goodbye requests.
    pub(super) goodbye_quota: Quota,
    /// Quota for outbound BlocksByRange requests.
    pub(super) blocks_by_range_quota: Quota,
    /// Quota for outbound BlocksByRoot requests.
    pub(super) blocks_by_root_quota: Quota,
}
impl OutboundRateLimiterConfig {
    /// Default: 2 Ping requests every 10 seconds.
    pub const DEFAULT_PING_QUOTA: Quota = Quota::n_every(2, 10);
    /// Default: 2 MetaData requests every 5 seconds.
    pub const DEFAULT_META_DATA_QUOTA: Quota = Quota::n_every(2, 5);
    /// Default: 5 Status requests every 15 seconds.
    pub const DEFAULT_STATUS_QUOTA: Quota = Quota::n_every(5, 15);
    /// Default: a hard limit of 1 Goodbye request every 10 seconds.
    pub const DEFAULT_GOODBYE_QUOTA: Quota = Quota::one_every(10);
    /// Default: `MAX_REQUEST_BLOCKS` tokens every 10 seconds for BlocksByRange.
    pub const DEFAULT_BLOCKS_BY_RANGE_QUOTA: Quota =
        Quota::n_every(methods::MAX_REQUEST_BLOCKS, 10);
    /// Default: 128 tokens every 10 seconds for BlocksByRoot.
    pub const DEFAULT_BLOCKS_BY_ROOT_QUOTA: Quota = Quota::n_every(128, 10);
}
impl Default for OutboundRateLimiterConfig {
fn default() -> Self {
OutboundRateLimiterConfig {
ping_quota: Self::DEFAULT_PING_QUOTA,
meta_data_quota: Self::DEFAULT_META_DATA_QUOTA,
status_quota: Self::DEFAULT_STATUS_QUOTA,
goodbye_quota: Self::DEFAULT_GOODBYE_QUOTA,
blocks_by_range_quota: Self::DEFAULT_BLOCKS_BY_RANGE_QUOTA,
blocks_by_root_quota: Self::DEFAULT_BLOCKS_BY_ROOT_QUOTA,
}
}
}
impl Debug for OutboundRateLimiterConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Helper: render a quota compactly as "<max_tokens>/<seconds>s"
        // instead of the verbose derived `Quota` debug output.
        macro_rules! fmt_q {
            ($quota:expr) => {
                &format_args!(
                    "{}/{}s",
                    $quota.max_tokens,
                    $quota.replenish_all_every.as_secs()
                )
            };
        }

        f.debug_struct("OutboundRateLimiterConfig")
            .field("ping", fmt_q!(&self.ping_quota))
            .field("metadata", fmt_q!(&self.meta_data_quota))
            .field("status", fmt_q!(&self.status_quota))
            .field("goodbye", fmt_q!(&self.goodbye_quota))
            .field("blocks_by_range", fmt_q!(&self.blocks_by_range_quota))
            .field("blocks_by_root", fmt_q!(&self.blocks_by_root_quota))
            .finish()
    }
}
/// Parse configurations for the outbound rate limiter. Protocols that are omitted
/// fall back to their default values; if a protocol is specified more than once,
/// only the first given quota is kept.
///
/// The expected format is a ';' separated list of [`ProtocolQuota`].
impl FromStr for OutboundRateLimiterConfig {
    type Err = &'static str;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let mut ping = None;
        let mut meta_data = None;
        let mut status = None;
        let mut goodbye = None;
        let mut by_range = None;
        let mut by_root = None;

        for definition in s.split(';') {
            let ProtocolQuota { protocol, quota } = definition.parse()?;
            // Pick the slot for this protocol; the first quota given wins.
            let slot = match protocol {
                Protocol::Status => &mut status,
                Protocol::Goodbye => &mut goodbye,
                Protocol::BlocksByRange => &mut by_range,
                Protocol::BlocksByRoot => &mut by_root,
                Protocol::Ping => &mut ping,
                Protocol::MetaData => &mut meta_data,
                Protocol::LightClientBootstrap => return Err("Lighthouse does not send LightClientBootstrap requests. Quota should not be set."),
            };
            slot.get_or_insert(quota);
        }

        Ok(Self {
            ping_quota: ping.unwrap_or(Self::DEFAULT_PING_QUOTA),
            meta_data_quota: meta_data.unwrap_or(Self::DEFAULT_META_DATA_QUOTA),
            status_quota: status.unwrap_or(Self::DEFAULT_STATUS_QUOTA),
            goodbye_quota: goodbye.unwrap_or(Self::DEFAULT_GOODBYE_QUOTA),
            blocks_by_range_quota: by_range.unwrap_or(Self::DEFAULT_BLOCKS_BY_RANGE_QUOTA),
            blocks_by_root_quota: by_root.unwrap_or(Self::DEFAULT_BLOCKS_BY_ROOT_QUOTA),
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A `ProtocolQuota` must survive a `Display` -> `FromStr` round trip.
    #[test]
    fn test_quota_inverse() {
        let original = ProtocolQuota {
            protocol: Protocol::Goodbye,
            quota: Quota {
                max_tokens: 8,
                replenish_all_every: Duration::from_secs(10),
            },
        };
        let round_tripped = original.to_string().parse();
        assert_eq!(round_tripped, Ok(original))
    }
}

View File

@ -12,7 +12,7 @@ use libp2p::swarm::{
PollParameters, SubstreamProtocol, PollParameters, SubstreamProtocol,
}; };
use libp2p::PeerId; use libp2p::PeerId;
use rate_limiter::{RPCRateLimiter as RateLimiter, RPCRateLimiterBuilder, RateLimitedErr}; use rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr};
use slog::{crit, debug, o}; use slog::{crit, debug, o};
use std::marker::PhantomData; use std::marker::PhantomData;
use std::sync::Arc; use std::sync::Arc;
@ -33,12 +33,17 @@ pub use methods::{
pub(crate) use outbound::OutboundRequest; pub(crate) use outbound::OutboundRequest;
pub use protocol::{max_rpc_size, Protocol, RPCError}; pub use protocol::{max_rpc_size, Protocol, RPCError};
use self::config::OutboundRateLimiterConfig;
use self::self_limiter::SelfRateLimiter;
pub(crate) mod codec; pub(crate) mod codec;
pub mod config;
mod handler; mod handler;
pub mod methods; pub mod methods;
mod outbound; mod outbound;
mod protocol; mod protocol;
mod rate_limiter; mod rate_limiter;
mod self_limiter;
/// Composite trait for a request id. /// Composite trait for a request id.
pub trait ReqId: Send + 'static + std::fmt::Debug + Copy + Clone {} pub trait ReqId: Send + 'static + std::fmt::Debug + Copy + Clone {}
@ -101,13 +106,18 @@ pub struct RPCMessage<Id, TSpec: EthSpec> {
pub event: HandlerEvent<Id, TSpec>, pub event: HandlerEvent<Id, TSpec>,
} }
type BehaviourAction<Id, TSpec> =
NetworkBehaviourAction<RPCMessage<Id, TSpec>, RPCHandler<Id, TSpec>>;
/// Implements the libp2p `NetworkBehaviour` trait and therefore manages network-level /// Implements the libp2p `NetworkBehaviour` trait and therefore manages network-level
/// logic. /// logic.
pub struct RPC<Id: ReqId, TSpec: EthSpec> { pub struct RPC<Id: ReqId, TSpec: EthSpec> {
/// Rate limiter /// Rate limiter
limiter: RateLimiter, limiter: RateLimiter,
/// Rate limiter for our own requests.
self_limiter: Option<SelfRateLimiter<Id, TSpec>>,
/// Queue of events to be processed. /// Queue of events to be processed.
events: Vec<NetworkBehaviourAction<RPCMessage<Id, TSpec>, RPCHandler<Id, TSpec>>>, events: Vec<BehaviourAction<Id, TSpec>>,
fork_context: Arc<ForkContext>, fork_context: Arc<ForkContext>,
enable_light_client_server: bool, enable_light_client_server: bool,
/// Slog logger for RPC behaviour. /// Slog logger for RPC behaviour.
@ -118,10 +128,12 @@ impl<Id: ReqId, TSpec: EthSpec> RPC<Id, TSpec> {
pub fn new( pub fn new(
fork_context: Arc<ForkContext>, fork_context: Arc<ForkContext>,
enable_light_client_server: bool, enable_light_client_server: bool,
outbound_rate_limiter_config: Option<OutboundRateLimiterConfig>,
log: slog::Logger, log: slog::Logger,
) -> Self { ) -> Self {
let log = log.new(o!("service" => "libp2p_rpc")); let log = log.new(o!("service" => "libp2p_rpc"));
let limiter = RPCRateLimiterBuilder::new()
let limiter = RateLimiter::builder()
.n_every(Protocol::MetaData, 2, Duration::from_secs(5)) .n_every(Protocol::MetaData, 2, Duration::from_secs(5))
.n_every(Protocol::Ping, 2, Duration::from_secs(10)) .n_every(Protocol::Ping, 2, Duration::from_secs(10))
.n_every(Protocol::Status, 5, Duration::from_secs(15)) .n_every(Protocol::Status, 5, Duration::from_secs(15))
@ -140,8 +152,14 @@ impl<Id: ReqId, TSpec: EthSpec> RPC<Id, TSpec> {
) )
.build() .build()
.expect("Configuration parameters are valid"); .expect("Configuration parameters are valid");
let self_limiter = outbound_rate_limiter_config.map(|config| {
SelfRateLimiter::new(config, log.clone()).expect("Configuration parameters are valid")
});
RPC { RPC {
limiter, limiter,
self_limiter,
events: Vec::new(), events: Vec::new(),
fork_context, fork_context,
enable_light_client_server, enable_light_client_server,
@ -168,12 +186,24 @@ impl<Id: ReqId, TSpec: EthSpec> RPC<Id, TSpec> {
/// Submits an RPC request. /// Submits an RPC request.
/// ///
/// The peer must be connected for this to succeed. /// The peer must be connected for this to succeed.
pub fn send_request(&mut self, peer_id: PeerId, request_id: Id, event: OutboundRequest<TSpec>) { pub fn send_request(&mut self, peer_id: PeerId, request_id: Id, req: OutboundRequest<TSpec>) {
self.events.push(NetworkBehaviourAction::NotifyHandler { let event = if let Some(self_limiter) = self.self_limiter.as_mut() {
match self_limiter.allows(peer_id, request_id, req) {
Ok(event) => event,
Err(_e) => {
// Request is logged and queued internally in the self rate limiter.
return;
}
}
} else {
NetworkBehaviourAction::NotifyHandler {
peer_id, peer_id,
handler: NotifyHandler::Any, handler: NotifyHandler::Any,
event: RPCSend::Request(request_id, event), event: RPCSend::Request(request_id, req),
}); }
};
self.events.push(event);
} }
/// Lighthouse wishes to disconnect from this peer by sending a Goodbye message. This /// Lighthouse wishes to disconnect from this peer by sending a Goodbye message. This
@ -278,11 +308,19 @@ where
cx: &mut Context, cx: &mut Context,
_: &mut impl PollParameters, _: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> { ) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
// let the rate limiter prune // let the rate limiter prune.
let _ = self.limiter.poll_unpin(cx); let _ = self.limiter.poll_unpin(cx);
if let Some(self_limiter) = self.self_limiter.as_mut() {
if let Poll::Ready(event) = self_limiter.poll_ready(cx) {
self.events.push(event)
}
}
if !self.events.is_empty() { if !self.events.is_empty() {
return Poll::Ready(self.events.remove(0)); return Poll::Ready(self.events.remove(0));
} }
Poll::Pending Poll::Pending
} }
} }

View File

@ -14,7 +14,7 @@ use std::io;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use strum::IntoStaticStr; use strum::{AsRefStr, Display, EnumString, IntoStaticStr};
use tokio_io_timeout::TimeoutStream; use tokio_io_timeout::TimeoutStream;
use tokio_util::{ use tokio_util::{
codec::Framed, codec::Framed,
@ -169,23 +169,28 @@ pub fn rpc_block_limits_by_fork(current_fork: ForkName) -> RpcLimits {
} }
/// Protocol names to be used. /// Protocol names to be used.
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumString, AsRefStr, Display)]
#[strum(serialize_all = "snake_case")]
pub enum Protocol { pub enum Protocol {
/// The Status protocol name. /// The Status protocol name.
Status, Status,
/// The Goodbye protocol name. /// The Goodbye protocol name.
Goodbye, Goodbye,
/// The `BlocksByRange` protocol name. /// The `BlocksByRange` protocol name.
#[strum(serialize = "beacon_blocks_by_range")]
BlocksByRange, BlocksByRange,
/// The `BlocksByRoot` protocol name. /// The `BlocksByRoot` protocol name.
#[strum(serialize = "beacon_blocks_by_root")]
BlocksByRoot, BlocksByRoot,
/// The `BlobsByRange` protocol name. /// The `BlobsByRange` protocol name.
BlobsByRange, BlobsByRange,
/// The `Ping` protocol name. /// The `Ping` protocol name.
Ping, Ping,
/// The `MetaData` protocol name. /// The `MetaData` protocol name.
#[strum(serialize = "metadata")]
MetaData, MetaData,
/// The `LightClientBootstrap` protocol name. /// The `LightClientBootstrap` protocol name.
#[strum(serialize = "light_client_bootstrap")]
LightClientBootstrap, LightClientBootstrap,
} }
@ -204,22 +209,6 @@ pub enum Encoding {
SSZSnappy, SSZSnappy,
} }
impl std::fmt::Display for Protocol {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let repr = match self {
Protocol::Status => "status",
Protocol::Goodbye => "goodbye",
Protocol::BlocksByRange => "beacon_blocks_by_range",
Protocol::BlocksByRoot => "beacon_blocks_by_root",
Protocol::BlobsByRange => "blobs_sidecars_by_range",
Protocol::Ping => "ping",
Protocol::MetaData => "metadata",
Protocol::LightClientBootstrap => "light_client_bootstrap",
};
f.write_str(repr)
}
}
impl std::fmt::Display for Encoding { impl std::fmt::Display for Encoding {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let repr = match self { let repr = match self {

View File

@ -1,6 +1,7 @@
use crate::rpc::{InboundRequest, Protocol}; use crate::rpc::Protocol;
use fnv::FnvHashMap; use fnv::FnvHashMap;
use libp2p::PeerId; use libp2p::PeerId;
use serde_derive::{Deserialize, Serialize};
use std::convert::TryInto; use std::convert::TryInto;
use std::future::Future; use std::future::Future;
use std::hash::Hash; use std::hash::Hash;
@ -47,12 +48,31 @@ type Nanosecs = u64;
/// n*`replenish_all_every`/`max_tokens` units of time since their last request. /// n*`replenish_all_every`/`max_tokens` units of time since their last request.
/// ///
/// To produce hard limits, set `max_tokens` to 1. /// To produce hard limits, set `max_tokens` to 1.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct Quota { pub struct Quota {
/// How often are `max_tokens` fully replenished. /// How often are `max_tokens` fully replenished.
replenish_all_every: Duration, pub(super) replenish_all_every: Duration,
/// Token limit. This translates on how large can an instantaneous batch of /// Token limit. This translates on how large can an instantaneous batch of
/// tokens be. /// tokens be.
max_tokens: u64, pub(super) max_tokens: u64,
}
impl Quota {
/// A hard limit of one token every `seconds`.
pub const fn one_every(seconds: u64) -> Self {
Quota {
replenish_all_every: Duration::from_secs(seconds),
max_tokens: 1,
}
}
/// Allow `n` tokens to be used every `seconds`.
pub const fn n_every(n: u64, seconds: u64) -> Self {
Quota {
replenish_all_every: Duration::from_secs(seconds),
max_tokens: n,
}
}
} }
/// Manages rate limiting of requests per peer, with differentiated rates per protocol. /// Manages rate limiting of requests per peer, with differentiated rates per protocol.
@ -80,6 +100,7 @@ pub struct RPCRateLimiter {
} }
/// Error type for non conformant requests /// Error type for non conformant requests
#[derive(Debug)]
pub enum RateLimitedErr { pub enum RateLimitedErr {
/// Required tokens for this request exceed the maximum /// Required tokens for this request exceed the maximum
TooLarge, TooLarge,
@ -88,7 +109,7 @@ pub enum RateLimitedErr {
} }
/// User-friendly builder of a `RPCRateLimiter` /// User-friendly builder of a `RPCRateLimiter`
#[derive(Default)] #[derive(Default, Clone)]
pub struct RPCRateLimiterBuilder { pub struct RPCRateLimiterBuilder {
/// Quota for the Goodbye protocol. /// Quota for the Goodbye protocol.
goodbye_quota: Option<Quota>, goodbye_quota: Option<Quota>,
@ -109,13 +130,8 @@ pub struct RPCRateLimiterBuilder {
} }
impl RPCRateLimiterBuilder { impl RPCRateLimiterBuilder {
/// Get an empty `RPCRateLimiterBuilder`.
pub fn new() -> Self {
Default::default()
}
/// Set a quota for a protocol. /// Set a quota for a protocol.
fn set_quota(mut self, protocol: Protocol, quota: Quota) -> Self { pub fn set_quota(mut self, protocol: Protocol, quota: Quota) -> Self {
let q = Some(quota); let q = Some(quota);
match protocol { match protocol {
Protocol::Ping => self.ping_quota = q, Protocol::Ping => self.ping_quota = q,
@ -202,11 +218,40 @@ impl RPCRateLimiterBuilder {
} }
} }
/// Abstraction over inbound and outbound RPC requests so the same rate limiter
/// implementation can account for both directions.
pub trait RateLimiterItem {
    /// The protocol this request belongs to.
    fn protocol(&self) -> Protocol;
    /// The number of responses this request expects; used (with a minimum of 1)
    /// as the token cost in `RPCRateLimiter::allows`.
    fn expected_responses(&self) -> u64;
}
// Inbound requests delegate straight to their inherent methods of the same name.
impl<T: EthSpec> RateLimiterItem for super::InboundRequest<T> {
    fn protocol(&self) -> Protocol {
        self.protocol()
    }

    fn expected_responses(&self) -> u64 {
        self.expected_responses()
    }
}
// Outbound requests delegate straight to their inherent methods of the same name.
impl<T: EthSpec> RateLimiterItem for super::OutboundRequest<T> {
    fn protocol(&self) -> Protocol {
        self.protocol()
    }

    fn expected_responses(&self) -> u64 {
        self.expected_responses()
    }
}
impl RPCRateLimiter { impl RPCRateLimiter {
pub fn allows<T: EthSpec>( /// Get a builder instance.
pub fn builder() -> RPCRateLimiterBuilder {
RPCRateLimiterBuilder::default()
}
pub fn allows<Item: RateLimiterItem>(
&mut self, &mut self,
peer_id: &PeerId, peer_id: &PeerId,
request: &InboundRequest<T>, request: &Item,
) -> Result<(), RateLimitedErr> { ) -> Result<(), RateLimitedErr> {
let time_since_start = self.init_time.elapsed(); let time_since_start = self.init_time.elapsed();
let tokens = request.expected_responses().max(1); let tokens = request.expected_responses().max(1);

View File

@ -0,0 +1,202 @@
use std::{
collections::{hash_map::Entry, HashMap, VecDeque},
task::{Context, Poll},
time::Duration,
};
use futures::FutureExt;
use libp2p::{swarm::NotifyHandler, PeerId};
use slog::{crit, debug, Logger};
use smallvec::SmallVec;
use tokio_util::time::DelayQueue;
use types::EthSpec;
use super::{
config::OutboundRateLimiterConfig,
rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr},
BehaviourAction, OutboundRequest, Protocol, RPCSend, ReqId,
};
/// A request that was rate limited or waiting on rate limited requests for the same peer and
/// protocol.
struct QueuedRequest<Id: ReqId, TSpec: EthSpec> {
    /// The outbound request waiting to be sent.
    req: OutboundRequest<TSpec>,
    /// The caller-supplied id for this request.
    request_id: Id,
}
/// Rate limiter for requests made by this node, queueing requests that exceed
/// the configured quotas until they are allowed to be sent.
pub(crate) struct SelfRateLimiter<Id: ReqId, TSpec: EthSpec> {
    /// Requests queued for sending per peer. These requests are stored when the self rate
    /// limiter rejects them. Rate limiting is applied on a peer and protocol basis, so the
    /// queued requests are keyed the same way.
    delayed_requests: HashMap<(PeerId, Protocol), VecDeque<QueuedRequest<Id, TSpec>>>,
    /// The delay required to allow a peer's outbound request per protocol.
    next_peer_request: DelayQueue<(PeerId, Protocol)>,
    /// Rate limiter for our own requests.
    limiter: RateLimiter,
    /// Requests that are ready to be sent.
    ready_requests: SmallVec<[BehaviourAction<Id, TSpec>; 3]>,
    /// Slog logger.
    log: Logger,
}
/// Error returned when the rate limiter does not accept a request.
// NOTE: callers currently discard these variants, but they might be useful for debugging.
pub enum Error {
    /// There are queued requests for this same peer and protocol.
    PendingRequests,
    /// Request was tried but rate limited.
    RateLimited,
}
impl<Id: ReqId, TSpec: EthSpec> SelfRateLimiter<Id, TSpec> {
    /// Creates a new [`SelfRateLimiter`] based on configuration values.
    ///
    /// Returns an error string if the underlying rate limiter rejects the
    /// configuration parameters.
    pub fn new(config: OutboundRateLimiterConfig, log: Logger) -> Result<Self, &'static str> {
        debug!(log, "Using self rate limiting params"; "config" => ?config);
        // Destructure to make sure every configuration value is used.
        let OutboundRateLimiterConfig {
            ping_quota,
            meta_data_quota,
            status_quota,
            goodbye_quota,
            blocks_by_range_quota,
            blocks_by_root_quota,
        } = config;

        let limiter = RateLimiter::builder()
            .set_quota(Protocol::Ping, ping_quota)
            .set_quota(Protocol::MetaData, meta_data_quota)
            .set_quota(Protocol::Status, status_quota)
            .set_quota(Protocol::Goodbye, goodbye_quota)
            .set_quota(Protocol::BlocksByRange, blocks_by_range_quota)
            .set_quota(Protocol::BlocksByRoot, blocks_by_root_quota)
            // Manually set the LightClientBootstrap quota, since we use the same rate limiter
            // type for inbound and outbound requests, and LightClientBootstrap is an
            // inbound-only protocol.
            .one_every(Protocol::LightClientBootstrap, Duration::from_secs(10))
            .build()?;

        Ok(SelfRateLimiter {
            delayed_requests: Default::default(),
            next_peer_request: Default::default(),
            limiter,
            ready_requests: Default::default(),
            log,
        })
    }

    /// Checks if the rate limiter allows the request. If it's allowed, returns the
    /// [`NetworkBehaviourAction`] that should be emitted. When not allowed, the request is
    /// queued internally until it can be sent.
    pub fn allows(
        &mut self,
        peer_id: PeerId,
        request_id: Id,
        req: OutboundRequest<TSpec>,
    ) -> Result<BehaviourAction<Id, TSpec>, Error> {
        let protocol = req.protocol();
        // First check that there are not already other requests waiting to be sent for this
        // peer/protocol pair — if so, queue behind them rather than jumping ahead.
        if let Some(queued_requests) = self.delayed_requests.get_mut(&(peer_id, protocol)) {
            queued_requests.push_back(QueuedRequest { req, request_id });

            return Err(Error::PendingRequests);
        }
        match Self::try_send_request(&mut self.limiter, peer_id, request_id, req, &self.log) {
            Err((rate_limited_req, wait_time)) => {
                let key = (peer_id, protocol);
                // Schedule a wake-up for when this peer/protocol pair is allowed again.
                self.next_peer_request.insert(key, wait_time);
                self.delayed_requests
                    .entry(key)
                    .or_default()
                    .push_back(rate_limited_req);

                Err(Error::RateLimited)
            }
            Ok(event) => Ok(event),
        }
    }

    /// Auxiliary function to deal with self rate limiting outcomes. If the rate limiter allows the
    /// request, the [`NetworkBehaviourAction`] that should be emitted is returned. If the request
    /// should be delayed, it's returned with the duration to wait.
    fn try_send_request(
        limiter: &mut RateLimiter,
        peer_id: PeerId,
        request_id: Id,
        req: OutboundRequest<TSpec>,
        log: &Logger,
    ) -> Result<BehaviourAction<Id, TSpec>, (QueuedRequest<Id, TSpec>, Duration)> {
        match limiter.allows(&peer_id, &req) {
            Ok(()) => Ok(BehaviourAction::NotifyHandler {
                peer_id,
                handler: NotifyHandler::Any,
                event: RPCSend::Request(request_id, req),
            }),
            Err(e) => {
                let protocol = req.protocol();
                match e {
                    RateLimitedErr::TooLarge => {
                        // This should never happen with default parameters. Let's just send the
                        // request. Log a crit since this is a config issue.
                        crit!(
                            log,
                            "Self rate limiting error for a batch that will never fit. Sending request anyway. Check configuration parameters.";
                            "protocol" => %req.protocol()
                        );
                        Ok(BehaviourAction::NotifyHandler {
                            peer_id,
                            handler: NotifyHandler::Any,
                            event: RPCSend::Request(request_id, req),
                        })
                    }
                    RateLimitedErr::TooSoon(wait_time) => {
                        debug!(log, "Self rate limiting"; "protocol" => %protocol, "wait_time_ms" => wait_time.as_millis(), "peer_id" => %peer_id);
                        Err((QueuedRequest { req, request_id }, wait_time))
                    }
                }
            }
        }
    }

    /// When a peer and protocol are allowed to send a next request, this function checks the
    /// queued requests and attempts marking as ready as many as the limiter allows.
    fn next_peer_request_ready(&mut self, peer_id: PeerId, protocol: Protocol) {
        if let Entry::Occupied(mut entry) = self.delayed_requests.entry((peer_id, protocol)) {
            let queued_requests = entry.get_mut();
            while let Some(QueuedRequest { req, request_id }) = queued_requests.pop_front() {
                match Self::try_send_request(&mut self.limiter, peer_id, request_id, req, &self.log)
                {
                    Err((rate_limited_req, wait_time)) => {
                        let key = (peer_id, protocol);
                        self.next_peer_request.insert(key, wait_time);
                        // NOTE(review): the failed request is re-queued at the BACK, so strict
                        // FIFO order within this peer/protocol queue is not preserved — confirm
                        // this is intended.
                        queued_requests.push_back(rate_limited_req);
                        // If one fails just wait for the next window that allows sending requests.
                        return;
                    }
                    Ok(event) => self.ready_requests.push(event),
                }
            }
            // Drop the entry once its queue drains so `allows` stops queueing behind it.
            if queued_requests.is_empty() {
                entry.remove();
            }
        }
    }

    pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<BehaviourAction<Id, TSpec>> {
        // First check the requests that were self rate limited, since those might add events to
        // the queue. Also do this before rate limiter pruning to avoid removing and
        // immediately adding rate limiting keys.
        if let Poll::Ready(Some(Ok(expired))) = self.next_peer_request.poll_expired(cx) {
            let (peer_id, protocol) = expired.into_inner();
            self.next_peer_request_ready(peer_id, protocol);
        }
        // Prune the rate limiter.
        let _ = self.limiter.poll_unpin(cx);

        // Finally return any queued events.
        if !self.ready_requests.is_empty() {
            return Poll::Ready(self.ready_requests.remove(0));
        }

        Poll::Pending
    }
}

View File

@ -264,6 +264,7 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
let eth2_rpc = RPC::new( let eth2_rpc = RPC::new(
ctx.fork_context.clone(), ctx.fork_context.clone(),
config.enable_light_client_server, config.enable_light_client_server,
config.outbound_rate_limiter_config.clone(),
log.clone(), log.clone(),
); );

View File

@ -46,6 +46,7 @@ derivative = "2.2.0"
delay_map = "0.1.1" delay_map = "0.1.1"
ethereum-types = { version = "0.14.1", optional = true } ethereum-types = { version = "0.14.1", optional = true }
operation_pool = { path = "../operation_pool" } operation_pool = { path = "../operation_pool" }
execution_layer = { path = "../execution_layer" }
[features] [features]
deterministic_long_lived_attnets = [ "ethereum-types" ] deterministic_long_lived_attnets = [ "ethereum-types" ]

View File

@ -7,7 +7,7 @@ use itertools::process_results;
use lighthouse_network::rpc::StatusMessage; use lighthouse_network::rpc::StatusMessage;
use lighthouse_network::rpc::*; use lighthouse_network::rpc::*;
use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo}; use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo};
use slog::{debug, error}; use slog::{debug, error, warn};
use slot_clock::SlotClock; use slot_clock::SlotClock;
use std::sync::Arc; use std::sync::Arc;
use task_executor::TaskExecutor; use task_executor::TaskExecutor;
@ -392,12 +392,26 @@ impl<T: BeaconChainTypes> Worker<T> {
break; break;
} }
Err(e) => { Err(e) => {
if matches!(
e,
BeaconChainError::ExecutionLayerErrorPayloadReconstruction(_block_hash, ref boxed_error)
if matches!(**boxed_error, execution_layer::Error::EngineError(_))
) {
warn!(
self.log,
"Error rebuilding payload for peer";
"info" => "this may occur occasionally when the EE is busy",
"block_root" => ?root,
"error" => ?e,
);
} else {
error!( error!(
self.log, self.log,
"Error fetching block for peer"; "Error fetching block for peer";
"block_root" => ?root, "block_root" => ?root,
"error" => ?e "error" => ?e
); );
}
// send the stream terminator // send the stream terminator
self.send_error_response( self.send_error_response(

View File

@ -10,7 +10,7 @@ mod reward_cache;
mod sync_aggregate_id; mod sync_aggregate_id;
pub use crate::bls_to_execution_changes::ReceivedPreCapella; pub use crate::bls_to_execution_changes::ReceivedPreCapella;
pub use attestation::AttMaxCover; pub use attestation::{earliest_attestation_validators, AttMaxCover};
pub use attestation_storage::{AttestationRef, SplitAttestation}; pub use attestation_storage::{AttestationRef, SplitAttestation};
pub use max_cover::MaxCover; pub use max_cover::MaxCover;
pub use persistence::{ pub use persistence::{

View File

@ -194,6 +194,21 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
.help("Lighthouse by default does not discover private IP addresses. Set this flag to enable connection attempts to local addresses.") .help("Lighthouse by default does not discover private IP addresses. Set this flag to enable connection attempts to local addresses.")
.takes_value(false), .takes_value(false),
) )
.arg(
Arg::with_name("self-limiter")
.long("self-limiter")
.help(
"Enables the outbound rate limiter (requests made by this node).\
\
Rate limit quotas per protocol can be set in the form of \
<protocol_name>:<tokens>/<time_in_seconds>. To set quotas for multiple protocols, \
separate them by ';'. If the self rate limiter is enabled and a protocol is not \
present in the configuration, the quotas used for the inbound rate limiter will be \
used."
)
.min_values(0)
.hidden(true)
)
/* REST API related arguments */ /* REST API related arguments */
.arg( .arg(
Arg::with_name("http") Arg::with_name("http")

View File

@ -967,6 +967,13 @@ pub fn set_network_config(
// Light client server config. // Light client server config.
config.enable_light_client_server = cli_args.is_present("light-client-server"); config.enable_light_client_server = cli_args.is_present("light-client-server");
// This flag can be used both with or without a value. Try to parse it first with a value, if
// no value is defined but the flag is present, use the default params.
config.outbound_rate_limiter_config = clap_utils::parse_optional(cli_args, "self-limiter")?;
if cli_args.is_present("self-limiter") && config.outbound_rate_limiter_config.is_none() {
config.outbound_rate_limiter_config = Some(Default::default());
}
Ok(()) Ok(())
} }

View File

@ -789,6 +789,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
let key = get_key_for_col(DBColumn::ExecPayload.into(), block_root.as_bytes()); let key = get_key_for_col(DBColumn::ExecPayload.into(), block_root.as_bytes());
key_value_batch.push(KeyValueStoreOp::DeleteKey(key)); key_value_batch.push(KeyValueStoreOp::DeleteKey(key));
} }
StoreOp::KeyValueOp(kv_op) => {
key_value_batch.push(kv_op);
}
} }
} }
Ok(key_value_batch) Ok(key_value_batch)
@ -825,6 +829,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
StoreOp::DeleteState(_, _) => (), StoreOp::DeleteState(_, _) => (),
StoreOp::DeleteExecutionPayload(_) => (), StoreOp::DeleteExecutionPayload(_) => (),
StoreOp::KeyValueOp(_) => (),
} }
} }

View File

@ -162,6 +162,7 @@ pub enum StoreOp<'a, E: EthSpec> {
DeleteBlock(Hash256), DeleteBlock(Hash256),
DeleteState(Hash256, Option<Slot>), DeleteState(Hash256, Option<Slot>),
DeleteExecutionPayload(Hash256), DeleteExecutionPayload(Hash256),
KeyValueOp(KeyValueStoreOp),
} }
/// A unique column identifier. /// A unique column identifier.

View File

@ -57,7 +57,7 @@ $ docker pull sigp/lighthouse:latest-modern
Image tags follow this format: Image tags follow this format:
``` ```
${version}${arch}${stability}${modernity} ${version}${arch}${stability}${modernity}${features}
``` ```
The `version` is: The `version` is:
@ -81,6 +81,12 @@ The `modernity` is:
* `-modern` for optimized builds * `-modern` for optimized builds
* empty for a `portable` unoptimized build * empty for a `portable` unoptimized build
The `features` is:
* `-dev` for a development build with the `spec-minimal` feature enabled.
* empty for a standard build with no custom feature enabled.
Examples: Examples:
* `latest-unstable-modern`: most recent `unstable` build for all modern CPUs (x86_64 or ARM) * `latest-unstable-modern`: most recent `unstable` build for all modern CPUs (x86_64 or ARM)

View File

@ -59,14 +59,7 @@ The following fields are returned:
- `previous_epoch_head_attesting_gwei`: the total staked gwei that attested to a - `previous_epoch_head_attesting_gwei`: the total staked gwei that attested to a
head beacon block that is in the canonical chain. head beacon block that is in the canonical chain.
From this data you can calculate some interesting figures: From this data you can calculate:
#### Participation Rate
`previous_epoch_attesting_gwei / previous_epoch_active_gwei`
Expresses the ratio of validators that managed to have an attestation
voting upon the previous epoch included in a block.
#### Justification/Finalization Rate #### Justification/Finalization Rate

View File

@ -1005,6 +1005,40 @@ impl BeaconNodeHttpClient {
Ok(()) Ok(())
} }
/// `GET beacon/rewards/blocks`
///
/// Requests block rewards for the given `epoch` from the beacon node.
/// Returns `Ok(())` on success; the response body is not surfaced to the
/// caller here (NOTE(review): presumably `Self::get` deserializes and
/// discards it — confirm against the definition of `get`).
pub async fn get_beacon_rewards_blocks(&self, epoch: Epoch) -> Result<(), Error> {
    // Build the request path: `beacon/rewards/blocks`.
    let mut path = self.eth_path(V1)?;

    path.path_segments_mut()
        // `path_segments_mut` errs for URLs that cannot be a base.
        .map_err(|()| Error::InvalidUrl(self.server.clone()))?
        .push("beacon")
        .push("rewards")
        .push("blocks");

    // The target epoch is supplied as a query parameter.
    path.query_pairs_mut()
        .append_pair("epoch", &epoch.to_string());

    self.get(path).await
}
/// `POST beacon/rewards/attestations`
///
/// Posts the given validator identifiers to the attestation-rewards
/// endpoint. Returns `Ok(())` on success; any response body is discarded
/// (the `?` only propagates errors from `Self::post`).
pub async fn post_beacon_rewards_attestations(
    &self,
    attestations: &[ValidatorId],
) -> Result<(), Error> {
    // Build the request path: `beacon/rewards/attestations`.
    let mut path = self.eth_path(V1)?;

    path.path_segments_mut()
        // `path_segments_mut` errs for URLs that cannot be a base.
        .map_err(|()| Error::InvalidUrl(self.server.clone()))?
        .push("beacon")
        .push("rewards")
        .push("attestations");

    // The validator list is sent as the JSON request body.
    self.post(path, &attestations).await?;

    Ok(())
}
/// `POST validator/contribution_and_proofs` /// `POST validator/contribution_and_proofs`
pub async fn post_validator_contribution_and_proofs<T: EthSpec>( pub async fn post_validator_contribution_and_proofs<T: EthSpec>(
&self, &self,

View File

@ -1,8 +1,10 @@
//! This module contains endpoints that are non-standard and only available on Lighthouse servers. //! This module contains endpoints that are non-standard and only available on Lighthouse servers.
mod attestation_performance; mod attestation_performance;
pub mod attestation_rewards;
mod block_packing_efficiency; mod block_packing_efficiency;
mod block_rewards; mod block_rewards;
mod standard_block_rewards;
mod sync_committee_rewards; mod sync_committee_rewards;
use crate::{ use crate::{
@ -23,11 +25,13 @@ use store::{AnchorInfo, Split, StoreConfig};
pub use attestation_performance::{ pub use attestation_performance::{
AttestationPerformance, AttestationPerformanceQuery, AttestationPerformanceStatistics, AttestationPerformance, AttestationPerformanceQuery, AttestationPerformanceStatistics,
}; };
pub use attestation_rewards::StandardAttestationRewards;
pub use block_packing_efficiency::{ pub use block_packing_efficiency::{
BlockPackingEfficiency, BlockPackingEfficiencyQuery, ProposerInfo, UniqueAttestation, BlockPackingEfficiency, BlockPackingEfficiencyQuery, ProposerInfo, UniqueAttestation,
}; };
pub use block_rewards::{AttestationRewards, BlockReward, BlockRewardMeta, BlockRewardsQuery}; pub use block_rewards::{AttestationRewards, BlockReward, BlockRewardMeta, BlockRewardsQuery};
pub use lighthouse_network::{types::SyncState, PeerInfo}; pub use lighthouse_network::{types::SyncState, PeerInfo};
pub use standard_block_rewards::StandardBlockReward;
pub use sync_committee_rewards::SyncCommitteeReward; pub use sync_committee_rewards::SyncCommitteeReward;
// Define "legacy" implementations of `Option<T>` which use four bytes for encoding the union // Define "legacy" implementations of `Option<T>` which use four bytes for encoding the union

View File

@ -0,0 +1,44 @@
use serde::{Deserialize, Serialize};
// Details about the rewards paid for attestations
// All rewards in GWei
/// Ideal attester rewards for a validator at a given effective balance.
///
/// All values are in Gwei and serialized as quoted (string) integers.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct IdealAttestationRewards {
    // Validator's effective balance in gwei
    #[serde(with = "eth2_serde_utils::quoted_u64")]
    pub effective_balance: u64,
    // Ideal attester's reward for head vote in gwei
    #[serde(with = "eth2_serde_utils::quoted_u64")]
    pub head: u64,
    // Ideal attester's reward for target vote in gwei
    #[serde(with = "eth2_serde_utils::quoted_u64")]
    pub target: u64,
    // Ideal attester's reward for source vote in gwei
    #[serde(with = "eth2_serde_utils::quoted_u64")]
    pub source: u64,
}
/// Actual attester rewards earned by a single validator in the epoch.
///
/// All values are in Gwei and serialized as quoted (string) integers.
/// Note the asymmetry: `target` and `source` are `i64` so negative values
/// are representable (presumably penalties — TODO confirm), while `head`
/// is `u64`.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct TotalAttestationRewards {
    // one entry for every validator based on their attestations in the epoch
    #[serde(with = "eth2_serde_utils::quoted_u64")]
    pub validator_index: u64,
    // attester's reward for head vote in gwei
    #[serde(with = "eth2_serde_utils::quoted_u64")]
    pub head: u64,
    // attester's reward for target vote in gwei
    #[serde(with = "eth2_serde_utils::quoted_i64")]
    pub target: i64,
    // attester's reward for source vote in gwei
    #[serde(with = "eth2_serde_utils::quoted_i64")]
    pub source: i64,
    // TBD attester's inclusion_delay reward in gwei (phase0 only)
    // pub inclusion_delay: u64,
}
/// Response payload pairing ideal rewards (per effective balance) with the
/// per-validator rewards actually earned.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct StandardAttestationRewards {
    // One entry per effective-balance tier.
    pub ideal_rewards: Vec<IdealAttestationRewards>,
    // One entry per queried validator.
    pub total_rewards: Vec<TotalAttestationRewards>,
}

View File

@ -0,0 +1,26 @@
use serde::{Deserialize, Serialize};
// Details about the rewards for a single block
// All rewards in GWei
/// Breakdown of the rewards paid to the proposer of a single block.
///
/// All values are in Gwei and serialized as quoted (string) integers.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct StandardBlockReward {
    // proposer of the block, the proposer index who receives these rewards
    #[serde(with = "eth2_serde_utils::quoted_u64")]
    pub proposer_index: u64,
    // total block reward in gwei,
    // equal to attestations + sync_aggregate + proposer_slashings + attester_slashings
    #[serde(with = "eth2_serde_utils::quoted_u64")]
    pub total: u64,
    // block reward component due to included attestations in gwei
    #[serde(with = "eth2_serde_utils::quoted_u64")]
    pub attestations: u64,
    // block reward component due to included sync_aggregate in gwei
    #[serde(with = "eth2_serde_utils::quoted_u64")]
    pub sync_aggregate: u64,
    // block reward component due to included proposer_slashings in gwei
    #[serde(with = "eth2_serde_utils::quoted_u64")]
    pub proposer_slashings: u64,
    // block reward component due to included attester_slashings in gwei
    #[serde(with = "eth2_serde_utils::quoted_u64")]
    pub attester_slashings: u64,
}

View File

@ -8,5 +8,6 @@ pub struct SyncCommitteeReward {
#[serde(with = "eth2_serde_utils::quoted_u64")] #[serde(with = "eth2_serde_utils::quoted_u64")]
pub validator_index: u64, pub validator_index: u64,
// sync committee reward in gwei for the validator // sync committee reward in gwei for the validator
#[serde(with = "eth2_serde_utils::quoted_i64")]
pub reward: i64, pub reward: i64,
} }

View File

@ -255,11 +255,20 @@ pub struct FinalityCheckpointsData {
} }
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(try_from = "&str")]
pub enum ValidatorId { pub enum ValidatorId {
PublicKey(PublicKeyBytes), PublicKey(PublicKeyBytes),
Index(u64), Index(u64),
} }
impl TryFrom<&str> for ValidatorId {
type Error = String;
fn try_from(s: &str) -> Result<Self, Self::Error> {
Self::from_str(s)
}
}
impl FromStr for ValidatorId { impl FromStr for ValidatorId {
type Err = String; type Err = String;

View File

@ -722,7 +722,7 @@ where
op: &InvalidationOperation, op: &InvalidationOperation,
) -> Result<(), Error<T::Error>> { ) -> Result<(), Error<T::Error>> {
self.proto_array self.proto_array
.process_execution_payload_invalidation(op) .process_execution_payload_invalidation::<E>(op)
.map_err(Error::FailedToProcessInvalidExecutionPayload) .map_err(Error::FailedToProcessInvalidExecutionPayload)
} }
@ -1288,7 +1288,7 @@ where
if store.best_justified_checkpoint().epoch > store.justified_checkpoint().epoch { if store.best_justified_checkpoint().epoch > store.justified_checkpoint().epoch {
let store = &self.fc_store; let store = &self.fc_store;
if self.is_descendant_of_finalized(store.best_justified_checkpoint().root) { if self.is_finalized_checkpoint_or_descendant(store.best_justified_checkpoint().root) {
let store = &mut self.fc_store; let store = &mut self.fc_store;
store store
.set_justified_checkpoint(*store.best_justified_checkpoint()) .set_justified_checkpoint(*store.best_justified_checkpoint())
@ -1329,12 +1329,13 @@ where
/// Returns `true` if the block is known **and** a descendant of the finalized root. /// Returns `true` if the block is known **and** a descendant of the finalized root.
pub fn contains_block(&self, block_root: &Hash256) -> bool { pub fn contains_block(&self, block_root: &Hash256) -> bool {
self.proto_array.contains_block(block_root) && self.is_descendant_of_finalized(*block_root) self.proto_array.contains_block(block_root)
&& self.is_finalized_checkpoint_or_descendant(*block_root)
} }
/// Returns a `ProtoBlock` if the block is known **and** a descendant of the finalized root. /// Returns a `ProtoBlock` if the block is known **and** a descendant of the finalized root.
pub fn get_block(&self, block_root: &Hash256) -> Option<ProtoBlock> { pub fn get_block(&self, block_root: &Hash256) -> Option<ProtoBlock> {
if self.is_descendant_of_finalized(*block_root) { if self.is_finalized_checkpoint_or_descendant(*block_root) {
self.proto_array.get_block(block_root) self.proto_array.get_block(block_root)
} else { } else {
None None
@ -1343,7 +1344,7 @@ where
/// Returns an `ExecutionStatus` if the block is known **and** a descendant of the finalized root. /// Returns an `ExecutionStatus` if the block is known **and** a descendant of the finalized root.
pub fn get_block_execution_status(&self, block_root: &Hash256) -> Option<ExecutionStatus> { pub fn get_block_execution_status(&self, block_root: &Hash256) -> Option<ExecutionStatus> {
if self.is_descendant_of_finalized(*block_root) { if self.is_finalized_checkpoint_or_descendant(*block_root) {
self.proto_array.get_block_execution_status(block_root) self.proto_array.get_block_execution_status(block_root)
} else { } else {
None None
@ -1378,10 +1379,10 @@ where
}) })
} }
/// Return `true` if `block_root` is equal to the finalized root, or a known descendant of it. /// Return `true` if `block_root` is equal to the finalized checkpoint, or a known descendant of it.
pub fn is_descendant_of_finalized(&self, block_root: Hash256) -> bool { pub fn is_finalized_checkpoint_or_descendant(&self, block_root: Hash256) -> bool {
self.proto_array self.proto_array
.is_descendant(self.fc_store.finalized_checkpoint().root, block_root) .is_finalized_checkpoint_or_descendant::<E>(block_root)
} }
/// Returns `Ok(true)` if `block_root` has been imported optimistically or deemed invalid. /// Returns `Ok(true)` if `block_root` has been imported optimistically or deemed invalid.

View File

@ -273,7 +273,7 @@ impl ForkChoiceTestDefinition {
} }
}; };
fork_choice fork_choice
.process_execution_payload_invalidation(&op) .process_execution_payload_invalidation::<MainnetEthSpec>(&op)
.unwrap() .unwrap()
} }
Operation::AssertWeight { block_root, weight } => assert_eq!( Operation::AssertWeight { block_root, weight } => assert_eq!(

View File

@ -451,7 +451,7 @@ impl ProtoArray {
/// Invalidate zero or more blocks, as specified by the `InvalidationOperation`. /// Invalidate zero or more blocks, as specified by the `InvalidationOperation`.
/// ///
/// See the documentation of `InvalidationOperation` for usage. /// See the documentation of `InvalidationOperation` for usage.
pub fn propagate_execution_payload_invalidation( pub fn propagate_execution_payload_invalidation<E: EthSpec>(
&mut self, &mut self,
op: &InvalidationOperation, op: &InvalidationOperation,
) -> Result<(), Error> { ) -> Result<(), Error> {
@ -482,7 +482,7 @@ impl ProtoArray {
let latest_valid_ancestor_is_descendant = let latest_valid_ancestor_is_descendant =
latest_valid_ancestor_root.map_or(false, |ancestor_root| { latest_valid_ancestor_root.map_or(false, |ancestor_root| {
self.is_descendant(ancestor_root, head_block_root) self.is_descendant(ancestor_root, head_block_root)
&& self.is_descendant(self.finalized_checkpoint.root, ancestor_root) && self.is_finalized_checkpoint_or_descendant::<E>(ancestor_root)
}); });
// Collect all *ancestors* which were declared invalid since they reside between the // Collect all *ancestors* which were declared invalid since they reside between the
@ -977,6 +977,12 @@ impl ProtoArray {
/// ## Notes /// ## Notes
/// ///
/// Still returns `true` if `ancestor_root` is known and `ancestor_root == descendant_root`. /// Still returns `true` if `ancestor_root` is known and `ancestor_root == descendant_root`.
///
/// ## Warning
///
/// Do not use this function to check if a block is a descendant of the
/// finalized checkpoint. Use `Self::is_finalized_checkpoint_or_descendant`
/// instead.
pub fn is_descendant(&self, ancestor_root: Hash256, descendant_root: Hash256) -> bool { pub fn is_descendant(&self, ancestor_root: Hash256, descendant_root: Hash256) -> bool {
self.indices self.indices
.get(&ancestor_root) .get(&ancestor_root)
@ -990,6 +996,70 @@ impl ProtoArray {
.unwrap_or(false) .unwrap_or(false)
} }
/// Returns `true` if `root` is equal to or a descendant of
/// `self.finalized_checkpoint`.
///
/// Notably, this function is checking ancestry of the finalized
/// *checkpoint*, not the finalized *block*: the two differ when the first
/// slot of the finalized epoch is a skip slot.
pub fn is_finalized_checkpoint_or_descendant<E: EthSpec>(&self, root: Hash256) -> bool {
    let finalized_root = self.finalized_checkpoint.root;
    // First slot of the finalized epoch (the checkpoint slot).
    let finalized_slot = self
        .finalized_checkpoint
        .epoch
        .start_slot(E::slots_per_epoch());

    let mut node = if let Some(node) = self
        .indices
        .get(&root)
        .and_then(|index| self.nodes.get(*index))
    {
        node
    } else {
        // An unknown root is not a finalized descendant. This line can only
        // be reached if the user supplies a root that is not known to fork
        // choice.
        return false;
    };

    // The node's stored (possibly unrealized) finalized/justified
    // checkpoints are known ancestors of `node` that are likely to coincide
    // with the store's finalized checkpoint.
    //
    // Run this check once, outside of the loop rather than inside the loop.
    // If the conditions don't match for this node then they're unlikely to
    // start matching for its ancestors.
    for checkpoint in &[
        node.finalized_checkpoint,
        node.justified_checkpoint,
        node.unrealized_finalized_checkpoint,
        node.unrealized_justified_checkpoint,
    ] {
        if checkpoint.map_or(false, |cp| cp == self.finalized_checkpoint) {
            return true;
        }
    }

    // Fall back to walking the parent chain down to the checkpoint slot.
    loop {
        // If `node` is less than or equal to the finalized slot then `node`
        // must be the finalized block.
        if node.slot <= finalized_slot {
            return node.root == finalized_root;
        }

        // Since `node` is from a higher slot than the finalized checkpoint,
        // replace `node` with the parent of `node`.
        if let Some(parent) = node.parent.and_then(|index| self.nodes.get(index)) {
            node = parent
        } else {
            // If `node` is not the finalized block and its parent does not
            // exist in fork choice, then the parent must have been pruned.
            // Proto-array only prunes blocks prior to the finalized block,
            // so this means the parent conflicts with finality.
            return false;
        };
    }
}
/// Returns the first *beacon block root* which contains an execution payload with the given /// Returns the first *beacon block root* which contains an execution payload with the given
/// `block_hash`, if any. /// `block_hash`, if any.
pub fn execution_block_hash_to_beacon_block_root( pub fn execution_block_hash_to_beacon_block_root(

View File

@ -358,12 +358,12 @@ impl ProtoArrayForkChoice {
} }
/// See `ProtoArray::propagate_execution_payload_invalidation` for documentation. /// See `ProtoArray::propagate_execution_payload_invalidation` for documentation.
pub fn process_execution_payload_invalidation( pub fn process_execution_payload_invalidation<E: EthSpec>(
&mut self, &mut self,
op: &InvalidationOperation, op: &InvalidationOperation,
) -> Result<(), String> { ) -> Result<(), String> {
self.proto_array self.proto_array
.propagate_execution_payload_invalidation(op) .propagate_execution_payload_invalidation::<E>(op)
.map_err(|e| format!("Failed to process invalid payload: {:?}", e)) .map_err(|e| format!("Failed to process invalid payload: {:?}", e))
} }
@ -748,6 +748,15 @@ impl ProtoArrayForkChoice {
.is_descendant(ancestor_root, descendant_root) .is_descendant(ancestor_root, descendant_root)
} }
/// Returns `true` if `descendant_root` is equal to, or a descendant of,
/// the finalized checkpoint.
///
/// See `ProtoArray::is_finalized_checkpoint_or_descendant` for details.
pub fn is_finalized_checkpoint_or_descendant<E: EthSpec>(
    &self,
    descendant_root: Hash256,
) -> bool {
    self.proto_array
        .is_finalized_checkpoint_or_descendant::<E>(descendant_root)
}
pub fn latest_message(&self, validator_index: usize) -> Option<(Hash256, Epoch)> { pub fn latest_message(&self, validator_index: usize) -> Option<(Hash256, Epoch)> {
if validator_index < self.votes.0.len() { if validator_index < self.votes.0.len() {
let vote = &self.votes.0[validator_index]; let vote = &self.votes.0[validator_index];
@ -928,6 +937,10 @@ mod test_compute_deltas {
epoch: genesis_epoch, epoch: genesis_epoch,
root: finalized_root, root: finalized_root,
}; };
let junk_checkpoint = Checkpoint {
epoch: Epoch::new(42),
root: Hash256::repeat_byte(42),
};
let mut fc = ProtoArrayForkChoice::new::<MainnetEthSpec>( let mut fc = ProtoArrayForkChoice::new::<MainnetEthSpec>(
genesis_slot, genesis_slot,
@ -973,8 +986,10 @@ mod test_compute_deltas {
target_root: finalized_root, target_root: finalized_root,
current_epoch_shuffling_id: junk_shuffling_id.clone(), current_epoch_shuffling_id: junk_shuffling_id.clone(),
next_epoch_shuffling_id: junk_shuffling_id, next_epoch_shuffling_id: junk_shuffling_id,
justified_checkpoint: genesis_checkpoint, // Use the junk checkpoint for the next two values to prevent
finalized_checkpoint: genesis_checkpoint, // the loop-shortcutting mechanism from triggering.
justified_checkpoint: junk_checkpoint,
finalized_checkpoint: junk_checkpoint,
execution_status, execution_status,
unrealized_justified_checkpoint: None, unrealized_justified_checkpoint: None,
unrealized_finalized_checkpoint: None, unrealized_finalized_checkpoint: None,
@ -993,6 +1008,11 @@ mod test_compute_deltas {
assert!(!fc.is_descendant(finalized_root, not_finalized_desc)); assert!(!fc.is_descendant(finalized_root, not_finalized_desc));
assert!(!fc.is_descendant(finalized_root, unknown)); assert!(!fc.is_descendant(finalized_root, unknown));
assert!(fc.is_finalized_checkpoint_or_descendant::<MainnetEthSpec>(finalized_root));
assert!(fc.is_finalized_checkpoint_or_descendant::<MainnetEthSpec>(finalized_desc));
assert!(!fc.is_finalized_checkpoint_or_descendant::<MainnetEthSpec>(not_finalized_desc));
assert!(!fc.is_finalized_checkpoint_or_descendant::<MainnetEthSpec>(unknown));
assert!(!fc.is_descendant(finalized_desc, not_finalized_desc)); assert!(!fc.is_descendant(finalized_desc, not_finalized_desc));
assert!(fc.is_descendant(finalized_desc, finalized_desc)); assert!(fc.is_descendant(finalized_desc, finalized_desc));
assert!(!fc.is_descendant(finalized_desc, finalized_root)); assert!(!fc.is_descendant(finalized_desc, finalized_root));
@ -1004,6 +1024,171 @@ mod test_compute_deltas {
assert!(!fc.is_descendant(not_finalized_desc, unknown)); assert!(!fc.is_descendant(not_finalized_desc, unknown));
} }
/// This test covers an interesting case where a block can be a descendant
/// of the finalized *block*, but not a descendant of the finalized
/// *checkpoint*.
///
/// ## Example
///
/// Consider this block tree which has three blocks (`A`, `B` and `C`):
///
/// ```ignore
/// [A] <--- [-] <--- [B]
///        |
///        |--[C]
/// ```
///
/// - `A` (slot 31) is the common ancestor.
/// - `B` (slot 33) descends from `A`, but there is a single skip slot
///     between it and `A`.
/// - `C` (slot 32) descends from `A` and conflicts with `B`.
///
/// Imagine that the `B` chain is finalized at epoch 1. This means that the
/// finalized checkpoint points to the skipped slot at 32. The root of the
/// finalized checkpoint is `A`.
///
/// In this scenario, the block `C` has the finalized root (`A`) as an
/// ancestor whilst simultaneously conflicting with the finalized
/// checkpoint.
///
/// This means that to ensure a block does not conflict with finality we
/// must check to ensure that it's an ancestor of the finalized
/// *checkpoint*, not just the finalized *block*.
#[test]
fn finalized_descendant_edge_case() {
    let get_block_root = Hash256::from_low_u64_be;
    let genesis_slot = Slot::new(0);
    let junk_state_root = Hash256::zero();
    let junk_shuffling_id =
        AttestationShufflingId::from_components(Epoch::new(0), Hash256::zero());
    let execution_status = ExecutionStatus::irrelevant();

    let genesis_checkpoint = Checkpoint {
        epoch: Epoch::new(0),
        root: get_block_root(0),
    };

    let mut fc = ProtoArrayForkChoice::new::<MainnetEthSpec>(
        genesis_slot,
        junk_state_root,
        genesis_checkpoint,
        genesis_checkpoint,
        junk_shuffling_id.clone(),
        junk_shuffling_id.clone(),
        execution_status,
        CountUnrealizedFull::default(),
    )
    .unwrap();

    // Minimal description of a block; roots are derived from small integers
    // via `get_block_root`.
    struct TestBlock {
        slot: u64,
        root: u64,
        parent_root: u64,
    }

    // Helper to register a `TestBlock` with proto-array, filling every other
    // `Block` field with junk/genesis values.
    let insert_block = |fc: &mut ProtoArrayForkChoice, block: TestBlock| {
        fc.proto_array
            .on_block::<MainnetEthSpec>(
                Block {
                    slot: Slot::from(block.slot),
                    root: get_block_root(block.root),
                    parent_root: Some(get_block_root(block.parent_root)),
                    state_root: Hash256::zero(),
                    target_root: Hash256::zero(),
                    current_epoch_shuffling_id: junk_shuffling_id.clone(),
                    next_epoch_shuffling_id: junk_shuffling_id.clone(),
                    justified_checkpoint: Checkpoint {
                        epoch: Epoch::new(0),
                        root: get_block_root(0),
                    },
                    finalized_checkpoint: genesis_checkpoint,
                    execution_status,
                    unrealized_justified_checkpoint: Some(genesis_checkpoint),
                    unrealized_finalized_checkpoint: Some(genesis_checkpoint),
                },
                Slot::from(block.slot),
            )
            .unwrap();
    };

    /*
     * Start of interesting part of tests.
     */

    // Produce the 0th epoch of blocks. They should all form a chain from
    // the genesis block.
    for i in 1..MainnetEthSpec::slots_per_epoch() {
        insert_block(
            &mut fc,
            TestBlock {
                slot: i,
                root: i,
                parent_root: i - 1,
            },
        )
    }

    let last_slot_of_epoch_0 = MainnetEthSpec::slots_per_epoch() - 1;

    // Produce a block that descends from the last block of epoch 0.
    //
    // This block will be non-canonical.
    let non_canonical_slot = last_slot_of_epoch_0 + 1;
    insert_block(
        &mut fc,
        TestBlock {
            slot: non_canonical_slot,
            root: non_canonical_slot,
            parent_root: non_canonical_slot - 1,
        },
    );

    // Produce a block that descends from the last block of the 0th epoch,
    // that skips the 1st slot of the 1st epoch.
    //
    // This block will be canonical.
    let canonical_slot = last_slot_of_epoch_0 + 2;
    insert_block(
        &mut fc,
        TestBlock {
            slot: canonical_slot,
            root: canonical_slot,
            parent_root: non_canonical_slot - 1,
        },
    );

    let finalized_root = get_block_root(last_slot_of_epoch_0);

    // Set the finalized checkpoint to finalize the first slot of epoch 1 on
    // the canonical chain.
    fc.proto_array.finalized_checkpoint = Checkpoint {
        root: finalized_root,
        epoch: Epoch::new(1),
    };

    assert!(
        fc.proto_array
            .is_finalized_checkpoint_or_descendant::<MainnetEthSpec>(finalized_root),
        "the finalized checkpoint is the finalized checkpoint"
    );

    assert!(
        fc.proto_array
            .is_finalized_checkpoint_or_descendant::<MainnetEthSpec>(get_block_root(
                canonical_slot
            )),
        "the canonical block is a descendant of the finalized checkpoint"
    );

    assert!(
        !fc.proto_array
            .is_finalized_checkpoint_or_descendant::<MainnetEthSpec>(get_block_root(
                non_canonical_slot
            )),
        "although the non-canonical block is a descendant of the finalized block, \
        it's not a descendant of the finalized checkpoint"
    );
}
#[test] #[test]
fn zero_hash() { fn zero_hash() {
let validator_count: usize = 16; let validator_count: usize = 16;

View File

@ -12,4 +12,4 @@ pub mod u64_hex_be;
pub mod u8_hex; pub mod u8_hex;
pub use fixed_bytes_hex::{bytes_4_hex, bytes_8_hex}; pub use fixed_bytes_hex::{bytes_4_hex, bytes_8_hex};
pub use quoted_int::{quoted_u256, quoted_u32, quoted_u64, quoted_u8}; pub use quoted_int::{quoted_i64, quoted_u256, quoted_u32, quoted_u64, quoted_u8};

View File

@ -11,7 +11,7 @@ use std::convert::TryFrom;
use std::marker::PhantomData; use std::marker::PhantomData;
macro_rules! define_mod { macro_rules! define_mod {
($int: ty, $visit_fn: ident) => { ($int: ty) => {
/// Serde support for deserializing quoted integers. /// Serde support for deserializing quoted integers.
/// ///
/// Configurable so that quotes are either required or optional. /// Configurable so that quotes are either required or optional.
@ -140,19 +140,25 @@ macro_rules! define_mod {
pub mod quoted_u8 { pub mod quoted_u8 {
use super::*; use super::*;
define_mod!(u8, visit_u8); define_mod!(u8);
} }
pub mod quoted_u32 { pub mod quoted_u32 {
use super::*; use super::*;
define_mod!(u32, visit_u32); define_mod!(u32);
} }
pub mod quoted_u64 { pub mod quoted_u64 {
use super::*; use super::*;
define_mod!(u64, visit_u64); define_mod!(u64);
}
pub mod quoted_i64 {
use super::*;
define_mod!(i64);
} }
pub mod quoted_u256 { pub mod quoted_u256 {
@ -216,4 +222,26 @@ mod test {
fn u256_without_quotes() { fn u256_without_quotes() {
serde_json::from_str::<WrappedU256>("1").unwrap_err(); serde_json::from_str::<WrappedU256>("1").unwrap_err();
} }
// Wrapper exercising the `quoted_i64` serde module; `transparent` makes it
// (de)serialize exactly like its single field.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[serde(transparent)]
struct WrappedI64(#[serde(with = "quoted_i64")] i64);

// Negative values must round-trip with quotes in both directions.
#[test]
fn negative_i64_with_quotes() {
    assert_eq!(
        serde_json::from_str::<WrappedI64>("\"-200\"").unwrap().0,
        -200
    );
    assert_eq!(
        serde_json::to_string(&WrappedI64(-12_500)).unwrap(),
        "\"-12500\""
    );
}

// It would be OK if this worked, but we don't need it to (i64s should always be quoted).
#[test]
fn negative_i64_without_quotes() {
    serde_json::from_str::<WrappedI64>("-200").unwrap_err();
}
} }

View File

@ -1079,6 +1079,19 @@ fn http_port_flag() {
.with_config(|config| assert_eq!(config.http_api.listen_port, port1)); .with_config(|config| assert_eq!(config.http_api.listen_port, port1));
} }
#[test] #[test]
fn empty_self_limiter_flag() {
    // Passing `--self-limiter` with no value must enable the outbound rate
    // limiter with its default (inbound-mirroring) quota configuration.
    CommandLineTest::new()
        .flag("self-limiter", None)
        .run_with_zero_port()
        .with_config(|config| {
            assert_eq!(
                config.network.outbound_rate_limiter_config,
                Some(lighthouse_network::rpc::config::OutboundRateLimiterConfig::default())
            )
        });
}
#[test]
fn http_allow_origin_flag() { fn http_allow_origin_flag() {
CommandLineTest::new() CommandLineTest::new()
.flag("http-allow-origin", Some("127.0.0.99")) .flag("http-allow-origin", Some("127.0.0.99"))

View File

@ -231,6 +231,15 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
address of this server (e.g., http://localhost:5064).") address of this server (e.g., http://localhost:5064).")
.takes_value(true), .takes_value(true),
) )
.arg(
Arg::with_name("enable-high-validator-count-metrics")
.long("enable-high-validator-count-metrics")
.help("Enable per validator metrics for > 64 validators. \
Note: This flag is automatically enabled for <= 64 validators. \
Enabling this metric for higher validator counts will lead to higher volume \
of prometheus metrics being collected.")
.takes_value(false),
)
/* /*
* Explorer metrics * Explorer metrics
*/ */

View File

@ -53,6 +53,11 @@ pub struct Config {
/// If true, enable functionality that monitors the network for attestations or proposals from /// If true, enable functionality that monitors the network for attestations or proposals from
/// any of the validators managed by this client before starting up. /// any of the validators managed by this client before starting up.
pub enable_doppelganger_protection: bool, pub enable_doppelganger_protection: bool,
/// If true, then we publish validator specific metrics (e.g next attestation duty slot)
/// for all our managed validators.
/// Note: We publish validator specific metrics for low validator counts without this flag
/// (<= 64 validators)
pub enable_high_validator_count_metrics: bool,
/// Enable use of the blinded block endpoints during proposals. /// Enable use of the blinded block endpoints during proposals.
pub builder_proposals: bool, pub builder_proposals: bool,
/// Overrides the timestamp field in builder api ValidatorRegistrationV1 /// Overrides the timestamp field in builder api ValidatorRegistrationV1
@ -99,6 +104,7 @@ impl Default for Config {
http_metrics: <_>::default(), http_metrics: <_>::default(),
monitoring_api: None, monitoring_api: None,
enable_doppelganger_protection: false, enable_doppelganger_protection: false,
enable_high_validator_count_metrics: false,
beacon_nodes_tls_certs: None, beacon_nodes_tls_certs: None,
block_delay: None, block_delay: None,
builder_proposals: false, builder_proposals: false,
@ -273,6 +279,10 @@ impl Config {
config.http_metrics.enabled = true; config.http_metrics.enabled = true;
} }
if cli_args.is_present("enable-high-validator-count-metrics") {
config.enable_high_validator_count_metrics = true;
}
if let Some(address) = cli_args.value_of("metrics-address") { if let Some(address) = cli_args.value_of("metrics-address") {
config.http_metrics.listen_addr = address config.http_metrics.listen_addr = address
.parse::<IpAddr>() .parse::<IpAddr>()

View File

@ -9,6 +9,7 @@
mod sync; mod sync;
use crate::beacon_node_fallback::{BeaconNodeFallback, OfflineOnFailure, RequireSynced}; use crate::beacon_node_fallback::{BeaconNodeFallback, OfflineOnFailure, RequireSynced};
use crate::http_metrics::metrics::{get_int_gauge, set_int_gauge, ATTESTATION_DUTY};
use crate::{ use crate::{
block_service::BlockServiceNotification, block_service::BlockServiceNotification,
http_metrics::metrics, http_metrics::metrics,
@ -39,6 +40,11 @@ const SUBSCRIPTION_BUFFER_SLOTS: u64 = 2;
/// Only retain `HISTORICAL_DUTIES_EPOCHS` duties prior to the current epoch. /// Only retain `HISTORICAL_DUTIES_EPOCHS` duties prior to the current epoch.
const HISTORICAL_DUTIES_EPOCHS: u64 = 2; const HISTORICAL_DUTIES_EPOCHS: u64 = 2;
/// Minimum number of validators for which we auto-enable per-validator metrics.
/// For validators greater than this value, we need to manually set the `enable-per-validator-metrics`
/// flag in the cli to enable collection of per validator metrics.
const VALIDATOR_METRICS_MIN_COUNT: usize = 64;
#[derive(Debug)] #[derive(Debug)]
pub enum Error { pub enum Error {
UnableToReadSlotClock, UnableToReadSlotClock,
@ -121,6 +127,7 @@ pub struct DutiesService<T, E: EthSpec> {
/// This functionality is a little redundant since most BNs will likely reject duties when they /// This functionality is a little redundant since most BNs will likely reject duties when they
/// aren't synced, but we keep it around for an emergency. /// aren't synced, but we keep it around for an emergency.
pub require_synced: RequireSynced, pub require_synced: RequireSynced,
pub enable_high_validator_count_metrics: bool,
pub context: RuntimeContext<E>, pub context: RuntimeContext<E>,
pub spec: ChainSpec, pub spec: ChainSpec,
} }
@ -220,6 +227,12 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
.cloned() .cloned()
.collect() .collect()
} }
/// Returns `true` if we should collect per validator metrics and `false` otherwise.
pub fn per_validator_metrics(&self) -> bool {
self.enable_high_validator_count_metrics
|| self.total_validator_count() <= VALIDATOR_METRICS_MIN_COUNT
}
} }
/// Start the service that periodically polls the beacon node for validator duties. This will start /// Start the service that periodically polls the beacon node for validator duties. This will start
@ -501,6 +514,7 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
current_epoch, current_epoch,
&local_indices, &local_indices,
&local_pubkeys, &local_pubkeys,
current_slot,
) )
.await .await
{ {
@ -520,8 +534,13 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
); );
// Download the duties and update the duties for the next epoch. // Download the duties and update the duties for the next epoch.
if let Err(e) = if let Err(e) = poll_beacon_attesters_for_epoch(
poll_beacon_attesters_for_epoch(duties_service, next_epoch, &local_indices, &local_pubkeys) duties_service,
next_epoch,
&local_indices,
&local_pubkeys,
current_slot,
)
.await .await
{ {
error!( error!(
@ -619,6 +638,7 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
epoch: Epoch, epoch: Epoch,
local_indices: &[u64], local_indices: &[u64],
local_pubkeys: &HashSet<PublicKeyBytes>, local_pubkeys: &HashSet<PublicKeyBytes>,
current_slot: Slot,
) -> Result<(), Error> { ) -> Result<(), Error> {
let log = duties_service.context.log(); let log = duties_service.context.log();
@ -671,6 +691,35 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
.data .data
.into_iter() .into_iter()
.filter(|duty| { .filter(|duty| {
if duties_service.per_validator_metrics() {
let validator_index = duty.validator_index;
let duty_slot = duty.slot;
if let Some(existing_slot_gauge) =
get_int_gauge(&ATTESTATION_DUTY, &[&validator_index.to_string()])
{
let existing_slot = Slot::new(existing_slot_gauge.get() as u64);
let existing_epoch = existing_slot.epoch(E::slots_per_epoch());
// First condition ensures that we switch to the next epoch duty slot
// once the current epoch duty slot passes.
// Second condition is to ensure that next epoch duties don't override
// current epoch duties.
if existing_slot < current_slot
|| (duty_slot.epoch(E::slots_per_epoch()) <= existing_epoch
&& duty_slot > current_slot
&& duty_slot != existing_slot)
{
existing_slot_gauge.set(duty_slot.as_u64() as i64);
}
} else {
set_int_gauge(
&ATTESTATION_DUTY,
&[&validator_index.to_string()],
duty_slot.as_u64() as i64,
);
}
}
local_pubkeys.contains(&duty.pubkey) && { local_pubkeys.contains(&duty.pubkey) && {
// Only update the duties if either is true: // Only update the duties if either is true:
// //

View File

@ -177,6 +177,12 @@ lazy_static::lazy_static! {
"Duration to obtain a signature", "Duration to obtain a signature",
&["type"] &["type"]
); );
pub static ref ATTESTATION_DUTY: Result<IntGaugeVec> = try_create_int_gauge_vec(
"vc_attestation_duty_slot",
"Attestation duty slot for all managed validators",
&["validator"]
);
} }
pub fn gather_prometheus_metrics<T: EthSpec>( pub fn gather_prometheus_metrics<T: EthSpec>(

View File

@ -422,6 +422,7 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
}, },
spec: context.eth2_config.spec.clone(), spec: context.eth2_config.spec.clone(),
context: duties_context, context: duties_context,
enable_high_validator_count_metrics: config.enable_high_validator_count_metrics,
}); });
// Update the metrics server. // Update the metrics server.