Merge pull request #3921 from realbigsean/cap-4844

Eip4844 capella rebase
This commit is contained in:
realbigsean 2023-01-27 11:51:11 +01:00 committed by GitHub
commit ee25c21463
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
60 changed files with 1779 additions and 225 deletions

4
.cargo/config.toml Normal file
View File

@ -0,0 +1,4 @@
[env]
# Set the number of arenas to 16 when using jemalloc.
JEMALLOC_SYS_WITH_MALLOC_CONF = "abort_conf:true,narenas:16"

View File

@ -112,6 +112,7 @@ jobs:
--platform=linux/${SHORT_ARCH} \
--file ./Dockerfile.cross . \
--tag ${IMAGE_NAME}:${VERSION}-${SHORT_ARCH}${VERSION_SUFFIX}${MODERNITY_SUFFIX} \
--provenance=false \
--push
build-docker-multiarch:
name: build-docker-multiarch${{ matrix.modernity }}

View File

@ -306,16 +306,6 @@ jobs:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Typecheck benchmark code without running it
run: make check-benches
check-consensus:
name: check-consensus
runs-on: ubuntu-latest
needs: cargo-fmt
steps:
- uses: actions/checkout@v3
- name: Get latest version of stable Rust
run: rustup update stable
- name: Typecheck consensus code in strict mode
run: make check-consensus
clippy:
name: clippy
runs-on: ubuntu-latest
@ -382,14 +372,12 @@ jobs:
- uses: actions/checkout@v3
- name: Install Rust (${{ env.PINNED_NIGHTLY }})
run: rustup toolchain install $PINNED_NIGHTLY
# NOTE: cargo-udeps version is pinned until this issue is resolved:
# https://github.com/est31/cargo-udeps/issues/135
- name: Install Protoc
uses: arduino/setup-protoc@e52d9eb8f7b63115df1ac544a1376fdbf5a39612
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Install cargo-udeps
run: cargo install cargo-udeps --locked --force --version 0.1.30
run: cargo install cargo-udeps --locked --force
- name: Create Cargo config dir
run: mkdir -p .cargo
- name: Install custom Cargo config

42
Cargo.lock generated
View File

@ -2710,6 +2710,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "fs_extra"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2022715d62ab30faffd124d40b76f4134a550a87792276512b18d63272333394"
[[package]]
name = "funty"
version = "1.1.0"
@ -3223,6 +3229,7 @@ dependencies = [
"eth2_ssz",
"execution_layer",
"futures",
"genesis",
"hex",
"lazy_static",
"lighthouse_metrics",
@ -3610,6 +3617,38 @@ version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fad582f4b9e86b6caa621cabeb0963332d92eea04729ab12892c2533951e6440"
[[package]]
name = "jemalloc-ctl"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1891c671f3db85d8ea8525dd43ab147f9977041911d24a03e5a36187a7bfde9"
dependencies = [
"jemalloc-sys",
"libc",
"paste",
]
[[package]]
name = "jemalloc-sys"
version = "0.5.2+5.3.0-patched"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "134163979b6eed9564c98637b710b40979939ba351f59952708234ea11b5f3f8"
dependencies = [
"cc",
"fs_extra",
"libc",
]
[[package]]
name = "jemallocator"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16c2514137880c52b0b4822b563fadd38257c1f380858addb74a400889696ea6"
dependencies = [
"jemalloc-sys",
"libc",
]
[[package]]
name = "js-sys"
version = "0.3.60"
@ -3739,6 +3778,7 @@ dependencies = [
"lighthouse_network",
"lighthouse_version",
"log",
"malloc_utils",
"sensitive_url",
"serde",
"serde_json",
@ -4548,6 +4588,8 @@ dependencies = [
name = "malloc_utils"
version = "0.1.0"
dependencies = [
"jemalloc-ctl",
"jemallocator",
"lazy_static",
"libc",
"lighthouse_metrics",

View File

@ -90,6 +90,7 @@ members = [
"validator_client",
"validator_client/slashing_protection",
]
resolver = "2"
[patch]
[patch.crates-io]

View File

@ -14,8 +14,16 @@ BUILD_PATH_AARCH64 = "target/$(AARCH64_TAG)/release"
PINNED_NIGHTLY ?= nightly
CLIPPY_PINNED_NIGHTLY=nightly-2022-05-19
# List of features to use when building natively. Can be overriden via the environment.
# No jemalloc on Windows
ifeq ($(OS),Windows_NT)
FEATURES?=
else
FEATURES?=jemalloc
endif
# List of features to use when cross-compiling. Can be overridden via the environment.
CROSS_FEATURES ?= gnosis,slasher-lmdb,slasher-mdbx
CROSS_FEATURES ?= gnosis,slasher-lmdb,slasher-mdbx,jemalloc
# Cargo profile for Cross builds. Default is for local builds, CI uses an override.
CROSS_PROFILE ?= release
@ -104,10 +112,6 @@ cargo-fmt:
check-benches:
cargo check --workspace --benches
# Typechecks consensus code *without* allowing deprecated legacy arithmetic or metrics.
check-consensus:
cargo check -p state_processing --no-default-features
# Runs only the ef-test vectors.
run-ef-tests:
rm -rf $(EF_TESTS)/.accessed_file_log.txt

View File

@ -66,7 +66,7 @@ of the Lighthouse book.
The best place for discussion is the [Lighthouse Discord
server](https://discord.gg/cyAszAh).
Sign up to the [Lighthouse Development Updates](https://eepurl.com/dh9Lvb/) mailing list for email
Sign up to the [Lighthouse Development Updates](https://eepurl.com/dh9Lvb) mailing list for email
notifications about releases, network status and other important information.
Encrypt sensitive messages using our [PGP

View File

@ -2313,32 +2313,74 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
/// Verify a signed BLS to execution change before allowing it to propagate on the gossip network.
pub fn verify_bls_to_execution_change_for_gossip(
pub fn verify_bls_to_execution_change_for_http_api(
&self,
bls_to_execution_change: SignedBlsToExecutionChange,
) -> Result<ObservationOutcome<SignedBlsToExecutionChange, T::EthSpec>, Error> {
let current_fork = self.spec.fork_name_at_slot::<T::EthSpec>(self.slot()?);
if let ForkName::Base | ForkName::Altair | ForkName::Merge = current_fork {
// Disallow BLS to execution changes prior to the Capella fork.
return Err(Error::BlsToExecutionChangeBadFork(current_fork));
// Before checking the gossip duplicate filter, check that no prior change is already
// in our op pool. Ignore these messages: do not gossip, do not try to override the pool.
match self
.op_pool
.bls_to_execution_change_in_pool_equals(&bls_to_execution_change)
{
Some(true) => return Ok(ObservationOutcome::AlreadyKnown),
Some(false) => return Err(Error::BlsToExecutionConflictsWithPool),
None => (),
}
let wall_clock_state = self.wall_clock_state()?;
// Use the head state to save advancing to the wall-clock slot unnecessarily. The message is
// signed with respect to the genesis fork version, and the slot check for gossip is applied
// separately. This `Arc` clone of the head is nice and cheap.
let head_snapshot = self.head().snapshot;
let head_state = &head_snapshot.beacon_state;
Ok(self
.observed_bls_to_execution_changes
.lock()
.verify_and_observe(bls_to_execution_change, &wall_clock_state, &self.spec)?)
.verify_and_observe(bls_to_execution_change, head_state, &self.spec)?)
}
/// Verify a signed BLS to execution change before allowing it to propagate on the gossip network.
pub fn verify_bls_to_execution_change_for_gossip(
&self,
bls_to_execution_change: SignedBlsToExecutionChange,
) -> Result<ObservationOutcome<SignedBlsToExecutionChange, T::EthSpec>, Error> {
// Ignore BLS to execution changes on gossip prior to Capella.
if !self.current_slot_is_post_capella()? {
return Err(Error::BlsToExecutionPriorToCapella);
}
self.verify_bls_to_execution_change_for_http_api(bls_to_execution_change)
.or_else(|e| {
// On gossip treat conflicts the same as duplicates [IGNORE].
match e {
Error::BlsToExecutionConflictsWithPool => Ok(ObservationOutcome::AlreadyKnown),
e => Err(e),
}
})
}
/// Check if the current slot is greater than or equal to the Capella fork epoch.
pub fn current_slot_is_post_capella(&self) -> Result<bool, Error> {
let current_fork = self.spec.fork_name_at_slot::<T::EthSpec>(self.slot()?);
if let ForkName::Base | ForkName::Altair | ForkName::Merge = current_fork {
Ok(false)
} else {
Ok(true)
}
}
/// Import a BLS to execution change to the op pool.
///
/// Return `true` if the change was added to the pool.
pub fn import_bls_to_execution_change(
&self,
bls_to_execution_change: SigVerifiedOp<SignedBlsToExecutionChange, T::EthSpec>,
) {
) -> bool {
if self.eth1_chain.is_some() {
self.op_pool
.insert_bls_to_execution_change(bls_to_execution_change);
.insert_bls_to_execution_change(bls_to_execution_change)
} else {
false
}
}

View File

@ -162,6 +162,7 @@ pub enum BeaconChainError {
BlockRewardSlotError,
BlockRewardAttestationError,
BlockRewardSyncError,
SyncCommitteeRewardsSyncError,
HeadMissingFromForkChoice(Hash256),
FinalizedBlockMissingFromForkChoice(Hash256),
HeadBlockMissingFromForkChoice(Hash256),
@ -206,7 +207,8 @@ pub enum BeaconChainError {
MissingPersistedForkChoice,
CommitteePromiseFailed(oneshot_broadcast::Error),
MaxCommitteePromises(usize),
BlsToExecutionChangeBadFork(ForkName),
BlsToExecutionPriorToCapella,
BlsToExecutionConflictsWithPool,
InconsistentFork(InconsistentFork),
ProposerHeadForkChoiceError(fork_choice::Error<proto_array::Error>),
BlobsUnavailable,

View File

@ -43,6 +43,7 @@ pub mod schema_change;
mod shuffling_cache;
mod snapshot_cache;
pub mod state_advance_timer;
pub mod sync_committee_rewards;
pub mod sync_committee_verification;
pub mod test_utils;
mod timeout_rw_lock;

View File

@ -2,6 +2,7 @@ use crate::{
beacon_chain::MAXIMUM_GOSSIP_CLOCK_DISPARITY, BeaconChain, BeaconChainError, BeaconChainTypes,
};
use derivative::Derivative;
use eth2::types::Hash256;
use slot_clock::SlotClock;
use std::time::Duration;
use strum::AsRefStr;
@ -36,6 +37,8 @@ pub enum Error {
SigSlotStartIsNone,
/// Failed to construct a LightClientOptimisticUpdate from state.
FailedConstructingUpdate,
/// Unknown block with parent root.
UnknownBlockParentRoot(Hash256),
/// Beacon chain error occured.
BeaconChainError(BeaconChainError),
LightClientUpdateError(LightClientUpdateError),
@ -58,6 +61,7 @@ impl From<LightClientUpdateError> for Error {
#[derivative(Clone(bound = "T: BeaconChainTypes"))]
pub struct VerifiedLightClientOptimisticUpdate<T: BeaconChainTypes> {
light_client_optimistic_update: LightClientOptimisticUpdate<T::EthSpec>,
pub parent_root: Hash256,
seen_timestamp: Duration,
}
@ -107,6 +111,16 @@ impl<T: BeaconChainTypes> VerifiedLightClientOptimisticUpdate<T> {
None => return Err(Error::SigSlotStartIsNone),
}
// check if we can process the optimistic update immediately
// otherwise queue
let canonical_root = light_client_optimistic_update
.attested_header
.canonical_root();
if canonical_root != head_block.message().parent_root() {
return Err(Error::UnknownBlockParentRoot(canonical_root));
}
let optimistic_update =
LightClientOptimisticUpdate::new(&chain.spec, head_block, &attested_state)?;
@ -119,6 +133,7 @@ impl<T: BeaconChainTypes> VerifiedLightClientOptimisticUpdate<T> {
Ok(Self {
light_client_optimistic_update,
parent_root: canonical_root,
seen_timestamp,
})
}

View File

@ -0,0 +1,87 @@
use crate::{BeaconChain, BeaconChainError, BeaconChainTypes};
use eth2::lighthouse::SyncCommitteeReward;
use safe_arith::SafeArith;
use slog::error;
use state_processing::per_block_processing::altair::sync_committee::compute_sync_aggregate_rewards;
use std::collections::HashMap;
use store::RelativeEpoch;
use types::{AbstractExecPayload, BeaconBlockRef, BeaconState};
impl<T: BeaconChainTypes> BeaconChain<T> {
pub fn compute_sync_committee_rewards<Payload: AbstractExecPayload<T::EthSpec>>(
&self,
block: BeaconBlockRef<'_, T::EthSpec, Payload>,
state: &mut BeaconState<T::EthSpec>,
) -> Result<Vec<SyncCommitteeReward>, BeaconChainError> {
if block.slot() != state.slot() {
return Err(BeaconChainError::BlockRewardSlotError);
}
let spec = &self.spec;
state.build_committee_cache(RelativeEpoch::Current, spec)?;
let sync_aggregate = block.body().sync_aggregate()?;
let sync_committee = state.current_sync_committee()?.clone();
let sync_committee_indices = state.get_sync_committee_indices(&sync_committee)?;
let (participant_reward_value, proposer_reward_per_bit) =
compute_sync_aggregate_rewards(state, spec).map_err(|e| {
error!(
self.log, "Error calculating sync aggregate rewards";
"error" => ?e
);
BeaconChainError::SyncCommitteeRewardsSyncError
})?;
let mut balances = HashMap::<usize, u64>::new();
let mut total_proposer_rewards = 0;
let proposer_index = state.get_beacon_proposer_index(block.slot(), spec)?;
// Apply rewards to participant balances. Keep track of proposer rewards
for (validator_index, participant_bit) in sync_committee_indices
.iter()
.zip(sync_aggregate.sync_committee_bits.iter())
{
let participant_balance = balances
.entry(*validator_index)
.or_insert_with(|| state.balances()[*validator_index]);
if participant_bit {
participant_balance.safe_add_assign(participant_reward_value)?;
balances
.entry(proposer_index)
.or_insert_with(|| state.balances()[proposer_index])
.safe_add_assign(proposer_reward_per_bit)?;
total_proposer_rewards.safe_add_assign(proposer_reward_per_bit)?;
} else {
*participant_balance = participant_balance.saturating_sub(participant_reward_value);
}
}
Ok(balances
.iter()
.filter_map(|(i, new_balance)| {
let reward = if *i != proposer_index {
*new_balance as i64 - state.balances()[*i] as i64
} else if sync_committee_indices.contains(i) {
*new_balance as i64
- state.balances()[*i] as i64
- total_proposer_rewards as i64
} else {
return None;
};
Some(SyncCommitteeReward {
validator_index: *i as u64,
reward,
})
})
.collect())
}
}

View File

@ -2,6 +2,7 @@ pub use crate::persisted_beacon_chain::PersistedBeaconChain;
pub use crate::{
beacon_chain::{BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, FORK_CHOICE_DB_KEY, OP_POOL_DB_KEY},
migrate::MigratorConfig,
sync_committee_verification::Error as SyncCommitteeError,
validator_monitor::DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD,
BeaconChainError, NotifyExecutionLayer, ProduceBlockVerification,
};
@ -22,7 +23,7 @@ use execution_layer::{
};
use fork_choice::CountUnrealized;
use futures::channel::mpsc::Receiver;
pub use genesis::{interop_genesis_state, DEFAULT_ETH1_BLOCK_HASH};
pub use genesis::{interop_genesis_state_with_eth1, DEFAULT_ETH1_BLOCK_HASH};
use int_to_bytes::int_to_bytes32;
use kzg::TrustedSetup;
use merkle_proof::MerkleTree;
@ -149,6 +150,7 @@ pub struct Builder<T: BeaconChainTypes> {
eth_spec_instance: T::EthSpec,
spec: Option<ChainSpec>,
validator_keypairs: Option<Vec<Keypair>>,
withdrawal_keypairs: Vec<Option<Keypair>>,
chain_config: Option<ChainConfig>,
store_config: Option<StoreConfig>,
#[allow(clippy::type_complexity)]
@ -180,7 +182,7 @@ impl<E: EthSpec> Builder<EphemeralHarnessType<E>> {
.unwrap(),
);
let mutator = move |builder: BeaconChainBuilder<_>| {
let genesis_state = interop_genesis_state::<E>(
let genesis_state = interop_genesis_state_with_eth1::<E>(
&validator_keypairs,
HARNESS_GENESIS_TIME,
Hash256::from_slice(DEFAULT_ETH1_BLOCK_HASH),
@ -241,7 +243,7 @@ impl<E: EthSpec> Builder<DiskHarnessType<E>> {
.expect("cannot build without validator keypairs");
let mutator = move |builder: BeaconChainBuilder<_>| {
let genesis_state = interop_genesis_state::<E>(
let genesis_state = interop_genesis_state_with_eth1::<E>(
&validator_keypairs,
HARNESS_GENESIS_TIME,
Hash256::from_slice(DEFAULT_ETH1_BLOCK_HASH),
@ -283,6 +285,7 @@ where
eth_spec_instance,
spec: None,
validator_keypairs: None,
withdrawal_keypairs: vec![],
chain_config: None,
store_config: None,
store: None,
@ -308,6 +311,11 @@ where
self
}
pub fn withdrawal_keypairs(mut self, withdrawal_keypairs: Vec<Option<Keypair>>) -> Self {
self.withdrawal_keypairs = withdrawal_keypairs;
self
}
pub fn default_spec(self) -> Self {
self.spec_or_default(None)
}
@ -545,6 +553,7 @@ where
spec: chain.spec.clone(),
chain: Arc::new(chain),
validator_keypairs,
withdrawal_keypairs: self.withdrawal_keypairs,
shutdown_receiver: Arc::new(Mutex::new(shutdown_receiver)),
runtime: self.runtime,
mock_execution_layer: self.mock_execution_layer,
@ -560,6 +569,12 @@ where
/// Used for testing.
pub struct BeaconChainHarness<T: BeaconChainTypes> {
pub validator_keypairs: Vec<Keypair>,
/// Optional BLS withdrawal keys for each validator.
///
/// If a validator index is missing from this vec or their entry is `None` then either
/// no BLS withdrawal key was set for them (they had an address from genesis) or the test
/// initializer neglected to set this field.
pub withdrawal_keypairs: Vec<Option<Keypair>>,
pub chain: Arc<BeaconChain<T>>,
pub spec: ChainSpec,
@ -1471,6 +1486,44 @@ where
.sign(sk, &fork, genesis_validators_root, &self.chain.spec)
}
pub fn make_bls_to_execution_change(
&self,
validator_index: u64,
address: Address,
) -> SignedBlsToExecutionChange {
let keypair = self.get_withdrawal_keypair(validator_index);
self.make_bls_to_execution_change_with_keys(
validator_index,
address,
&keypair.pk,
&keypair.sk,
)
}
pub fn make_bls_to_execution_change_with_keys(
&self,
validator_index: u64,
address: Address,
pubkey: &PublicKey,
secret_key: &SecretKey,
) -> SignedBlsToExecutionChange {
let genesis_validators_root = self.chain.genesis_validators_root;
BlsToExecutionChange {
validator_index,
from_bls_pubkey: pubkey.compress(),
to_execution_address: address,
}
.sign(secret_key, genesis_validators_root, &self.chain.spec)
}
pub fn get_withdrawal_keypair(&self, validator_index: u64) -> &Keypair {
self.withdrawal_keypairs
.get(validator_index as usize)
.expect("BLS withdrawal key missing from harness")
.as_ref()
.expect("no withdrawal key for validator")
}
pub fn add_voluntary_exit(
&self,
block: &mut BeaconBlock<E>,
@ -2021,6 +2074,30 @@ where
(honest_head, faulty_head)
}
pub fn process_sync_contributions(
&self,
sync_contributions: HarnessSyncContributions<E>,
) -> Result<(), SyncCommitteeError> {
let mut verified_contributions = Vec::with_capacity(sync_contributions.len());
for (_, contribution_and_proof) in sync_contributions {
let signed_contribution_and_proof = contribution_and_proof.unwrap();
let verified_contribution = self
.chain
.verify_sync_contribution_for_gossip(signed_contribution_and_proof)?;
verified_contributions.push(verified_contribution);
}
for verified_contribution in verified_contributions {
self.chain
.add_contribution_to_block_inclusion_pool(verified_contribution)?;
}
Ok(())
}
}
// Junk `Debug` impl to satistfy certain trait bounds during testing.

View File

@ -5,6 +5,7 @@ mod capella;
mod merge;
mod op_verification;
mod payload_invalidation;
mod rewards;
mod store_tests;
mod sync_committee_verification;
mod tests;

View File

@ -0,0 +1,121 @@
#![cfg(test)]
use std::collections::HashMap;
use beacon_chain::test_utils::{
generate_deterministic_keypairs, BeaconChainHarness, EphemeralHarnessType,
};
use beacon_chain::{
test_utils::{AttestationStrategy, BlockStrategy, RelativeSyncCommittee},
types::{Epoch, EthSpec, Keypair, MinimalEthSpec},
};
use lazy_static::lazy_static;
pub const VALIDATOR_COUNT: usize = 64;
lazy_static! {
static ref KEYPAIRS: Vec<Keypair> = generate_deterministic_keypairs(VALIDATOR_COUNT);
}
fn get_harness<E: EthSpec>() -> BeaconChainHarness<EphemeralHarnessType<E>> {
let mut spec = E::default_spec();
spec.altair_fork_epoch = Some(Epoch::new(0)); // We use altair for all tests
let harness = BeaconChainHarness::builder(E::default())
.spec(spec)
.keypairs(KEYPAIRS.to_vec())
.fresh_ephemeral_store()
.build();
harness.advance_slot();
harness
}
#[tokio::test]
async fn test_sync_committee_rewards() {
let num_block_produced = MinimalEthSpec::slots_per_epoch();
let harness = get_harness::<MinimalEthSpec>();
let latest_block_root = harness
.extend_chain(
num_block_produced as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
// Create and add sync committee message to op_pool
let sync_contributions = harness.make_sync_contributions(
&harness.get_current_state(),
latest_block_root,
harness.get_current_slot(),
RelativeSyncCommittee::Current,
);
harness
.process_sync_contributions(sync_contributions)
.unwrap();
// Add block
let chain = &harness.chain;
let (head_state, head_state_root) = harness.get_current_state_and_root();
let target_slot = harness.get_current_slot() + 1;
let (block_root, mut state) = harness
.add_attested_block_at_slot(target_slot, head_state, head_state_root, &[])
.await
.unwrap();
let block = harness.get_block(block_root).unwrap();
let parent_block = chain
.get_blinded_block(&block.parent_root())
.unwrap()
.unwrap();
let parent_state = chain
.get_state(&parent_block.state_root(), Some(parent_block.slot()))
.unwrap()
.unwrap();
let reward_payload = chain
.compute_sync_committee_rewards(block.message(), &mut state)
.unwrap();
let rewards = reward_payload
.iter()
.map(|reward| (reward.validator_index, reward.reward))
.collect::<HashMap<_, _>>();
let proposer_index = state
.get_beacon_proposer_index(target_slot, &MinimalEthSpec::default_spec())
.unwrap();
let mut mismatches = vec![];
for validator in state.validators() {
let validator_index = state
.clone()
.get_validator_index(&validator.pubkey)
.unwrap()
.unwrap();
let pre_state_balance = parent_state.balances()[validator_index];
let post_state_balance = state.balances()[validator_index];
let sync_committee_reward = rewards.get(&(validator_index as u64)).unwrap_or(&0);
if validator_index == proposer_index {
continue; // Ignore proposer
}
if pre_state_balance as i64 + *sync_committee_reward != post_state_balance as i64 {
mismatches.push(validator_index.to_string());
}
}
assert_eq!(
mismatches.len(),
0,
"Expect 0 mismatches, but these validators have mismatches on balance: {} ",
mismatches.join(",")
);
}

View File

@ -1013,8 +1013,8 @@ fn check_shuffling_compatible(
// Ensure blocks from abandoned forks are pruned from the Hot DB
#[tokio::test]
async fn prunes_abandoned_fork_between_two_finalized_checkpoints() {
const HONEST_VALIDATOR_COUNT: usize = 16 + 0;
const ADVERSARIAL_VALIDATOR_COUNT: usize = 8 - 0;
const HONEST_VALIDATOR_COUNT: usize = 32 + 0;
const ADVERSARIAL_VALIDATOR_COUNT: usize = 16 - 0;
const VALIDATOR_COUNT: usize = HONEST_VALIDATOR_COUNT + ADVERSARIAL_VALIDATOR_COUNT;
let validators_keypairs = types::test_utils::generate_deterministic_keypairs(VALIDATOR_COUNT);
let honest_validators: Vec<usize> = (0..HONEST_VALIDATOR_COUNT).collect();
@ -1123,8 +1123,8 @@ async fn prunes_abandoned_fork_between_two_finalized_checkpoints() {
#[tokio::test]
async fn pruning_does_not_touch_abandoned_block_shared_with_canonical_chain() {
const HONEST_VALIDATOR_COUNT: usize = 16 + 0;
const ADVERSARIAL_VALIDATOR_COUNT: usize = 8 - 0;
const HONEST_VALIDATOR_COUNT: usize = 32 + 0;
const ADVERSARIAL_VALIDATOR_COUNT: usize = 16 - 0;
const VALIDATOR_COUNT: usize = HONEST_VALIDATOR_COUNT + ADVERSARIAL_VALIDATOR_COUNT;
let validators_keypairs = types::test_utils::generate_deterministic_keypairs(VALIDATOR_COUNT);
let honest_validators: Vec<usize> = (0..HONEST_VALIDATOR_COUNT).collect();
@ -1255,8 +1255,8 @@ async fn pruning_does_not_touch_abandoned_block_shared_with_canonical_chain() {
#[tokio::test]
async fn pruning_does_not_touch_blocks_prior_to_finalization() {
const HONEST_VALIDATOR_COUNT: usize = 16;
const ADVERSARIAL_VALIDATOR_COUNT: usize = 8;
const HONEST_VALIDATOR_COUNT: usize = 32;
const ADVERSARIAL_VALIDATOR_COUNT: usize = 16;
const VALIDATOR_COUNT: usize = HONEST_VALIDATOR_COUNT + ADVERSARIAL_VALIDATOR_COUNT;
let validators_keypairs = types::test_utils::generate_deterministic_keypairs(VALIDATOR_COUNT);
let honest_validators: Vec<usize> = (0..HONEST_VALIDATOR_COUNT).collect();
@ -1350,8 +1350,8 @@ async fn pruning_does_not_touch_blocks_prior_to_finalization() {
#[tokio::test]
async fn prunes_fork_growing_past_youngest_finalized_checkpoint() {
const HONEST_VALIDATOR_COUNT: usize = 16 + 0;
const ADVERSARIAL_VALIDATOR_COUNT: usize = 8 - 0;
const HONEST_VALIDATOR_COUNT: usize = 32 + 0;
const ADVERSARIAL_VALIDATOR_COUNT: usize = 16 - 0;
const VALIDATOR_COUNT: usize = HONEST_VALIDATOR_COUNT + ADVERSARIAL_VALIDATOR_COUNT;
let validators_keypairs = types::test_utils::generate_deterministic_keypairs(VALIDATOR_COUNT);
let honest_validators: Vec<usize> = (0..HONEST_VALIDATOR_COUNT).collect();
@ -1495,8 +1495,8 @@ async fn prunes_fork_growing_past_youngest_finalized_checkpoint() {
// This is to check if state outside of normal block processing are pruned correctly.
#[tokio::test]
async fn prunes_skipped_slots_states() {
const HONEST_VALIDATOR_COUNT: usize = 16 + 0;
const ADVERSARIAL_VALIDATOR_COUNT: usize = 8 - 0;
const HONEST_VALIDATOR_COUNT: usize = 32 + 0;
const ADVERSARIAL_VALIDATOR_COUNT: usize = 16 - 0;
const VALIDATOR_COUNT: usize = HONEST_VALIDATOR_COUNT + ADVERSARIAL_VALIDATOR_COUNT;
let validators_keypairs = types::test_utils::generate_deterministic_keypairs(VALIDATOR_COUNT);
let honest_validators: Vec<usize> = (0..HONEST_VALIDATOR_COUNT).collect();
@ -1624,8 +1624,8 @@ async fn prunes_skipped_slots_states() {
// This is to check if state outside of normal block processing are pruned correctly.
#[tokio::test]
async fn finalizes_non_epoch_start_slot() {
const HONEST_VALIDATOR_COUNT: usize = 16 + 0;
const ADVERSARIAL_VALIDATOR_COUNT: usize = 8 - 0;
const HONEST_VALIDATOR_COUNT: usize = 32 + 0;
const ADVERSARIAL_VALIDATOR_COUNT: usize = 16 - 0;
const VALIDATOR_COUNT: usize = HONEST_VALIDATOR_COUNT + ADVERSARIAL_VALIDATOR_COUNT;
let validators_keypairs = types::test_utils::generate_deterministic_keypairs(VALIDATOR_COUNT);
let honest_validators: Vec<usize> = (0..HONEST_VALIDATOR_COUNT).collect();

View File

@ -45,6 +45,7 @@ fn get_valid_sync_committee_message(
harness: &BeaconChainHarness<EphemeralHarnessType<E>>,
slot: Slot,
relative_sync_committee: RelativeSyncCommittee,
message_index: usize,
) -> (SyncCommitteeMessage, usize, SecretKey, SyncSubnetId) {
let head_state = harness.chain.head_beacon_state_cloned();
let head_block_root = harness.chain.head_snapshot().beacon_block_root;
@ -52,7 +53,7 @@ fn get_valid_sync_committee_message(
.make_sync_committee_messages(&head_state, head_block_root, slot, relative_sync_committee)
.get(0)
.expect("sync messages should exist")
.get(0)
.get(message_index)
.expect("first sync message should exist")
.clone();
@ -494,7 +495,7 @@ async fn unaggregated_gossip_verification() {
let current_slot = harness.chain.slot().expect("should get slot");
let (valid_sync_committee_message, expected_validator_index, validator_sk, subnet_id) =
get_valid_sync_committee_message(&harness, current_slot, RelativeSyncCommittee::Current);
get_valid_sync_committee_message(&harness, current_slot, RelativeSyncCommittee::Current, 0);
macro_rules! assert_invalid {
($desc: tt, $attn_getter: expr, $subnet_getter: expr, $($error: pat_param) |+ $( if $guard: expr )?) => {
@ -644,7 +645,7 @@ async fn unaggregated_gossip_verification() {
// **Incorrectly** create a sync message using the current sync committee
let (next_valid_sync_committee_message, _, _, next_subnet_id) =
get_valid_sync_committee_message(&harness, target_slot, RelativeSyncCommittee::Current);
get_valid_sync_committee_message(&harness, target_slot, RelativeSyncCommittee::Current, 1);
assert_invalid!(
"sync message on incorrect subnet",

View File

@ -19,7 +19,7 @@ use types::{
};
// Should ideally be divisible by 3.
pub const VALIDATOR_COUNT: usize = 24;
pub const VALIDATOR_COUNT: usize = 48;
lazy_static! {
/// A cached set of keys.

View File

@ -10,6 +10,20 @@ use types::{
pub const DEFAULT_ETH1_BLOCK_HASH: &[u8] = &[0x42; 32];
pub fn bls_withdrawal_credentials(pubkey: &PublicKey, spec: &ChainSpec) -> Hash256 {
let mut credentials = hash(&pubkey.as_ssz_bytes());
credentials[0] = spec.bls_withdrawal_prefix_byte;
Hash256::from_slice(&credentials)
}
fn eth1_withdrawal_credentials(pubkey: &PublicKey, spec: &ChainSpec) -> Hash256 {
let fake_execution_address = &hash(&pubkey.as_ssz_bytes())[0..20];
let mut credentials = [0u8; 32];
credentials[0] = spec.eth1_address_withdrawal_prefix_byte;
credentials[12..].copy_from_slice(fake_execution_address);
Hash256::from_slice(&credentials)
}
/// Builds a genesis state as defined by the Eth2 interop procedure (see below).
///
/// Reference:
@ -21,20 +35,75 @@ pub fn interop_genesis_state<T: EthSpec>(
execution_payload_header: Option<ExecutionPayloadHeader<T>>,
spec: &ChainSpec,
) -> Result<BeaconState<T>, String> {
let withdrawal_credentials = keypairs
.iter()
.map(|keypair| bls_withdrawal_credentials(&keypair.pk, spec))
.collect::<Vec<_>>();
interop_genesis_state_with_withdrawal_credentials::<T>(
keypairs,
&withdrawal_credentials,
genesis_time,
eth1_block_hash,
execution_payload_header,
spec,
)
}
// returns an interop genesis state except every other
// validator has eth1 withdrawal credentials
pub fn interop_genesis_state_with_eth1<T: EthSpec>(
keypairs: &[Keypair],
genesis_time: u64,
eth1_block_hash: Hash256,
execution_payload_header: Option<ExecutionPayloadHeader<T>>,
spec: &ChainSpec,
) -> Result<BeaconState<T>, String> {
let withdrawal_credentials = keypairs
.iter()
.enumerate()
.map(|(index, keypair)| {
if index % 2 == 0 {
bls_withdrawal_credentials(&keypair.pk, spec)
} else {
eth1_withdrawal_credentials(&keypair.pk, spec)
}
})
.collect::<Vec<_>>();
interop_genesis_state_with_withdrawal_credentials::<T>(
keypairs,
&withdrawal_credentials,
genesis_time,
eth1_block_hash,
execution_payload_header,
spec,
)
}
pub fn interop_genesis_state_with_withdrawal_credentials<T: EthSpec>(
keypairs: &[Keypair],
withdrawal_credentials: &[Hash256],
genesis_time: u64,
eth1_block_hash: Hash256,
execution_payload_header: Option<ExecutionPayloadHeader<T>>,
spec: &ChainSpec,
) -> Result<BeaconState<T>, String> {
if keypairs.len() != withdrawal_credentials.len() {
return Err(format!(
"wrong number of withdrawal credentials, expected: {}, got: {}",
keypairs.len(),
withdrawal_credentials.len()
));
}
let eth1_timestamp = 2_u64.pow(40);
let amount = spec.max_effective_balance;
let withdrawal_credentials = |pubkey: &PublicKey| {
let mut credentials = hash(&pubkey.as_ssz_bytes());
credentials[0] = spec.bls_withdrawal_prefix_byte;
Hash256::from_slice(&credentials)
};
let datas = keypairs
.into_par_iter()
.map(|keypair| {
.zip(withdrawal_credentials.into_par_iter())
.map(|(keypair, &withdrawal_credentials)| {
let mut data = DepositData {
withdrawal_credentials: withdrawal_credentials(&keypair.pk),
withdrawal_credentials,
pubkey: keypair.pk.clone().into(),
amount,
signature: Signature::empty().into(),
@ -133,4 +202,83 @@ mod test {
"validator count should be correct"
);
}
#[test]
fn interop_state_with_eth1() {
let validator_count = 16;
let genesis_time = 42;
let spec = &TestEthSpec::default_spec();
let keypairs = generate_deterministic_keypairs(validator_count);
let state = interop_genesis_state_with_eth1::<TestEthSpec>(
&keypairs,
genesis_time,
Hash256::from_slice(DEFAULT_ETH1_BLOCK_HASH),
None,
spec,
)
.expect("should build state");
assert_eq!(
state.eth1_data().block_hash,
Hash256::from_slice(&[0x42; 32]),
"eth1 block hash should be co-ordinated junk"
);
assert_eq!(
state.genesis_time(),
genesis_time,
"genesis time should be as specified"
);
for b in state.balances() {
assert_eq!(
*b, spec.max_effective_balance,
"validator balances should be max effective balance"
);
}
for (index, v) in state.validators().iter().enumerate() {
let creds = v.withdrawal_credentials.as_bytes();
if index % 2 == 0 {
assert_eq!(
creds[0], spec.bls_withdrawal_prefix_byte,
"first byte of withdrawal creds should be bls prefix"
);
assert_eq!(
&creds[1..],
&hash(&v.pubkey.as_ssz_bytes())[1..],
"rest of withdrawal creds should be pubkey hash"
);
} else {
assert_eq!(
creds[0], spec.eth1_address_withdrawal_prefix_byte,
"first byte of withdrawal creds should be eth1 prefix"
);
assert_eq!(
creds[1..12],
[0u8; 11],
"bytes [1:12] of withdrawal creds must be zero"
);
assert_eq!(
&creds[12..],
&hash(&v.pubkey.as_ssz_bytes())[0..20],
"rest of withdrawal creds should be first 20 bytes of pubkey hash"
)
}
}
assert_eq!(
state.balances().len(),
validator_count,
"validator balances len should be correct"
);
assert_eq!(
state.validators().len(),
validator_count,
"validator count should be correct"
);
}
}

View File

@ -5,5 +5,8 @@ mod interop;
pub use eth1::Config as Eth1Config;
pub use eth1::Eth1Endpoint;
pub use eth1_genesis_service::{Eth1GenesisService, Statistics};
pub use interop::{interop_genesis_state, DEFAULT_ETH1_BLOCK_HASH};
pub use interop::{
bls_withdrawal_credentials, interop_genesis_state, interop_genesis_state_with_eth1,
interop_genesis_state_with_withdrawal_credentials, DEFAULT_ETH1_BLOCK_HASH,
};
pub use types::test_utils::generate_deterministic_keypairs;

View File

@ -45,6 +45,7 @@ logging = { path = "../../common/logging" }
serde_json = "1.0.58"
proto_array = { path = "../../consensus/proto_array" }
unused_port = {path = "../../common/unused_port"}
genesis = { path = "../genesis" }
[[test]]
name = "bn_http_api_tests"

View File

@ -16,6 +16,7 @@ mod metrics;
mod proposer_duties;
mod publish_blocks;
mod state_id;
mod sync_committee_rewards;
mod sync_committees;
mod ui;
mod validator_inclusion;
@ -1671,7 +1672,7 @@ pub fn serve<T: BeaconChainTypes>(
.and_then(
|chain: Arc<BeaconChain<T>>,
address_changes: Vec<SignedBlsToExecutionChange>,
#[allow(unused)] network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
blocking_json_task(move || {
let mut failures = vec![];
@ -1679,15 +1680,38 @@ pub fn serve<T: BeaconChainTypes>(
for (index, address_change) in address_changes.into_iter().enumerate() {
let validator_index = address_change.message.validator_index;
match chain.verify_bls_to_execution_change_for_gossip(address_change) {
match chain.verify_bls_to_execution_change_for_http_api(address_change) {
Ok(ObservationOutcome::New(verified_address_change)) => {
let validator_index =
verified_address_change.as_inner().message.validator_index;
let address = verified_address_change
.as_inner()
.message
.to_execution_address;
// New to P2P *and* op pool, gossip immediately if post-Capella.
let publish = chain.current_slot_is_post_capella().unwrap_or(false);
if publish {
publish_pubsub_message(
&network_tx,
PubsubMessage::BlsToExecutionChange(Box::new(
verified_address_change.as_inner().clone(),
)),
)?;
}
// Import to op pool (may return `false` if there's a race).
let imported =
chain.import_bls_to_execution_change(verified_address_change);
info!(
log,
"Processed BLS to execution change";
"validator_index" => validator_index,
"address" => ?address,
"published" => publish,
"imported" => imported,
);
}
Ok(ObservationOutcome::AlreadyKnown) => {
debug!(
@ -1697,11 +1721,12 @@ pub fn serve<T: BeaconChainTypes>(
);
}
Err(e) => {
error!(
warn!(
log,
"Invalid BLS to execution change";
"validator_index" => validator_index,
"source" => "HTTP API",
"reason" => ?e,
"source" => "HTTP",
);
failures.push(api_types::Failure::new(
index,
@ -1770,6 +1795,41 @@ pub fn serve<T: BeaconChainTypes>(
},
);
/*
* beacon/rewards
*/
let beacon_rewards_path = eth_v1
.and(warp::path("beacon"))
.and(warp::path("rewards"))
.and(chain_filter.clone());
// POST beacon/rewards/sync_committee/{block_id}
let post_beacon_rewards_sync_committee = beacon_rewards_path
.clone()
.and(warp::path("sync_committee"))
.and(block_id_or_err)
.and(warp::path::end())
.and(warp::body::json())
.and(log_filter.clone())
.and_then(
|chain: Arc<BeaconChain<T>>,
block_id: BlockId,
validators: Vec<ValidatorId>,
log: Logger| {
blocking_json_task(move || {
let (rewards, execution_optimistic) =
sync_committee_rewards::compute_sync_committee_rewards(
chain, block_id, validators, log,
)?;
Ok(rewards)
.map(api_types::GenericResponse::from)
.map(|resp| resp.add_execution_optimistic(execution_optimistic))
})
},
);
/*
* config
*/
@ -3491,7 +3551,8 @@ pub fn serve<T: BeaconChainTypes>(
.or(get_lighthouse_block_packing_efficiency.boxed())
.or(get_lighthouse_merge_readiness.boxed())
.or(get_lighthouse_blobs_sidecars.boxed())
.or(get_events.boxed()),
.or(get_events.boxed())
.recover(warp_utils::reject::handle_rejection),
)
.boxed()
.or(warp::post().and(
@ -3503,6 +3564,7 @@ pub fn serve<T: BeaconChainTypes>(
.or(post_beacon_pool_proposer_slashings.boxed())
.or(post_beacon_pool_voluntary_exits.boxed())
.or(post_beacon_pool_sync_committees.boxed())
.or(post_beacon_rewards_sync_committee.boxed())
.or(post_beacon_pool_bls_to_execution_changes.boxed())
.or(post_validator_duties_attester.boxed())
.or(post_validator_duties_sync.boxed())
@ -3516,7 +3578,8 @@ pub fn serve<T: BeaconChainTypes>(
.or(post_lighthouse_database_reconstruct.boxed())
.or(post_lighthouse_database_historical_blocks.boxed())
.or(post_lighthouse_block_rewards.boxed())
.or(post_lighthouse_ui_validator_metrics.boxed()),
.or(post_lighthouse_ui_validator_metrics.boxed())
.recover(warp_utils::reject::handle_rejection),
))
.recover(warp_utils::reject::handle_rejection)
.with(slog_logging(log.clone()))

View File

@ -0,0 +1,77 @@
use crate::{BlockId, ExecutionOptimistic};
use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes};
use eth2::lighthouse::SyncCommitteeReward;
use eth2::types::ValidatorId;
use slog::{debug, Logger};
use state_processing::BlockReplayer;
use std::sync::Arc;
use types::{BeaconState, SignedBlindedBeaconBlock};
use warp_utils::reject::{beacon_chain_error, custom_not_found};
pub fn compute_sync_committee_rewards<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>,
block_id: BlockId,
validators: Vec<ValidatorId>,
log: Logger,
) -> Result<(Option<Vec<SyncCommitteeReward>>, ExecutionOptimistic), warp::Rejection> {
let (block, execution_optimistic) = block_id.blinded_block(&chain)?;
let mut state = get_state_before_applying_block(chain.clone(), &block)?;
let reward_payload = chain
.compute_sync_committee_rewards(block.message(), &mut state)
.map_err(beacon_chain_error)?;
let data = if reward_payload.is_empty() {
debug!(log, "compute_sync_committee_rewards returned empty");
None
} else if validators.is_empty() {
Some(reward_payload)
} else {
Some(
reward_payload
.into_iter()
.filter(|reward| {
validators.iter().any(|validator| match validator {
ValidatorId::Index(i) => reward.validator_index == *i,
ValidatorId::PublicKey(pubkey) => match state.get_validator_index(pubkey) {
Ok(Some(i)) => reward.validator_index == i as u64,
_ => false,
},
})
})
.collect::<Vec<SyncCommitteeReward>>(),
)
};
Ok((data, execution_optimistic))
}
fn get_state_before_applying_block<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>,
block: &SignedBlindedBeaconBlock<T::EthSpec>,
) -> Result<BeaconState<T::EthSpec>, warp::reject::Rejection> {
let parent_block: SignedBlindedBeaconBlock<T::EthSpec> = chain
.get_blinded_block(&block.parent_root())
.and_then(|maybe_block| {
maybe_block.ok_or_else(|| BeaconChainError::MissingBeaconBlock(block.parent_root()))
})
.map_err(|e| custom_not_found(format!("Parent block is not available! {:?}", e)))?;
let parent_state = chain
.get_state(&parent_block.state_root(), Some(parent_block.slot()))
.and_then(|maybe_state| {
maybe_state
.ok_or_else(|| BeaconChainError::MissingBeaconState(parent_block.state_root()))
})
.map_err(|e| custom_not_found(format!("Parent state is not available! {:?}", e)))?;
let replayer = BlockReplayer::new(parent_state, &chain.spec)
.no_signature_verification()
.state_root_iter([Ok((parent_block.state_root(), parent_block.slot()))].into_iter())
.minimal_block_root_verification()
.apply_blocks(vec![], Some(block.slot()))
.map_err(beacon_chain_error)?;
Ok(replayer.into_state())
}

View File

@ -1,5 +1,7 @@
use beacon_chain::{
test_utils::{BeaconChainHarness, BoxedMutator, EphemeralHarnessType},
test_utils::{
BeaconChainHarness, BoxedMutator, Builder as HarnessBuilder, EphemeralHarnessType,
},
BeaconChain, BeaconChainTypes,
};
use directory::DEFAULT_ROOT_DIR;
@ -55,25 +57,39 @@ pub struct ApiServer<E: EthSpec, SFut: Future<Output = ()>> {
pub external_peer_id: PeerId,
}
type Initializer<E> = Box<
dyn FnOnce(HarnessBuilder<EphemeralHarnessType<E>>) -> HarnessBuilder<EphemeralHarnessType<E>>,
>;
type Mutator<E> = BoxedMutator<E, MemoryStore<E>, MemoryStore<E>>;
impl<E: EthSpec> InteractiveTester<E> {
pub async fn new(spec: Option<ChainSpec>, validator_count: usize) -> Self {
Self::new_with_mutator(spec, validator_count, None).await
Self::new_with_initializer_and_mutator(spec, validator_count, None, None).await
}
pub async fn new_with_mutator(
pub async fn new_with_initializer_and_mutator(
spec: Option<ChainSpec>,
validator_count: usize,
initializer: Option<Initializer<E>>,
mutator: Option<Mutator<E>>,
) -> Self {
let mut harness_builder = BeaconChainHarness::builder(E::default())
.spec_or_default(spec)
.deterministic_keypairs(validator_count)
.logger(test_logger())
.mock_execution_layer()
.fresh_ephemeral_store();
.mock_execution_layer();
harness_builder = if let Some(initializer) = initializer {
// Apply custom initialization provided by the caller.
initializer(harness_builder)
} else {
// Apply default initial configuration.
harness_builder
.deterministic_keypairs(validator_count)
.fresh_ephemeral_store()
};
// Add a mutator for the beacon chain builder which will be called in
// `HarnessBuilder::build`.
if let Some(mutator) = mutator {
harness_builder = harness_builder.initial_mutator(mutator);
}

View File

@ -1,8 +1,15 @@
//! Tests for API behaviour across fork boundaries.
use crate::common::*;
use beacon_chain::{test_utils::RelativeSyncCommittee, StateSkipConfig};
use eth2::types::{StateId, SyncSubcommittee};
use types::{ChainSpec, Epoch, EthSpec, MinimalEthSpec, Slot};
use beacon_chain::{
test_utils::{RelativeSyncCommittee, DEFAULT_ETH1_BLOCK_HASH, HARNESS_GENESIS_TIME},
StateSkipConfig,
};
use eth2::types::{IndexedErrorMessage, StateId, SyncSubcommittee};
use genesis::{bls_withdrawal_credentials, interop_genesis_state_with_withdrawal_credentials};
use types::{
test_utils::{generate_deterministic_keypair, generate_deterministic_keypairs},
Address, ChainSpec, Epoch, EthSpec, Hash256, MinimalEthSpec, Slot,
};
type E = MinimalEthSpec;
@ -12,6 +19,14 @@ fn altair_spec(altair_fork_epoch: Epoch) -> ChainSpec {
spec
}
fn capella_spec(capella_fork_epoch: Epoch) -> ChainSpec {
let mut spec = E::default_spec();
spec.altair_fork_epoch = Some(Epoch::new(0));
spec.bellatrix_fork_epoch = Some(Epoch::new(0));
spec.capella_fork_epoch = Some(capella_fork_epoch);
spec
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn sync_committee_duties_across_fork() {
let validator_count = E::sync_committee_size();
@ -307,3 +322,203 @@ async fn sync_committee_indices_across_fork() {
);
}
}
/// Assert that an HTTP API error has the given status code and indexed errors for the given indices.
fn assert_server_indexed_error(error: eth2::Error, status_code: u16, indices: Vec<usize>) {
let eth2::Error::ServerIndexedMessage(IndexedErrorMessage {
code,
failures,
..
}) = error else {
panic!("wrong error, expected ServerIndexedMessage, got: {error:?}")
};
assert_eq!(code, status_code);
assert_eq!(failures.len(), indices.len());
for (index, failure) in indices.into_iter().zip(failures) {
assert_eq!(failure.index, index as u64);
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn bls_to_execution_changes_update_all_around_capella_fork() {
let validator_count = 128;
let fork_epoch = Epoch::new(2);
let spec = capella_spec(fork_epoch);
let max_bls_to_execution_changes = E::max_bls_to_execution_changes();
// Use a genesis state with entirely BLS withdrawal credentials.
// Offset keypairs by `validator_count` to create keys distinct from the signing keys.
let validator_keypairs = generate_deterministic_keypairs(validator_count);
let withdrawal_keypairs = (0..validator_count)
.map(|i| Some(generate_deterministic_keypair(i + validator_count)))
.collect::<Vec<_>>();
let withdrawal_credentials = withdrawal_keypairs
.iter()
.map(|keypair| bls_withdrawal_credentials(&keypair.as_ref().unwrap().pk, &spec))
.collect::<Vec<_>>();
let genesis_state = interop_genesis_state_with_withdrawal_credentials(
&validator_keypairs,
&withdrawal_credentials,
HARNESS_GENESIS_TIME,
Hash256::from_slice(DEFAULT_ETH1_BLOCK_HASH),
None,
&spec,
)
.unwrap();
let tester = InteractiveTester::<E>::new_with_initializer_and_mutator(
Some(spec.clone()),
validator_count,
Some(Box::new(|harness_builder| {
harness_builder
.keypairs(validator_keypairs)
.withdrawal_keypairs(withdrawal_keypairs)
.genesis_state_ephemeral_store(genesis_state)
})),
None,
)
.await;
let harness = &tester.harness;
let client = &tester.client;
let all_validators = harness.get_all_validators();
let all_validators_u64 = all_validators.iter().map(|x| *x as u64).collect::<Vec<_>>();
// Create a bunch of valid address changes.
let valid_address_changes = all_validators_u64
.iter()
.map(|&validator_index| {
harness.make_bls_to_execution_change(
validator_index,
Address::from_low_u64_be(validator_index),
)
})
.collect::<Vec<_>>();
// Address changes which conflict with `valid_address_changes` on the address chosen.
let conflicting_address_changes = all_validators_u64
.iter()
.map(|&validator_index| {
harness.make_bls_to_execution_change(
validator_index,
Address::from_low_u64_be(validator_index + 1),
)
})
.collect::<Vec<_>>();
// Address changes signed with the wrong key.
let wrong_key_address_changes = all_validators_u64
.iter()
.map(|&validator_index| {
// Use the correct pubkey.
let pubkey = &harness.get_withdrawal_keypair(validator_index).pk;
// And the wrong secret key.
let secret_key = &harness
.get_withdrawal_keypair((validator_index + 1) % validator_count as u64)
.sk;
harness.make_bls_to_execution_change_with_keys(
validator_index,
Address::from_low_u64_be(validator_index),
pubkey,
secret_key,
)
})
.collect::<Vec<_>>();
// Submit some changes before Capella. Just enough to fill two blocks.
let num_pre_capella = validator_count / 4;
let blocks_filled_pre_capella = 2;
assert_eq!(
num_pre_capella,
blocks_filled_pre_capella * max_bls_to_execution_changes
);
client
.post_beacon_pool_bls_to_execution_changes(&valid_address_changes[..num_pre_capella])
.await
.unwrap();
// Conflicting changes for the same validators should all fail.
let error = client
.post_beacon_pool_bls_to_execution_changes(&conflicting_address_changes[..num_pre_capella])
.await
.unwrap_err();
assert_server_indexed_error(error, 400, (0..num_pre_capella).collect());
// Re-submitting the same changes should be accepted.
client
.post_beacon_pool_bls_to_execution_changes(&valid_address_changes[..num_pre_capella])
.await
.unwrap();
// Invalid changes signed with the wrong keys should all be rejected without affecting the seen
// indices filters (apply ALL of them).
let error = client
.post_beacon_pool_bls_to_execution_changes(&wrong_key_address_changes)
.await
.unwrap_err();
assert_server_indexed_error(error, 400, all_validators.clone());
// Advance to right before Capella.
let capella_slot = fork_epoch.start_slot(E::slots_per_epoch());
harness.extend_to_slot(capella_slot - 1).await;
assert_eq!(harness.head_slot(), capella_slot - 1);
// Add Capella blocks which should be full of BLS to execution changes.
for i in 0..validator_count / max_bls_to_execution_changes {
let head_block_root = harness.extend_slots(1).await;
let head_block = harness
.chain
.get_block(&head_block_root)
.await
.unwrap()
.unwrap();
let bls_to_execution_changes = head_block
.message()
.body()
.bls_to_execution_changes()
.unwrap();
// Block should be full.
assert_eq!(
bls_to_execution_changes.len(),
max_bls_to_execution_changes,
"block not full on iteration {i}"
);
// Included changes should be the ones from `valid_address_changes` in any order.
for address_change in bls_to_execution_changes.iter() {
assert!(valid_address_changes.contains(address_change));
}
// After the initial 2 blocks, add the rest of the changes using a large
// request containing all the valid, all the conflicting and all the invalid.
// Despite the invalid and duplicate messages, the new ones should still get picked up by
// the pool.
if i == blocks_filled_pre_capella - 1 {
let all_address_changes: Vec<_> = [
valid_address_changes.clone(),
conflicting_address_changes.clone(),
wrong_key_address_changes.clone(),
]
.concat();
let error = client
.post_beacon_pool_bls_to_execution_changes(&all_address_changes)
.await
.unwrap_err();
assert_server_indexed_error(
error,
400,
(validator_count..3 * validator_count).collect(),
);
}
}
// Eventually all validators should have eth1 withdrawal credentials.
let head_state = harness.get_current_state();
for validator in head_state.validators() {
assert!(validator.has_eth1_withdrawal_credential(&spec));
}
}

View File

@ -278,9 +278,10 @@ pub async fn proposer_boost_re_org_test(
let num_empty_votes = Some(attesters_per_slot * percent_empty_votes / 100);
let num_head_votes = Some(attesters_per_slot * percent_head_votes / 100);
let tester = InteractiveTester::<E>::new_with_mutator(
let tester = InteractiveTester::<E>::new_with_initializer_and_mutator(
Some(spec),
validator_count,
None,
Some(Box::new(move |builder| {
builder
.proposer_re_org_threshold(Some(ReOrgThreshold(re_org_threshold)))
@ -544,7 +545,7 @@ pub async fn proposer_boost_re_org_test(
pub async fn fork_choice_before_proposal() {
// Validator count needs to be at least 32 or proposer boost gets set to 0 when computing
// `validator_count // 32`.
let validator_count = 32;
let validator_count = 64;
let all_validators = (0..validator_count).collect::<Vec<_>>();
let num_initial: u64 = 31;

View File

@ -70,7 +70,8 @@ use types::{
SyncCommitteeMessage, SyncSubnetId,
};
use work_reprocessing_queue::{
spawn_reprocess_scheduler, QueuedAggregate, QueuedRpcBlock, QueuedUnaggregate, ReadyWork,
spawn_reprocess_scheduler, QueuedAggregate, QueuedLightClientUpdate, QueuedRpcBlock,
QueuedUnaggregate, ReadyWork,
};
use worker::{Toolbox, Worker};
@ -144,6 +145,10 @@ const MAX_GOSSIP_FINALITY_UPDATE_QUEUE_LEN: usize = 1_024;
/// before we start dropping them.
const MAX_GOSSIP_OPTIMISTIC_UPDATE_QUEUE_LEN: usize = 1_024;
/// The maximum number of queued `LightClientOptimisticUpdate` objects received on gossip that will be stored
/// for reprocessing before we start dropping them.
const MAX_GOSSIP_OPTIMISTIC_UPDATE_REPROCESS_QUEUE_LEN: usize = 128;
/// The maximum number of queued `SyncCommitteeMessage` objects that will be stored before we start dropping
/// them.
const MAX_SYNC_MESSAGE_QUEUE_LEN: usize = 2048;
@ -233,6 +238,7 @@ pub const BLOBS_BY_ROOTS_REQUEST: &str = "blobs_by_roots_request";
pub const LIGHT_CLIENT_BOOTSTRAP_REQUEST: &str = "light_client_bootstrap";
pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation";
pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate";
pub const UNKNOWN_LIGHT_CLIENT_UPDATE: &str = "unknown_light_client_update";
pub const GOSSIP_BLS_TO_EXECUTION_CHANGE: &str = "gossip_bls_to_execution_change";
/// A simple first-in-first-out queue with a maximum length.
@ -781,6 +787,21 @@ impl<T: BeaconChainTypes> std::convert::From<ReadyWork<T>> for WorkEvent<T> {
seen_timestamp,
},
},
ReadyWork::LightClientUpdate(QueuedLightClientUpdate {
peer_id,
message_id,
light_client_optimistic_update,
seen_timestamp,
..
}) => Self {
drop_during_sync: true,
work: Work::UnknownLightClientOptimisticUpdate {
message_id,
peer_id,
light_client_optimistic_update,
seen_timestamp,
},
},
}
}
}
@ -820,6 +841,12 @@ pub enum Work<T: BeaconChainTypes> {
aggregate: Box<SignedAggregateAndProof<T::EthSpec>>,
seen_timestamp: Duration,
},
UnknownLightClientOptimisticUpdate {
message_id: MessageId,
peer_id: PeerId,
light_client_optimistic_update: Box<LightClientOptimisticUpdate<T::EthSpec>>,
seen_timestamp: Duration,
},
GossipAggregateBatch {
packages: Vec<GossipAggregatePackage<T::EthSpec>>,
},
@ -957,6 +984,7 @@ impl<T: BeaconChainTypes> Work<T> {
Work::LightClientBootstrapRequest { .. } => LIGHT_CLIENT_BOOTSTRAP_REQUEST,
Work::UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION,
Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE,
Work::UnknownLightClientOptimisticUpdate { .. } => UNKNOWN_LIGHT_CLIENT_UPDATE,
Work::GossipBlsToExecutionChange { .. } => GOSSIP_BLS_TO_EXECUTION_CHANGE,
}
}
@ -1092,6 +1120,8 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
// Using a FIFO queue for light client updates to maintain sequence order.
let mut finality_update_queue = FifoQueue::new(MAX_GOSSIP_FINALITY_UPDATE_QUEUE_LEN);
let mut optimistic_update_queue = FifoQueue::new(MAX_GOSSIP_OPTIMISTIC_UPDATE_QUEUE_LEN);
let mut unknown_light_client_update_queue =
FifoQueue::new(MAX_GOSSIP_OPTIMISTIC_UPDATE_REPROCESS_QUEUE_LEN);
// Using a FIFO queue since blocks need to be imported sequentially.
let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN);
@ -1483,6 +1513,9 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
Work::UnknownBlockAggregate { .. } => {
unknown_block_aggregate_queue.push(work)
}
Work::UnknownLightClientOptimisticUpdate { .. } => {
unknown_light_client_update_queue.push(work, work_id, &self.log)
}
Work::GossipBlsToExecutionChange { .. } => {
gossip_bls_to_execution_change_queue.push(work, work_id, &self.log)
}
@ -1848,6 +1881,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
message_id,
peer_id,
*light_client_optimistic_update,
Some(work_reprocessing_tx),
seen_timestamp,
)
}),
@ -1998,6 +2032,20 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
seen_timestamp,
)
}),
Work::UnknownLightClientOptimisticUpdate {
message_id,
peer_id,
light_client_optimistic_update,
seen_timestamp,
} => task_spawner.spawn_blocking(move || {
worker.process_gossip_optimistic_update(
message_id,
peer_id,
*light_client_optimistic_update,
None,
seen_timestamp,
)
}),
};
}
}

View File

@ -20,7 +20,7 @@ use futures::task::Poll;
use futures::{Stream, StreamExt};
use lighthouse_network::{MessageId, PeerId};
use logging::TimeLatch;
use slog::{crit, debug, error, warn, Logger};
use slog::{crit, debug, error, trace, warn, Logger};
use slot_clock::SlotClock;
use std::collections::{HashMap, HashSet};
use std::pin::Pin;
@ -30,12 +30,16 @@ use task_executor::TaskExecutor;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::time::error::Error as TimeError;
use tokio_util::time::delay_queue::{DelayQueue, Key as DelayKey};
use types::{Attestation, EthSpec, Hash256, SignedAggregateAndProof, SubnetId};
use types::{
Attestation, EthSpec, Hash256, LightClientOptimisticUpdate, SignedAggregateAndProof,
SubnetId,
};
const TASK_NAME: &str = "beacon_processor_reprocess_queue";
const GOSSIP_BLOCKS: &str = "gossip_blocks";
const RPC_BLOCKS: &str = "rpc_blocks";
const ATTESTATIONS: &str = "attestations";
const LIGHT_CLIENT_UPDATES: &str = "lc_updates";
/// Queue blocks for re-processing with an `ADDITIONAL_QUEUED_BLOCK_DELAY` after the slot starts.
/// This is to account for any slight drift in the system clock.
@ -44,6 +48,9 @@ const ADDITIONAL_QUEUED_BLOCK_DELAY: Duration = Duration::from_millis(5);
/// For how long to queue aggregated and unaggregated attestations for re-processing.
pub const QUEUED_ATTESTATION_DELAY: Duration = Duration::from_secs(12);
/// For how long to queue light client updates for re-processing.
pub const QUEUED_LIGHT_CLIENT_UPDATE_DELAY: Duration = Duration::from_secs(12);
/// For how long to queue rpc blocks before sending them back for reprocessing.
pub const QUEUED_RPC_BLOCK_DELAY: Duration = Duration::from_secs(3);
@ -55,6 +62,9 @@ const MAXIMUM_QUEUED_BLOCKS: usize = 16;
/// How many attestations we keep before new ones get dropped.
const MAXIMUM_QUEUED_ATTESTATIONS: usize = 16_384;
/// How many light client updates we keep before new ones get dropped.
const MAXIMUM_QUEUED_LIGHT_CLIENT_UPDATES: usize = 128;
/// Messages that the scheduler can receive.
pub enum ReprocessQueueMessage<T: BeaconChainTypes> {
/// A block that has been received early and we should queue for later processing.
@ -62,13 +72,18 @@ pub enum ReprocessQueueMessage<T: BeaconChainTypes> {
/// A gossip block for hash `X` is being imported, we should queue the rpc block for the same
/// hash until the gossip block is imported.
RpcBlock(QueuedRpcBlock<T::EthSpec>),
/// A block that was successfully processed. We use this to handle attestations for unknown
/// blocks.
BlockImported(Hash256),
/// A block that was successfully processed. We use this to handle attestations and light client updates
/// for unknown blocks.
BlockImported {
block_root: Hash256,
parent_root: Hash256,
},
/// An unaggregated attestation that references an unknown block.
UnknownBlockUnaggregate(QueuedUnaggregate<T::EthSpec>),
/// An aggregated attestation that references an unknown block.
UnknownBlockAggregate(QueuedAggregate<T::EthSpec>),
/// A light client optimistic update that references a parent root that has not been seen as a parent.
UnknownLightClientOptimisticUpdate(QueuedLightClientUpdate<T::EthSpec>),
}
/// Events sent by the scheduler once they are ready for re-processing.
@ -77,6 +92,7 @@ pub enum ReadyWork<T: BeaconChainTypes> {
RpcBlock(QueuedRpcBlock<T::EthSpec>),
Unaggregate(QueuedUnaggregate<T::EthSpec>),
Aggregate(QueuedAggregate<T::EthSpec>),
LightClientUpdate(QueuedLightClientUpdate<T::EthSpec>),
}
/// An Attestation for which the corresponding block was not seen while processing, queued for
@ -99,6 +115,16 @@ pub struct QueuedAggregate<T: EthSpec> {
pub seen_timestamp: Duration,
}
/// A light client update for which the corresponding parent block was not seen while processing,
/// queued for later.
pub struct QueuedLightClientUpdate<T: EthSpec> {
pub peer_id: PeerId,
pub message_id: MessageId,
pub light_client_optimistic_update: Box<LightClientOptimisticUpdate<T>>,
pub parent_root: Hash256,
pub seen_timestamp: Duration,
}
/// A block that arrived early and has been queued for later import.
pub struct QueuedGossipBlock<T: BeaconChainTypes> {
pub peer_id: PeerId,
@ -127,6 +153,8 @@ enum InboundEvent<T: BeaconChainTypes> {
ReadyRpcBlock(QueuedRpcBlock<T::EthSpec>),
/// An aggregated or unaggregated attestation is ready for re-processing.
ReadyAttestation(QueuedAttestationId),
/// A light client update that is ready for re-processing.
ReadyLightClientUpdate(QueuedLightClientUpdateId),
/// A `DelayQueue` returned an error.
DelayQueueError(TimeError, &'static str),
/// A message sent to the `ReprocessQueue`
@ -147,6 +175,8 @@ struct ReprocessQueue<T: BeaconChainTypes> {
rpc_block_delay_queue: DelayQueue<QueuedRpcBlock<T::EthSpec>>,
/// Queue to manage scheduled attestations.
attestations_delay_queue: DelayQueue<QueuedAttestationId>,
/// Queue to manage scheduled light client updates.
lc_updates_delay_queue: DelayQueue<QueuedLightClientUpdateId>,
/* Queued items */
/// Queued blocks.
@ -157,15 +187,23 @@ struct ReprocessQueue<T: BeaconChainTypes> {
queued_unaggregates: FnvHashMap<usize, (QueuedUnaggregate<T::EthSpec>, DelayKey)>,
/// Attestations (aggregated and unaggregated) per root.
awaiting_attestations_per_root: HashMap<Hash256, Vec<QueuedAttestationId>>,
/// Queued Light Client Updates.
queued_lc_updates: FnvHashMap<usize, (QueuedLightClientUpdate<T::EthSpec>, DelayKey)>,
/// Light Client Updates per parent_root.
awaiting_lc_updates_per_parent_root: HashMap<Hash256, Vec<QueuedLightClientUpdateId>>,
/* Aux */
/// Next attestation id, used for both aggregated and unaggregated attestations
next_attestation: usize,
next_lc_update: usize,
early_block_debounce: TimeLatch,
rpc_block_debounce: TimeLatch,
attestation_delay_debounce: TimeLatch,
lc_update_delay_debounce: TimeLatch,
}
pub type QueuedLightClientUpdateId = usize;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum QueuedAttestationId {
Aggregate(usize),
@ -235,6 +273,20 @@ impl<T: BeaconChainTypes> Stream for ReprocessQueue<T> {
Poll::Ready(None) | Poll::Pending => (),
}
match self.lc_updates_delay_queue.poll_expired(cx) {
Poll::Ready(Some(Ok(lc_id))) => {
return Poll::Ready(Some(InboundEvent::ReadyLightClientUpdate(
lc_id.into_inner(),
)));
}
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "lc_updates_queue")));
}
// `Poll::Ready(None)` means that there are no more entries in the delay queue and we
// will continue to get this result until something else is added into the queue.
Poll::Ready(None) | Poll::Pending => (),
}
// Last empty the messages channel.
match self.work_reprocessing_rx.poll_recv(cx) {
Poll::Ready(Some(message)) => return Poll::Ready(Some(InboundEvent::Msg(message))),
@ -264,14 +316,19 @@ pub fn spawn_reprocess_scheduler<T: BeaconChainTypes>(
gossip_block_delay_queue: DelayQueue::new(),
rpc_block_delay_queue: DelayQueue::new(),
attestations_delay_queue: DelayQueue::new(),
lc_updates_delay_queue: DelayQueue::new(),
queued_gossip_block_roots: HashSet::new(),
queued_lc_updates: FnvHashMap::default(),
queued_aggregates: FnvHashMap::default(),
queued_unaggregates: FnvHashMap::default(),
awaiting_attestations_per_root: HashMap::new(),
awaiting_lc_updates_per_parent_root: HashMap::new(),
next_attestation: 0,
next_lc_update: 0,
early_block_debounce: TimeLatch::default(),
rpc_block_debounce: TimeLatch::default(),
attestation_delay_debounce: TimeLatch::default(),
lc_update_delay_debounce: TimeLatch::default(),
};
executor.spawn(
@ -473,9 +530,49 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
self.next_attestation += 1;
}
InboundEvent::Msg(BlockImported(root)) => {
InboundEvent::Msg(UnknownLightClientOptimisticUpdate(
queued_light_client_optimistic_update,
)) => {
if self.lc_updates_delay_queue.len() >= MAXIMUM_QUEUED_LIGHT_CLIENT_UPDATES {
if self.lc_update_delay_debounce.elapsed() {
error!(
log,
"Light client updates delay queue is full";
"queue_size" => MAXIMUM_QUEUED_LIGHT_CLIENT_UPDATES,
"msg" => "check system clock"
);
}
// Drop the light client update.
return;
}
let lc_id: QueuedLightClientUpdateId = self.next_lc_update;
// Register the delay.
let delay_key = self
.lc_updates_delay_queue
.insert(lc_id, QUEUED_LIGHT_CLIENT_UPDATE_DELAY);
// Register the light client update for the corresponding root.
self.awaiting_lc_updates_per_parent_root
.entry(queued_light_client_optimistic_update.parent_root)
.or_default()
.push(lc_id);
// Store the light client update and its info.
self.queued_lc_updates.insert(
self.next_lc_update,
(queued_light_client_optimistic_update, delay_key),
);
self.next_lc_update += 1;
}
InboundEvent::Msg(BlockImported {
block_root,
parent_root,
}) => {
// Unqueue the attestations we have for this root, if any.
if let Some(queued_ids) = self.awaiting_attestations_per_root.remove(&root) {
if let Some(queued_ids) = self.awaiting_attestations_per_root.remove(&block_root) {
for id in queued_ids {
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_ATTESTATIONS,
@ -511,12 +608,62 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
error!(
log,
"Unknown queued attestation for block root";
"block_root" => ?root,
"block_root" => ?block_root,
"att_id" => ?id,
);
}
}
}
// Unqueue the light client optimistic updates we have for this root, if any.
if let Some(queued_lc_id) = self
.awaiting_lc_updates_per_parent_root
.remove(&parent_root)
{
debug!(
log,
"Dequeuing light client optimistic updates";
"parent_root" => %parent_root,
"count" => queued_lc_id.len(),
);
for lc_id in queued_lc_id {
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_OPTIMISTIC_UPDATES,
);
if let Some((work, delay_key)) = self.queued_lc_updates.remove(&lc_id).map(
|(light_client_optimistic_update, delay_key)| {
(
ReadyWork::LightClientUpdate(light_client_optimistic_update),
delay_key,
)
},
) {
// Remove the delay
self.lc_updates_delay_queue.remove(&delay_key);
// Send the work
match self.ready_work_tx.try_send(work) {
Ok(_) => trace!(
log,
"reprocessing light client update sent";
),
Err(_) => error!(
log,
"Failed to send scheduled light client update";
),
}
} else {
// There is a mismatch between the light client update ids registered for this
// root and the queued light client updates. This should never happen.
error!(
log,
"Unknown queued light client update for parent root";
"parent_root" => ?parent_root,
"lc_id" => ?lc_id,
);
}
}
}
}
// A block that was queued for later processing is now ready to be processed.
InboundEvent::ReadyGossipBlock(ready_block) => {
@ -591,6 +738,38 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
}
}
}
InboundEvent::ReadyLightClientUpdate(queued_id) => {
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_OPTIMISTIC_UPDATES,
);
if let Some((parent_root, work)) = self.queued_lc_updates.remove(&queued_id).map(
|(queued_lc_update, _delay_key)| {
(
queued_lc_update.parent_root,
ReadyWork::LightClientUpdate(queued_lc_update),
)
},
) {
if self.ready_work_tx.try_send(work).is_err() {
error!(
log,
"Failed to send scheduled light client optimistic update";
);
}
if let Some(queued_lc_updates) = self
.awaiting_lc_updates_per_parent_root
.get_mut(&parent_root)
{
if let Some(index) =
queued_lc_updates.iter().position(|&id| id == queued_id)
{
queued_lc_updates.swap_remove(index);
}
}
}
}
}
metrics::set_gauge_vec(
@ -608,5 +787,10 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
&[ATTESTATIONS],
self.attestations_delay_queue.len() as i64,
);
metrics::set_gauge_vec(
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL,
&[LIGHT_CLIENT_UPDATES],
self.lc_updates_delay_queue.len() as i64,
);
}
}

View File

@ -28,7 +28,8 @@ use types::{
use super::{
super::work_reprocessing_queue::{
QueuedAggregate, QueuedGossipBlock, QueuedUnaggregate, ReprocessQueueMessage,
QueuedAggregate, QueuedGossipBlock, QueuedLightClientUpdate, QueuedUnaggregate,
ReprocessQueueMessage,
},
Worker,
};
@ -716,6 +717,10 @@ impl<T: BeaconChainTypes> Worker<T> {
&metrics::BEACON_BLOCK_GOSSIP_SLOT_START_DELAY_TIME,
block_delay,
);
metrics::set_gauge(
&metrics::BEACON_BLOCK_LAST_DELAY,
block_delay.as_millis() as i64,
);
let verification_result = self
.chain
@ -961,7 +966,10 @@ impl<T: BeaconChainTypes> Worker<T> {
metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL);
if reprocess_tx
.try_send(ReprocessQueueMessage::BlockImported(block_root))
.try_send(ReprocessQueueMessage::BlockImported {
block_root,
parent_root: block.message().parent_root(),
})
.is_err()
{
error!(
@ -1227,7 +1235,7 @@ impl<T: BeaconChainTypes> Worker<T> {
"error" => ?e
);
// We ignore pre-capella messages without penalizing peers.
if matches!(e, BeaconChainError::BlsToExecutionChangeBadFork(_)) {
if matches!(e, BeaconChainError::BlsToExecutionPriorToCapella) {
self.propagate_validation_result(
message_id,
peer_id,
@ -1410,7 +1418,7 @@ impl<T: BeaconChainTypes> Worker<T> {
LightClientFinalityUpdateError::InvalidLightClientFinalityUpdate => {
debug!(
self.log,
"LC invalid finality update";
"Light client invalid finality update";
"peer" => %peer_id,
"error" => ?e,
);
@ -1424,7 +1432,7 @@ impl<T: BeaconChainTypes> Worker<T> {
LightClientFinalityUpdateError::TooEarly => {
debug!(
self.log,
"LC finality update too early";
"Light client finality update too early";
"peer" => %peer_id,
"error" => ?e,
);
@ -1437,7 +1445,7 @@ impl<T: BeaconChainTypes> Worker<T> {
}
LightClientFinalityUpdateError::FinalityUpdateAlreadySeen => debug!(
self.log,
"LC finality update already seen";
"Light client finality update already seen";
"peer" => %peer_id,
"error" => ?e,
),
@ -1446,7 +1454,7 @@ impl<T: BeaconChainTypes> Worker<T> {
| LightClientFinalityUpdateError::SigSlotStartIsNone
| LightClientFinalityUpdateError::FailedConstructingUpdate => debug!(
self.log,
"LC error constructing finality update";
"Light client error constructing finality update";
"peer" => %peer_id,
"error" => ?e,
),
@ -1461,22 +1469,77 @@ impl<T: BeaconChainTypes> Worker<T> {
message_id: MessageId,
peer_id: PeerId,
light_client_optimistic_update: LightClientOptimisticUpdate<T::EthSpec>,
reprocess_tx: Option<mpsc::Sender<ReprocessQueueMessage<T>>>,
seen_timestamp: Duration,
) {
match self
.chain
.verify_optimistic_update_for_gossip(light_client_optimistic_update, seen_timestamp)
{
Ok(_verified_light_client_optimistic_update) => {
match self.chain.verify_optimistic_update_for_gossip(
light_client_optimistic_update.clone(),
seen_timestamp,
) {
Ok(verified_light_client_optimistic_update) => {
debug!(
self.log,
"Light client successful optimistic update";
"peer" => %peer_id,
"parent_root" => %verified_light_client_optimistic_update.parent_root,
);
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);
}
Err(e) => {
metrics::register_optimistic_update_error(&e);
match e {
LightClientOptimisticUpdateError::InvalidLightClientOptimisticUpdate => {
LightClientOptimisticUpdateError::UnknownBlockParentRoot(parent_root) => {
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_SENT_OPTIMISTIC_UPDATES,
);
debug!(
self.log,
"LC invalid optimistic update";
"Optimistic update for unknown block";
"peer_id" => %peer_id,
"parent_root" => ?parent_root
);
if let Some(sender) = reprocess_tx {
let msg = ReprocessQueueMessage::UnknownLightClientOptimisticUpdate(
QueuedLightClientUpdate {
peer_id,
message_id,
light_client_optimistic_update: Box::new(
light_client_optimistic_update,
),
parent_root,
seen_timestamp,
},
);
if sender.try_send(msg).is_err() {
error!(
self.log,
"Failed to send optimistic update for re-processing";
)
}
} else {
debug!(
self.log,
"Not sending light client update because it had been reprocessed";
"peer_id" => %peer_id,
"parent_root" => ?parent_root
);
self.propagate_validation_result(
message_id,
peer_id,
MessageAcceptance::Ignore,
);
}
return;
}
LightClientOptimisticUpdateError::InvalidLightClientOptimisticUpdate => {
metrics::register_optimistic_update_error(&e);
debug!(
self.log,
"Light client invalid optimistic update";
"peer" => %peer_id,
"error" => ?e,
);
@ -1488,9 +1551,10 @@ impl<T: BeaconChainTypes> Worker<T> {
)
}
LightClientOptimisticUpdateError::TooEarly => {
metrics::register_optimistic_update_error(&e);
debug!(
self.log,
"LC optimistic update too early";
"Light client optimistic update too early";
"peer" => %peer_id,
"error" => ?e,
);
@ -1501,21 +1565,29 @@ impl<T: BeaconChainTypes> Worker<T> {
"light_client_gossip_error",
);
}
LightClientOptimisticUpdateError::OptimisticUpdateAlreadySeen => debug!(
LightClientOptimisticUpdateError::OptimisticUpdateAlreadySeen => {
metrics::register_optimistic_update_error(&e);
debug!(
self.log,
"LC optimistic update already seen";
"Light client optimistic update already seen";
"peer" => %peer_id,
"error" => ?e,
),
)
}
LightClientOptimisticUpdateError::BeaconChainError(_)
| LightClientOptimisticUpdateError::LightClientUpdateError(_)
| LightClientOptimisticUpdateError::SigSlotStartIsNone
| LightClientOptimisticUpdateError::FailedConstructingUpdate => debug!(
| LightClientOptimisticUpdateError::FailedConstructingUpdate => {
metrics::register_optimistic_update_error(&e);
debug!(
self.log,
"LC error constructing optimistic update";
"Light client error constructing optimistic update";
"peer" => %peer_id,
"error" => ?e,
),
)
}
}
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
}

View File

@ -85,6 +85,7 @@ impl<T: BeaconChainTypes> Worker<T> {
}
};
let slot = block.slot();
let parent_root = block.message().parent_root();
let available_block = block
.into_available_block(block_root, &self.chain)
.map_err(BlockError::BlobValidation);
@ -110,7 +111,10 @@ impl<T: BeaconChainTypes> Worker<T> {
info!(self.log, "New RPC block received"; "slot" => slot, "hash" => %hash);
// Trigger processing for work referencing this block.
let reprocess_msg = ReprocessQueueMessage::BlockImported(hash);
let reprocess_msg = ReprocessQueueMessage::BlockImported {
block_root: hash,
parent_root,
};
if reprocess_tx.try_send(reprocess_msg).is_err() {
error!(self.log, "Failed to inform block import"; "source" => "rpc", "block_root" => %hash)
};

View File

@ -357,10 +357,18 @@ lazy_static! {
pub static ref BEACON_BLOCK_GOSSIP_SLOT_START_DELAY_TIME: Result<Histogram> = try_create_histogram_with_buckets(
"beacon_block_gossip_slot_start_delay_time",
"Duration between when the block is received and the start of the slot it belongs to.",
// Create a custom bucket list for greater granularity in block delay
Ok(vec![0.1, 0.2, 0.3,0.4,0.5,0.75,1.0,1.25,1.5,1.75,2.0,2.5,3.0,3.5,4.0,5.0,6.0,7.0,8.0,9.0,10.0,15.0,20.0])
// NOTE: Previous values, which we may want to switch back to.
// [0.1, 0.2, 0.5, 1, 2, 5, 10, 20, 50]
decimal_buckets(-1,2)
//decimal_buckets(-1,2)
);
pub static ref BEACON_BLOCK_LAST_DELAY: Result<IntGauge> = try_create_int_gauge(
"beacon_block_last_delay",
"Keeps track of the last block's delay from the start of the slot"
);
pub static ref BEACON_BLOCK_GOSSIP_ARRIVED_LATE_TOTAL: Result<IntCounter> = try_create_int_counter(
"beacon_block_gossip_arrived_late_total",
"Count of times when a gossip block arrived from the network later than the attestation deadline.",
@ -384,6 +392,21 @@ lazy_static! {
"Number of queued attestations where as matching block has been imported."
);
/*
* Light client update reprocessing queue metrics.
*/
pub static ref BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_OPTIMISTIC_UPDATES: Result<IntCounter> = try_create_int_counter(
"beacon_processor_reprocessing_queue_expired_optimistic_updates",
"Number of queued light client optimistic updates which have expired before a matching block has been found."
);
pub static ref BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_OPTIMISTIC_UPDATES: Result<IntCounter> = try_create_int_counter(
"beacon_processor_reprocessing_queue_matched_optimistic_updates",
"Number of queued light client optimistic updates where as matching block has been imported."
);
pub static ref BEACON_PROCESSOR_REPROCESSING_QUEUE_SENT_OPTIMISTIC_UPDATES: Result<IntCounter> = try_create_int_counter(
"beacon_processor_reprocessing_queue_sent_optimistic_updates",
"Number of queued light client optimistic updates where as matching block has been imported."
);
}
pub fn update_bandwidth_metrics(bandwidth: Arc<BandwidthSinks>) {

View File

@ -0,0 +1,105 @@
use state_processing::SigVerifiedOp;
use std::collections::{hash_map::Entry, HashMap};
use std::sync::Arc;
use types::{
AbstractExecPayload, BeaconState, ChainSpec, EthSpec, SignedBeaconBlock,
SignedBlsToExecutionChange,
};
/// Pool of BLS to execution changes that maintains a LIFO queue and an index by validator.
///
/// Using the LIFO queue for block production disincentivises spam on P2P at the Capella fork,
/// and is less-relevant after that.
#[derive(Debug, Default)]
pub struct BlsToExecutionChanges<T: EthSpec> {
/// Map from validator index to BLS to execution change.
by_validator_index: HashMap<u64, Arc<SigVerifiedOp<SignedBlsToExecutionChange, T>>>,
/// Last-in-first-out (LIFO) queue of verified messages.
queue: Vec<Arc<SigVerifiedOp<SignedBlsToExecutionChange, T>>>,
}
impl<T: EthSpec> BlsToExecutionChanges<T> {
pub fn existing_change_equals(
&self,
address_change: &SignedBlsToExecutionChange,
) -> Option<bool> {
self.by_validator_index
.get(&address_change.message.validator_index)
.map(|existing| existing.as_inner() == address_change)
}
pub fn insert(
&mut self,
verified_change: SigVerifiedOp<SignedBlsToExecutionChange, T>,
) -> bool {
// Wrap in an `Arc` once on insert.
let verified_change = Arc::new(verified_change);
match self
.by_validator_index
.entry(verified_change.as_inner().message.validator_index)
{
Entry::Vacant(entry) => {
self.queue.push(verified_change.clone());
entry.insert(verified_change);
true
}
Entry::Occupied(_) => false,
}
}
/// FIFO ordering, used for persistence to disk.
pub fn iter_fifo(
&self,
) -> impl Iterator<Item = &Arc<SigVerifiedOp<SignedBlsToExecutionChange, T>>> {
self.queue.iter()
}
/// LIFO ordering, used for block packing.
pub fn iter_lifo(
&self,
) -> impl Iterator<Item = &Arc<SigVerifiedOp<SignedBlsToExecutionChange, T>>> {
self.queue.iter().rev()
}
/// Prune BLS to execution changes that have been applied to the state more than 1 block ago.
///
/// The block check is necessary to avoid pruning too eagerly and losing the ability to include
/// address changes during re-orgs. This is isn't *perfect* so some address changes could
/// still get stuck if there are gnarly re-orgs and the changes can't be widely republished
/// due to the gossip duplicate rules.
pub fn prune<Payload: AbstractExecPayload<T>>(
&mut self,
head_block: &SignedBeaconBlock<T, Payload>,
head_state: &BeaconState<T>,
spec: &ChainSpec,
) {
let mut validator_indices_pruned = vec![];
self.queue.retain(|address_change| {
let validator_index = address_change.as_inner().message.validator_index;
head_state
.validators()
.get(validator_index as usize)
.map_or(true, |validator| {
let prune = validator.has_eth1_withdrawal_credential(spec)
&& head_block
.message()
.body()
.bls_to_execution_changes()
.map_or(true, |recent_changes| {
!recent_changes
.iter()
.any(|c| c.message.validator_index == validator_index)
});
if prune {
validator_indices_pruned.push(validator_index);
}
!prune
})
});
for validator_index in validator_indices_pruned {
self.by_validator_index.remove(&validator_index);
}
}
}

View File

@ -2,6 +2,7 @@ mod attestation;
mod attestation_id;
mod attestation_storage;
mod attester_slashing;
mod bls_to_execution_changes;
mod max_cover;
mod metrics;
mod persistence;
@ -18,6 +19,7 @@ pub use persistence::{
pub use reward_cache::RewardCache;
use crate::attestation_storage::{AttestationMap, CheckpointKey};
use crate::bls_to_execution_changes::BlsToExecutionChanges;
use crate::sync_aggregate_id::SyncAggregateId;
use attester_slashing::AttesterSlashingMaxCover;
use max_cover::maximum_cover;
@ -51,8 +53,8 @@ pub struct OperationPool<T: EthSpec + Default> {
proposer_slashings: RwLock<HashMap<u64, SigVerifiedOp<ProposerSlashing, T>>>,
/// Map from exiting validator to their exit data.
voluntary_exits: RwLock<HashMap<u64, SigVerifiedOp<SignedVoluntaryExit, T>>>,
/// Map from credential changing validator to their execution change data.
bls_to_execution_changes: RwLock<HashMap<u64, SigVerifiedOp<SignedBlsToExecutionChange, T>>>,
/// Map from credential changing validator to their position in the queue.
bls_to_execution_changes: RwLock<BlsToExecutionChanges<T>>,
/// Reward cache for accelerating attestation packing.
reward_cache: RwLock<RewardCache>,
_phantom: PhantomData<T>,
@ -513,15 +515,28 @@ impl<T: EthSpec> OperationPool<T> {
);
}
/// Insert a BLS to execution change into the pool.
/// Check if an address change equal to `address_change` is already in the pool.
///
/// Return `None` if no address change for the validator index exists in the pool.
pub fn bls_to_execution_change_in_pool_equals(
&self,
address_change: &SignedBlsToExecutionChange,
) -> Option<bool> {
self.bls_to_execution_changes
.read()
.existing_change_equals(address_change)
}
/// Insert a BLS to execution change into the pool, *only if* no prior change is known.
///
/// Return `true` if the change was inserted.
pub fn insert_bls_to_execution_change(
&self,
verified_change: SigVerifiedOp<SignedBlsToExecutionChange, T>,
) {
self.bls_to_execution_changes.write().insert(
verified_change.as_inner().message.validator_index,
verified_change,
);
) -> bool {
self.bls_to_execution_changes
.write()
.insert(verified_change)
}
/// Get a list of execution changes for inclusion in a block.
@ -533,7 +548,7 @@ impl<T: EthSpec> OperationPool<T> {
spec: &ChainSpec,
) -> Vec<SignedBlsToExecutionChange> {
filter_limit_operations(
self.bls_to_execution_changes.read().values(),
self.bls_to_execution_changes.read().iter_lifo(),
|address_change| {
address_change.signature_is_still_valid(&state.fork())
&& state
@ -548,33 +563,15 @@ impl<T: EthSpec> OperationPool<T> {
}
/// Prune BLS to execution changes that have been applied to the state more than 1 block ago.
///
/// The block check is necessary to avoid pruning too eagerly and losing the ability to include
/// address changes during re-orgs. This is isn't *perfect* so some address changes could
/// still get stuck if there are gnarly re-orgs and the changes can't be widely republished
/// due to the gossip duplicate rules.
pub fn prune_bls_to_execution_changes<Payload: AbstractExecPayload<T>>(
&self,
head_block: &SignedBeaconBlock<T, Payload>,
head_state: &BeaconState<T>,
spec: &ChainSpec,
) {
prune_validator_hash_map(
&mut self.bls_to_execution_changes.write(),
|validator_index, validator| {
validator.has_eth1_withdrawal_credential(spec)
&& head_block
.message()
.body()
.bls_to_execution_changes()
.map_or(true, |recent_changes| {
!recent_changes
.iter()
.any(|c| c.message.validator_index == validator_index)
})
},
head_state,
);
self.bls_to_execution_changes
.write()
.prune(head_block, head_state, spec)
}
/// Prune all types of transactions given the latest head state and head fork.
@ -663,8 +660,8 @@ impl<T: EthSpec> OperationPool<T> {
pub fn get_all_bls_to_execution_changes(&self) -> Vec<SignedBlsToExecutionChange> {
self.bls_to_execution_changes
.read()
.iter()
.map(|(_, address_change)| address_change.as_inner().clone())
.iter_fifo()
.map(|address_change| address_change.as_inner().clone())
.collect()
}
}

View File

@ -1,5 +1,6 @@
use crate::attestation_id::AttestationId;
use crate::attestation_storage::AttestationMap;
use crate::bls_to_execution_changes::BlsToExecutionChanges;
use crate::sync_aggregate_id::SyncAggregateId;
use crate::OpPoolError;
use crate::OperationPool;
@ -105,8 +106,8 @@ impl<T: EthSpec> PersistedOperationPool<T> {
let bls_to_execution_changes = operation_pool
.bls_to_execution_changes
.read()
.iter()
.map(|(_, bls_to_execution_change)| bls_to_execution_change.clone())
.iter_fifo()
.map(|bls_to_execution_change| (**bls_to_execution_change).clone())
.collect();
PersistedOperationPool::V14(PersistedOperationPoolV14 {
@ -153,18 +154,13 @@ impl<T: EthSpec> PersistedOperationPool<T> {
PersistedOperationPool::V5(_) | PersistedOperationPool::V12(_) => {
return Err(OpPoolError::IncorrectOpPoolVariant)
}
PersistedOperationPool::V14(pool) => RwLock::new(
pool.bls_to_execution_changes
.iter()
.cloned()
.map(|bls_to_execution_change| {
(
bls_to_execution_change.as_inner().message.validator_index,
bls_to_execution_change,
)
})
.collect(),
),
PersistedOperationPool::V14(pool) => {
let mut bls_to_execution_changes = BlsToExecutionChanges::default();
for bls_to_execution_change in pool.bls_to_execution_changes {
bls_to_execution_changes.insert(bls_to_execution_change);
}
RwLock::new(bls_to_execution_changes)
}
};
let op_pool = OperationPool {
attestations,

View File

@ -48,17 +48,6 @@ The Ethereum community provides various [public endpoints](https://eth-clients.g
lighthouse bn --checkpoint-sync-url https://example.com/ ...
```
### Use Infura as a remote beacon node provider
You can use Infura as the remote beacon node provider to load the initial checkpoint state.
1. Sign up for the free Infura ETH2 API using the `Create new project tab` on the [Infura dashboard](https://infura.io/dashboard).
2. Copy the HTTPS endpoint for the required network (Mainnet/Prater).
3. Use it as the url for the `--checkpoint-sync-url` flag. e.g.
```
lighthouse bn --checkpoint-sync-url https://<PROJECT-ID>:<PROJECT-SECRET>@eth2-beacon-mainnet.infura.io ...
```
## Backfilling Blocks
Once forwards sync completes, Lighthouse will commence a "backfill sync" to download the blocks

View File

@ -64,6 +64,7 @@ choco install protoc
These dependencies are for compiling Lighthouse natively on Windows. Lighthouse can also run
successfully under the [Windows Subsystem for Linux (WSL)][WSL]. If using Ubuntu under WSL, you
should follow the instructions for Ubuntu listed in the [Dependencies (Ubuntu)](#ubuntu) section.
[WSL]: https://docs.microsoft.com/en-us/windows/wsl/about
## Build Lighthouse
@ -128,8 +129,12 @@ Commonly used features include:
* `gnosis`: support for the Gnosis Beacon Chain.
* `portable`: support for legacy hardware.
* `modern`: support for exclusively modern hardware.
* `slasher-mdbx`: support for the MDBX slasher backend (enabled by default).
* `slasher-mdbx`: support for the MDBX slasher backend. Enabled by default.
* `slasher-lmdb`: support for the LMDB slasher backend.
* `jemalloc`: use [`jemalloc`][jemalloc] to allocate memory. Enabled by default on Linux and macOS.
Not supported on Windows.
[jemalloc]: https://jemalloc.net/
## Compilation Profiles

View File

@ -58,7 +58,7 @@ supported.
Each execution engine has its own flags for configuring the engine API and JWT. Please consult
the relevant page for your execution engine for the required flags:
- [Geth: Connecting to Consensus Clients](https://geth.ethereum.org/docs/interface/consensus-clients)
- [Geth: Connecting to Consensus Clients](https://geth.ethereum.org/docs/getting-started/consensus-clients)
- [Nethermind: Running Nethermind Post Merge](https://docs.nethermind.io/nethermind/first-steps-with-nethermind/running-nethermind-post-merge)
- [Besu: Prepare For The Merge](https://besu.hyperledger.org/en/stable/HowTo/Upgrade/Prepare-for-The-Merge/)
- [Erigon: Beacon Chain (Consensus Layer)](https://github.com/ledgerwatch/erigon#beacon-chain-consensus-layer)
@ -203,5 +203,5 @@ guidance for specific setups.
- [Ethereum.org: The Merge](https://ethereum.org/en/upgrades/merge/)
- [Ethereum Staking Launchpad: Merge Readiness](https://launchpad.ethereum.org/en/merge-readiness).
- [CoinCashew: Ethereum Merge Upgrade Checklist](https://www.coincashew.com/coins/overview-eth/ethereum-merge-upgrade-checklist-for-home-stakers-and-validators)
- [EthDocker: Merge Preparation](https://eth-docker.net/docs/About/MergePrep/)
- [EthDocker: Merge Preparation](https://eth-docker.net/About/MergePrep/)
- [Remy Roy: How to join the Goerli/Prater merge testnet](https://github.com/remyroy/ethstaker/blob/main/merge-goerli-prater.md)

View File

@ -26,7 +26,7 @@ has authority to control the execution engine.
Each execution engine has its own flags for configuring the engine API and JWT.
Please consult the relevant page of your execution engine for the required flags:
- [Geth: Connecting to Consensus Clients](https://geth.ethereum.org/docs/interface/consensus-clients)
- [Geth: Connecting to Consensus Clients](https://geth.ethereum.org/docs/getting-started/consensus-clients)
- [Nethermind: Running Nethermind & CL](https://docs.nethermind.io/nethermind/first-steps-with-nethermind/running-nethermind-post-merge)
- [Besu: Connect to Mainnet](https://besu.hyperledger.org/en/stable/public-networks/get-started/connect/mainnet/)
- [Erigon: Beacon Chain (Consensus Layer)](https://github.com/ledgerwatch/erigon#beacon-chain-consensus-layer)

View File

@ -10,7 +10,6 @@ status = [
"merge-transition-ubuntu",
"no-eth1-simulator-ubuntu",
"check-benchmarks",
"check-consensus",
"clippy",
"arbitrary-check",
"cargo-audit",

View File

@ -1042,6 +1042,24 @@ impl BeaconNodeHttpClient {
Ok(())
}
/// `POST beacon/pool/bls_to_execution_changes`
pub async fn post_beacon_pool_bls_to_execution_changes(
&self,
address_changes: &[SignedBlsToExecutionChange],
) -> Result<(), Error> {
let mut path = self.eth_path(V1)?;
path.path_segments_mut()
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
.push("beacon")
.push("pool")
.push("bls_to_execution_changes");
self.post(path, &address_changes).await?;
Ok(())
}
/// `GET beacon/deposit_snapshot`
pub async fn get_deposit_snapshot(&self) -> Result<Option<types::DepositTreeSnapshot>, Error> {
use ssz::Decode;
@ -1056,6 +1074,24 @@ impl BeaconNodeHttpClient {
.transpose()
}
/// `POST beacon/rewards/sync_committee`
pub async fn post_beacon_rewards_sync_committee(
&self,
rewards: &[Option<Vec<lighthouse::SyncCommitteeReward>>],
) -> Result<(), Error> {
let mut path = self.eth_path(V1)?;
path.path_segments_mut()
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
.push("beacon")
.push("rewards")
.push("sync_committee");
self.post(path, &rewards).await?;
Ok(())
}
/// `POST validator/contribution_and_proofs`
pub async fn post_validator_contribution_and_proofs<T: EthSpec>(
&self,

View File

@ -3,6 +3,7 @@
mod attestation_performance;
mod block_packing_efficiency;
mod block_rewards;
mod sync_committee_rewards;
use crate::{
ok_or_error,
@ -27,6 +28,7 @@ pub use block_packing_efficiency::{
};
pub use block_rewards::{AttestationRewards, BlockReward, BlockRewardMeta, BlockRewardsQuery};
pub use lighthouse_network::{types::SyncState, PeerInfo};
pub use sync_committee_rewards::SyncCommitteeReward;
// Define "legacy" implementations of `Option<T>` which use four bytes for encoding the union
// selector.

View File

@ -0,0 +1,12 @@
use serde::{Deserialize, Serialize};
// Details about the rewards paid to sync committee members for attesting headers
// All rewards in GWei
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct SyncCommitteeReward {
#[serde(with = "eth2_serde_utils::quoted_u64")]
pub validator_index: u64,
// sync committee reward in gwei for the validator
pub reward: i64,
}

View File

@ -4,13 +4,21 @@ version = "0.1.0"
authors = ["Paul Hauner <paul@paulhauner.com>"]
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
lighthouse_metrics = { path = "../lighthouse_metrics" }
lazy_static = "1.4.0"
libc = "0.2.79"
parking_lot = "0.12.0"
jemalloc-ctl = { version = "0.5.0", optional = true }
# Jemalloc's background_threads feature requires Linux (pthreads).
[target.'cfg(target_os = "linux")'.dependencies]
jemallocator = { version = "0.5.0", optional = true, features = ["stats", "background_threads"] }
[target.'cfg(not(target_os = "linux"))'.dependencies]
jemallocator = { version = "0.5.0", optional = true, features = ["stats"] }
[features]
mallinfo2 = []
jemalloc = ["jemallocator", "jemalloc-ctl"]
jemalloc-profiling = ["jemallocator/profiling"]

View File

@ -0,0 +1,52 @@
//! Set the allocator to `jemalloc`.
//!
//! Due to `jemalloc` requiring configuration at compile time or immediately upon runtime
//! initialisation it is configured via a Cargo config file in `.cargo/config.toml`.
//!
//! The `jemalloc` tuning can be overriden by:
//!
//! A) `JEMALLOC_SYS_WITH_MALLOC_CONF` at compile-time.
//! B) `_RJEM_MALLOC_CONF` at runtime.
use jemalloc_ctl::{arenas, epoch, stats, Error};
use lazy_static::lazy_static;
use lighthouse_metrics::{set_gauge, try_create_int_gauge, IntGauge};
#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
// Metrics for jemalloc.
lazy_static! {
pub static ref NUM_ARENAS: lighthouse_metrics::Result<IntGauge> =
try_create_int_gauge("jemalloc_num_arenas", "The number of arenas in use");
pub static ref BYTES_ALLOCATED: lighthouse_metrics::Result<IntGauge> =
try_create_int_gauge("jemalloc_bytes_allocated", "Equivalent to stats.allocated");
pub static ref BYTES_ACTIVE: lighthouse_metrics::Result<IntGauge> =
try_create_int_gauge("jemalloc_bytes_active", "Equivalent to stats.active");
pub static ref BYTES_MAPPED: lighthouse_metrics::Result<IntGauge> =
try_create_int_gauge("jemalloc_bytes_mapped", "Equivalent to stats.mapped");
pub static ref BYTES_METADATA: lighthouse_metrics::Result<IntGauge> =
try_create_int_gauge("jemalloc_bytes_metadata", "Equivalent to stats.metadata");
pub static ref BYTES_RESIDENT: lighthouse_metrics::Result<IntGauge> =
try_create_int_gauge("jemalloc_bytes_resident", "Equivalent to stats.resident");
pub static ref BYTES_RETAINED: lighthouse_metrics::Result<IntGauge> =
try_create_int_gauge("jemalloc_bytes_retained", "Equivalent to stats.retained");
}
pub fn scrape_jemalloc_metrics() {
scrape_jemalloc_metrics_fallible().unwrap()
}
pub fn scrape_jemalloc_metrics_fallible() -> Result<(), Error> {
// Advance the epoch so that the underlying statistics are updated.
epoch::advance()?;
set_gauge(&NUM_ARENAS, arenas::narenas::read()? as i64);
set_gauge(&BYTES_ALLOCATED, stats::allocated::read()? as i64);
set_gauge(&BYTES_ACTIVE, stats::active::read()? as i64);
set_gauge(&BYTES_MAPPED, stats::mapped::read()? as i64);
set_gauge(&BYTES_METADATA, stats::metadata::read()? as i64);
set_gauge(&BYTES_RESIDENT, stats::resident::read()? as i64);
set_gauge(&BYTES_RETAINED, stats::retained::read()? as i64);
Ok(())
}

View File

@ -2,18 +2,18 @@
//!
//! ## Conditional Compilation
//!
//! Presently, only configuration for "The GNU Allocator" from `glibc` is supported. All other
//! allocators are ignored.
//! This crate can be compiled with different feature flags to support different allocators:
//!
//! It is assumed that if the following two statements are correct then we should expect to
//! configure `glibc`:
//! - Jemalloc, via the `jemalloc` feature.
//! - GNU malloc, if no features are set and the system supports it.
//! - The system allocator, if no features are set and the allocator is not GNU malloc.
//!
//! It is assumed that if Jemalloc is not in use, and the following two statements are correct then
//! we should expect to configure `glibc`:
//!
//! - `target_os = linux`
//! - `target_env != musl`
//!
//! In all other cases this library will not attempt to do anything (i.e., all functions are
//! no-ops).
//!
//! If the above conditions are fulfilled but `glibc` still isn't present at runtime then a panic
//! may be triggered. It is understood that there's no way to be certain that a compatible `glibc`
//! is present: https://github.com/rust-lang/rust/issues/33244.
@ -24,18 +24,42 @@
//! detecting `glibc` are best-effort. If this crate throws errors about undefined external
//! functions, then try to compile with the `not_glibc_interface` module.
#[cfg(all(target_os = "linux", not(target_env = "musl")))]
#[cfg(all(
target_os = "linux",
not(target_env = "musl"),
not(feature = "jemalloc")
))]
mod glibc;
#[cfg(feature = "jemalloc")]
mod jemalloc;
pub use interface::*;
#[cfg(all(target_os = "linux", not(target_env = "musl")))]
#[cfg(all(
target_os = "linux",
not(target_env = "musl"),
not(feature = "jemalloc")
))]
mod interface {
pub use crate::glibc::configure_glibc_malloc as configure_memory_allocator;
pub use crate::glibc::scrape_mallinfo_metrics as scrape_allocator_metrics;
}
#[cfg(any(not(target_os = "linux"), target_env = "musl"))]
#[cfg(feature = "jemalloc")]
mod interface {
#[allow(dead_code)]
pub fn configure_memory_allocator() -> Result<(), String> {
Ok(())
}
pub use crate::jemalloc::scrape_jemalloc_metrics as scrape_allocator_metrics;
}
#[cfg(all(
any(not(target_os = "linux"), target_env = "musl"),
not(feature = "jemalloc")
))]
mod interface {
#[allow(dead_code, clippy::unnecessary_wraps)]
pub fn configure_memory_allocator() -> Result<(), String> {

View File

@ -67,7 +67,7 @@ where
fn new(op: T, state: &BeaconState<E>) -> Self {
let verified_against = VerifiedAgainst {
fork_versions: op
.verification_epochs(state.current_epoch())
.verification_epochs()
.into_iter()
.map(|epoch| state.fork().get_fork_version(epoch))
.collect(),
@ -89,13 +89,9 @@ where
}
pub fn signature_is_still_valid(&self, current_fork: &Fork) -> bool {
// Pass the fork's epoch as the effective current epoch. If the message is a current-epoch
// style message like `SignedBlsToExecutionChange` then `get_fork_version` will return the
// current fork version and we'll check it matches the fork version the message was checked
// against.
let effective_current_epoch = current_fork.epoch;
// The .all() will return true if the iterator is empty.
self.as_inner()
.verification_epochs(effective_current_epoch)
.verification_epochs()
.into_iter()
.zip(self.verified_against.fork_versions.iter())
.all(|(epoch, verified_fork_version)| {
@ -126,12 +122,8 @@ pub trait VerifyOperation<E: EthSpec>: Encode + Decode + Sized {
///
/// These need to map 1-to-1 to the `SigVerifiedOp::verified_against` for this type.
///
/// If the message contains no inherent epoch it should return the `current_epoch` that is
/// passed in, as that's the epoch at which it was verified.
fn verification_epochs(
&self,
current_epoch: Epoch,
) -> SmallVec<[Epoch; MAX_FORKS_VERIFIED_AGAINST]>;
/// If the message is valid across all forks it should return an empty smallvec.
fn verification_epochs(&self) -> SmallVec<[Epoch; MAX_FORKS_VERIFIED_AGAINST]>;
}
impl<E: EthSpec> VerifyOperation<E> for SignedVoluntaryExit {
@ -147,7 +139,7 @@ impl<E: EthSpec> VerifyOperation<E> for SignedVoluntaryExit {
}
#[allow(clippy::integer_arithmetic)]
fn verification_epochs(&self, _: Epoch) -> SmallVec<[Epoch; MAX_FORKS_VERIFIED_AGAINST]> {
fn verification_epochs(&self) -> SmallVec<[Epoch; MAX_FORKS_VERIFIED_AGAINST]> {
smallvec![self.message.epoch]
}
}
@ -165,7 +157,7 @@ impl<E: EthSpec> VerifyOperation<E> for AttesterSlashing<E> {
}
#[allow(clippy::integer_arithmetic)]
fn verification_epochs(&self, _: Epoch) -> SmallVec<[Epoch; MAX_FORKS_VERIFIED_AGAINST]> {
fn verification_epochs(&self) -> SmallVec<[Epoch; MAX_FORKS_VERIFIED_AGAINST]> {
smallvec![
self.attestation_1.data.target.epoch,
self.attestation_2.data.target.epoch
@ -186,7 +178,7 @@ impl<E: EthSpec> VerifyOperation<E> for ProposerSlashing {
}
#[allow(clippy::integer_arithmetic)]
fn verification_epochs(&self, _: Epoch) -> SmallVec<[Epoch; MAX_FORKS_VERIFIED_AGAINST]> {
fn verification_epochs(&self) -> SmallVec<[Epoch; MAX_FORKS_VERIFIED_AGAINST]> {
// Only need a single epoch because the slots of the two headers must be equal.
smallvec![self
.signed_header_1
@ -209,10 +201,7 @@ impl<E: EthSpec> VerifyOperation<E> for SignedBlsToExecutionChange {
}
#[allow(clippy::integer_arithmetic)]
fn verification_epochs(
&self,
current_epoch: Epoch,
) -> SmallVec<[Epoch; MAX_FORKS_VERIFIED_AGAINST]> {
smallvec![current_epoch]
fn verification_epochs(&self) -> SmallVec<[Epoch; MAX_FORKS_VERIFIED_AGAINST]> {
smallvec![]
}
}

View File

@ -2,7 +2,7 @@
use crate::test_utils::*;
use crate::test_utils::{SeedableRng, XorShiftRng};
use beacon_chain::test_utils::{
interop_genesis_state, test_spec, BeaconChainHarness, EphemeralHarnessType,
interop_genesis_state_with_eth1, test_spec, BeaconChainHarness, EphemeralHarnessType,
DEFAULT_ETH1_BLOCK_HASH,
};
use beacon_chain::types::{
@ -551,7 +551,7 @@ fn tree_hash_cache_linear_history_long_skip() {
let spec = &test_spec::<MinimalEthSpec>();
// This state has a cache that advances normally each slot.
let mut state: BeaconState<MinimalEthSpec> = interop_genesis_state(
let mut state: BeaconState<MinimalEthSpec> = interop_genesis_state_with_eth1(
&keypairs,
0,
Hash256::from_slice(DEFAULT_ETH1_BLOCK_HASH),

View File

@ -28,6 +28,26 @@ pub struct BlsToExecutionChange {
impl SignedRoot for BlsToExecutionChange {}
impl BlsToExecutionChange {
pub fn sign(
self,
secret_key: &SecretKey,
genesis_validators_root: Hash256,
spec: &ChainSpec,
) -> SignedBlsToExecutionChange {
let domain = spec.compute_domain(
Domain::BlsToExecutionChange,
spec.genesis_fork_version,
genesis_validators_root,
);
let message = self.signing_root(domain);
SignedBlsToExecutionChange {
message: self,
signature: secret_key.sign(message),
}
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -8,6 +8,7 @@ edition = "2021"
[features]
portable = ["bls/supranational-portable"]
fake_crypto = ['bls/fake_crypto']
jemalloc = ["malloc_utils/jemalloc"]
[dependencies]
bls = { path = "../crypto/bls" }
@ -42,3 +43,7 @@ eth2 = { path = "../common/eth2" }
snap = "1.0.1"
beacon_chain = { path = "../beacon_node/beacon_chain" }
store = { path = "../beacon_node/store" }
malloc_utils = { path = "../common/malloc_utils" }
[package.metadata.cargo-udeps.ignore]
normal = ["malloc_utils"]

View File

@ -827,6 +827,7 @@ fn run<T: EthSpec>(
debug_level: String::from("trace"),
logfile_debug_level: String::from("trace"),
log_format: None,
logfile_format: None,
log_color: false,
disable_log_timestamp: false,
max_log_size: 0,

View File

@ -24,6 +24,8 @@ gnosis = []
slasher-mdbx = ["slasher/mdbx"]
# Support slasher LMDB backend.
slasher-lmdb = ["slasher/lmdb"]
# Use jemalloc.
jemalloc = ["malloc_utils/jemalloc"]
[dependencies]
beacon_node = { "path" = "../beacon_node" }

View File

@ -50,6 +50,7 @@ pub struct LoggerConfig {
pub debug_level: String,
pub logfile_debug_level: String,
pub log_format: Option<String>,
pub logfile_format: Option<String>,
pub log_color: bool,
pub disable_log_timestamp: bool,
pub max_log_size: u64,
@ -64,6 +65,7 @@ impl Default for LoggerConfig {
debug_level: String::from("info"),
logfile_debug_level: String::from("debug"),
log_format: None,
logfile_format: None,
log_color: false,
disable_log_timestamp: false,
max_log_size: 200,
@ -252,7 +254,7 @@ impl<E: EthSpec> EnvironmentBuilder<E> {
let file_logger = FileLoggerBuilder::new(&path)
.level(logfile_level)
.channel_size(LOG_CHANNEL_SIZE)
.format(match config.log_format.as_deref() {
.format(match config.logfile_format.as_deref() {
Some("JSON") => Format::Json,
_ => Format::default(),
})

View File

@ -31,6 +31,14 @@ fn bls_library_name() -> &'static str {
}
}
fn allocator_name() -> &'static str {
if cfg!(feature = "jemalloc") {
"jemalloc"
} else {
"system"
}
}
fn main() {
// Enable backtraces unless a RUST_BACKTRACE value has already been explicitly provided.
if std::env::var("RUST_BACKTRACE").is_err() {
@ -51,10 +59,12 @@ fn main() {
"{}\n\
BLS library: {}\n\
SHA256 hardware acceleration: {}\n\
Allocator: {}\n\
Specs: mainnet (true), minimal ({}), gnosis ({})",
VERSION.replace("Lighthouse/", ""),
bls_library_name(),
have_sha_extensions(),
allocator_name(),
cfg!(feature = "spec-minimal"),
cfg!(feature = "gnosis"),
).as_str()
@ -99,6 +109,15 @@ fn main() {
.default_value("debug")
.global(true),
)
.arg(
Arg::with_name("logfile-format")
.long("logfile-format")
.value_name("FORMAT")
.help("Specifies the log format used when emitting logs to the logfile.")
.possible_values(&["DEFAULT", "JSON"])
.takes_value(true)
.global(true)
)
.arg(
Arg::with_name("logfile-max-size")
.long("logfile-max-size")
@ -402,6 +421,11 @@ fn run<E: EthSpec>(
.value_of("logfile-debug-level")
.ok_or("Expected --logfile-debug-level flag")?;
let logfile_format = matches
.value_of("logfile-format")
// Ensure that `logfile-format` defaults to the value of `log-format`.
.or_else(|| matches.value_of("log-format"));
let logfile_max_size: u64 = matches
.value_of("logfile-max-size")
.ok_or("Expected --logfile-max-size flag")?
@ -452,6 +476,7 @@ fn run<E: EthSpec>(
debug_level: String::from(debug_level),
logfile_debug_level: String::from(logfile_debug_level),
log_format: log_format.map(String::from),
logfile_format: logfile_format.map(String::from),
log_color,
disable_log_timestamp,
max_log_size: logfile_max_size * 1_024 * 1_024,

View File

@ -1662,7 +1662,24 @@ fn logfile_no_restricted_perms_flag() {
assert!(config.logger_config.is_restricted == false);
});
}
#[test]
fn logfile_format_default() {
CommandLineTest::new()
.run_with_zero_port()
.with_config(|config| assert_eq!(config.logger_config.logfile_format, None));
}
#[test]
fn logfile_format_flag() {
CommandLineTest::new()
.flag("logfile-format", Some("JSON"))
.run_with_zero_port()
.with_config(|config| {
assert_eq!(
config.logger_config.logfile_format,
Some("JSON".to_string())
)
});
}
#[test]
fn sync_eth1_chain_default() {
CommandLineTest::new()

View File

@ -1,11 +1,9 @@
FROM rust:1.62.1-bullseye AS builder
RUN apt-get update && apt-get -y upgrade && apt-get install -y cmake clang libclang-dev
FROM rust:1.66.1-bullseye AS builder
RUN apt-get update && apt-get -y upgrade && apt-get install -y cmake libclang-dev clang protobuf-compiler
COPY . lighthouse
# Build lighthouse directly with a cargo build command, bypassing the Makefile.
# We have to use nightly in order to disable the new LLVM pass manager.
RUN rustup default nightly-2022-07-26 && cd lighthouse && LD_LIBRARY_PATH=/lighthouse/testing/antithesis/libvoidstar/ RUSTFLAGS="-Znew-llvm-pass-manager=no -Cpasses=sancov -Cllvm-args=-sanitizer-coverage-level=3 -Cllvm-args=-sanitizer-coverage-trace-pc-guard -Ccodegen-units=1 -Cdebuginfo=2 -L/lighthouse/testing/antithesis/libvoidstar/ -lvoidstar" cargo build --release --manifest-path lighthouse/Cargo.toml --target x86_64-unknown-linux-gnu --features modern --verbose --bin lighthouse
RUN cd lighthouse && LD_LIBRARY_PATH=/lighthouse/testing/antithesis/libvoidstar/ RUSTFLAGS="-Cpasses=sancov-module -Cllvm-args=-sanitizer-coverage-level=3 -Cllvm-args=-sanitizer-coverage-trace-pc-guard -Ccodegen-units=1 -Cdebuginfo=2 -L/lighthouse/testing/antithesis/libvoidstar/ -lvoidstar" cargo build --release --manifest-path lighthouse/Cargo.toml --target x86_64-unknown-linux-gnu --features modern --verbose --bin lighthouse
# build lcli binary directly with cargo install command, bypassing the makefile
RUN cargo install --path /lighthouse/lcli --force --locked

View File

@ -62,6 +62,7 @@ pub fn run_eth1_sim(matches: &ArgMatches) -> Result<(), String> {
debug_level: String::from("debug"),
logfile_debug_level: String::from("debug"),
log_format: None,
logfile_format: None,
log_color: false,
disable_log_timestamp: false,
max_log_size: 0,

View File

@ -47,6 +47,7 @@ pub fn run_no_eth1_sim(matches: &ArgMatches) -> Result<(), String> {
debug_level: String::from("debug"),
logfile_debug_level: String::from("debug"),
log_format: None,
logfile_format: None,
log_color: false,
disable_log_timestamp: false,
max_log_size: 0,

View File

@ -51,6 +51,7 @@ fn syncing_sim(
debug_level: String::from(log_level),
logfile_debug_level: String::from("debug"),
log_format: log_format.map(String::from),
logfile_format: None,
log_color: false,
disable_log_timestamp: false,
max_log_size: 0,

View File

@ -335,6 +335,11 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
let proposer_index = self.validator_store.validator_index(&validator_pubkey);
let validator_pubkey_ref = &validator_pubkey;
info!(
log,
"Requesting unsigned block";
"slot" => slot.as_u64(),
);
// Request block from first responsive beacon node.
let block = self
.beacon_nodes
@ -385,6 +390,11 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
}
};
info!(
log,
"Received unsigned block";
"slot" => slot.as_u64(),
);
if proposer_index != Some(block.proposer_index()) {
return Err(BlockError::Recoverable(
"Proposer index does not match block proposer. Beacon chain re-orged"
@ -403,6 +413,11 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
.await
.map_err(|e| BlockError::Recoverable(format!("Unable to sign block: {:?}", e)))?;
info!(
log,
"Publishing signed block";
"slot" => slot.as_u64(),
);
// Publish block with first available beacon node.
self.beacon_nodes
.first_success(

View File

@ -31,6 +31,7 @@ use crate::beacon_node_fallback::{
};
use crate::doppelganger_service::DoppelgangerService;
use crate::graffiti_file::GraffitiFile;
use crate::initialized_validators::Error::UnableToOpenVotingKeystore;
use account_utils::validator_definitions::ValidatorDefinitions;
use attestation_service::{AttestationService, AttestationServiceBuilder};
use block_service::{BlockService, BlockServiceBuilder};
@ -184,7 +185,16 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
log.clone(),
)
.await
.map_err(|e| format!("Unable to initialize validators: {:?}", e))?;
.map_err(|e| {
match e {
UnableToOpenVotingKeystore(err) => {
format!("Unable to initialize validators: {:?}. If you have recently moved the location of your data directory \
make sure to update the location of voting_keystore_path in your validator_definitions.yml", err)
},
err => {
format!("Unable to initialize validators: {:?}", err)}
}
})?;
let voting_pubkeys: Vec<_> = validators.iter_voting_pubkeys().collect();