Import BLS to execution changes before Capella (#3892)
* Import BLS to execution changes before Capella * Test for BLS to execution change HTTP API * Pack BLS to execution changes in LIFO order * Remove unused var * Clippy
This commit is contained in:
parent
9f2baced0b
commit
a4cfe50ade
@ -2313,32 +2313,74 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
}
|
||||
|
||||
/// Verify a signed BLS to execution change before allowing it to propagate on the gossip network.
|
||||
pub fn verify_bls_to_execution_change_for_gossip(
|
||||
pub fn verify_bls_to_execution_change_for_http_api(
|
||||
&self,
|
||||
bls_to_execution_change: SignedBlsToExecutionChange,
|
||||
) -> Result<ObservationOutcome<SignedBlsToExecutionChange, T::EthSpec>, Error> {
|
||||
let current_fork = self.spec.fork_name_at_slot::<T::EthSpec>(self.slot()?);
|
||||
if let ForkName::Base | ForkName::Altair | ForkName::Merge = current_fork {
|
||||
// Disallow BLS to execution changes prior to the Capella fork.
|
||||
return Err(Error::BlsToExecutionChangeBadFork(current_fork));
|
||||
// Before checking the gossip duplicate filter, check that no prior change is already
|
||||
// in our op pool. Ignore these messages: do not gossip, do not try to override the pool.
|
||||
match self
|
||||
.op_pool
|
||||
.bls_to_execution_change_in_pool_equals(&bls_to_execution_change)
|
||||
{
|
||||
Some(true) => return Ok(ObservationOutcome::AlreadyKnown),
|
||||
Some(false) => return Err(Error::BlsToExecutionConflictsWithPool),
|
||||
None => (),
|
||||
}
|
||||
|
||||
let wall_clock_state = self.wall_clock_state()?;
|
||||
// Use the head state to save advancing to the wall-clock slot unnecessarily. The message is
|
||||
// signed with respect to the genesis fork version, and the slot check for gossip is applied
|
||||
// separately. This `Arc` clone of the head is nice and cheap.
|
||||
let head_snapshot = self.head().snapshot;
|
||||
let head_state = &head_snapshot.beacon_state;
|
||||
|
||||
Ok(self
|
||||
.observed_bls_to_execution_changes
|
||||
.lock()
|
||||
.verify_and_observe(bls_to_execution_change, &wall_clock_state, &self.spec)?)
|
||||
.verify_and_observe(bls_to_execution_change, head_state, &self.spec)?)
|
||||
}
|
||||
|
||||
/// Verify a signed BLS to execution change before allowing it to propagate on the gossip network.
|
||||
pub fn verify_bls_to_execution_change_for_gossip(
|
||||
&self,
|
||||
bls_to_execution_change: SignedBlsToExecutionChange,
|
||||
) -> Result<ObservationOutcome<SignedBlsToExecutionChange, T::EthSpec>, Error> {
|
||||
// Ignore BLS to execution changes on gossip prior to Capella.
|
||||
if !self.current_slot_is_post_capella()? {
|
||||
return Err(Error::BlsToExecutionPriorToCapella);
|
||||
}
|
||||
self.verify_bls_to_execution_change_for_http_api(bls_to_execution_change)
|
||||
.or_else(|e| {
|
||||
// On gossip treat conflicts the same as duplicates [IGNORE].
|
||||
match e {
|
||||
Error::BlsToExecutionConflictsWithPool => Ok(ObservationOutcome::AlreadyKnown),
|
||||
e => Err(e),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Check if the current slot is greater than or equal to the Capella fork epoch.
|
||||
pub fn current_slot_is_post_capella(&self) -> Result<bool, Error> {
|
||||
let current_fork = self.spec.fork_name_at_slot::<T::EthSpec>(self.slot()?);
|
||||
if let ForkName::Base | ForkName::Altair | ForkName::Merge = current_fork {
|
||||
Ok(false)
|
||||
} else {
|
||||
Ok(true)
|
||||
}
|
||||
}
|
||||
|
||||
/// Import a BLS to execution change to the op pool.
|
||||
///
|
||||
/// Return `true` if the change was added to the pool.
|
||||
pub fn import_bls_to_execution_change(
|
||||
&self,
|
||||
bls_to_execution_change: SigVerifiedOp<SignedBlsToExecutionChange, T::EthSpec>,
|
||||
) {
|
||||
) -> bool {
|
||||
if self.eth1_chain.is_some() {
|
||||
self.op_pool
|
||||
.insert_bls_to_execution_change(bls_to_execution_change);
|
||||
.insert_bls_to_execution_change(bls_to_execution_change)
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -206,7 +206,8 @@ pub enum BeaconChainError {
|
||||
MissingPersistedForkChoice,
|
||||
CommitteePromiseFailed(oneshot_broadcast::Error),
|
||||
MaxCommitteePromises(usize),
|
||||
BlsToExecutionChangeBadFork(ForkName),
|
||||
BlsToExecutionPriorToCapella,
|
||||
BlsToExecutionConflictsWithPool,
|
||||
InconsistentFork(InconsistentFork),
|
||||
ProposerHeadForkChoiceError(fork_choice::Error<proto_array::Error>),
|
||||
BlobsUnavailable,
|
||||
|
@ -149,6 +149,7 @@ pub struct Builder<T: BeaconChainTypes> {
|
||||
eth_spec_instance: T::EthSpec,
|
||||
spec: Option<ChainSpec>,
|
||||
validator_keypairs: Option<Vec<Keypair>>,
|
||||
withdrawal_keypairs: Vec<Option<Keypair>>,
|
||||
chain_config: Option<ChainConfig>,
|
||||
store_config: Option<StoreConfig>,
|
||||
#[allow(clippy::type_complexity)]
|
||||
@ -171,6 +172,17 @@ impl<E: EthSpec> Builder<EphemeralHarnessType<E>> {
|
||||
.clone()
|
||||
.expect("cannot build without validator keypairs");
|
||||
|
||||
// For the interop genesis state we know that the withdrawal credentials are set equal
|
||||
// to the validator keypairs. Check for any manually initialised credentials.
|
||||
assert!(
|
||||
self.withdrawal_keypairs.is_empty(),
|
||||
"withdrawal credentials are ignored by fresh_ephemeral_store"
|
||||
);
|
||||
self.withdrawal_keypairs = validator_keypairs
|
||||
.iter()
|
||||
.map(|kp| Some(kp.clone()))
|
||||
.collect();
|
||||
|
||||
let store = Arc::new(
|
||||
HotColdDB::open_ephemeral(
|
||||
self.store_config.clone().unwrap_or_default(),
|
||||
@ -283,6 +295,7 @@ where
|
||||
eth_spec_instance,
|
||||
spec: None,
|
||||
validator_keypairs: None,
|
||||
withdrawal_keypairs: vec![],
|
||||
chain_config: None,
|
||||
store_config: None,
|
||||
store: None,
|
||||
@ -545,6 +558,7 @@ where
|
||||
spec: chain.spec.clone(),
|
||||
chain: Arc::new(chain),
|
||||
validator_keypairs,
|
||||
withdrawal_keypairs: self.withdrawal_keypairs,
|
||||
shutdown_receiver: Arc::new(Mutex::new(shutdown_receiver)),
|
||||
runtime: self.runtime,
|
||||
mock_execution_layer: self.mock_execution_layer,
|
||||
@ -560,6 +574,12 @@ where
|
||||
/// Used for testing.
|
||||
pub struct BeaconChainHarness<T: BeaconChainTypes> {
|
||||
pub validator_keypairs: Vec<Keypair>,
|
||||
/// Optional BLS withdrawal keys for each validator.
|
||||
///
|
||||
/// If a validator index is missing from this vec or their entry is `None` then either
|
||||
/// no BLS withdrawal key was set for them (they had an address from genesis) or the test
|
||||
/// initializer neglected to set this field.
|
||||
pub withdrawal_keypairs: Vec<Option<Keypair>>,
|
||||
|
||||
pub chain: Arc<BeaconChain<T>>,
|
||||
pub spec: ChainSpec,
|
||||
@ -1471,6 +1491,44 @@ where
|
||||
.sign(sk, &fork, genesis_validators_root, &self.chain.spec)
|
||||
}
|
||||
|
||||
pub fn make_bls_to_execution_change(
|
||||
&self,
|
||||
validator_index: u64,
|
||||
address: Address,
|
||||
) -> SignedBlsToExecutionChange {
|
||||
let keypair = self.get_withdrawal_keypair(validator_index);
|
||||
self.make_bls_to_execution_change_with_keys(
|
||||
validator_index,
|
||||
address,
|
||||
&keypair.pk,
|
||||
&keypair.sk,
|
||||
)
|
||||
}
|
||||
|
||||
pub fn make_bls_to_execution_change_with_keys(
|
||||
&self,
|
||||
validator_index: u64,
|
||||
address: Address,
|
||||
pubkey: &PublicKey,
|
||||
secret_key: &SecretKey,
|
||||
) -> SignedBlsToExecutionChange {
|
||||
let genesis_validators_root = self.chain.genesis_validators_root;
|
||||
BlsToExecutionChange {
|
||||
validator_index,
|
||||
from_bls_pubkey: pubkey.compress(),
|
||||
to_execution_address: address,
|
||||
}
|
||||
.sign(secret_key, genesis_validators_root, &self.chain.spec)
|
||||
}
|
||||
|
||||
pub fn get_withdrawal_keypair(&self, validator_index: u64) -> &Keypair {
|
||||
self.withdrawal_keypairs
|
||||
.get(validator_index as usize)
|
||||
.expect("BLS withdrawal key missing from harness")
|
||||
.as_ref()
|
||||
.expect("no withdrawal key for validator")
|
||||
}
|
||||
|
||||
pub fn add_voluntary_exit(
|
||||
&self,
|
||||
block: &mut BeaconBlock<E>,
|
||||
|
@ -1671,7 +1671,7 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
.and_then(
|
||||
|chain: Arc<BeaconChain<T>>,
|
||||
address_changes: Vec<SignedBlsToExecutionChange>,
|
||||
#[allow(unused)] network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
|
||||
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
|
||||
log: Logger| {
|
||||
blocking_json_task(move || {
|
||||
let mut failures = vec![];
|
||||
@ -1679,15 +1679,38 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
for (index, address_change) in address_changes.into_iter().enumerate() {
|
||||
let validator_index = address_change.message.validator_index;
|
||||
|
||||
match chain.verify_bls_to_execution_change_for_gossip(address_change) {
|
||||
match chain.verify_bls_to_execution_change_for_http_api(address_change) {
|
||||
Ok(ObservationOutcome::New(verified_address_change)) => {
|
||||
publish_pubsub_message(
|
||||
&network_tx,
|
||||
PubsubMessage::BlsToExecutionChange(Box::new(
|
||||
verified_address_change.as_inner().clone(),
|
||||
)),
|
||||
)?;
|
||||
chain.import_bls_to_execution_change(verified_address_change);
|
||||
let validator_index =
|
||||
verified_address_change.as_inner().message.validator_index;
|
||||
let address = verified_address_change
|
||||
.as_inner()
|
||||
.message
|
||||
.to_execution_address;
|
||||
|
||||
// New to P2P *and* op pool, gossip immediately if post-Capella.
|
||||
let publish = chain.current_slot_is_post_capella().unwrap_or(false);
|
||||
if publish {
|
||||
publish_pubsub_message(
|
||||
&network_tx,
|
||||
PubsubMessage::BlsToExecutionChange(Box::new(
|
||||
verified_address_change.as_inner().clone(),
|
||||
)),
|
||||
)?;
|
||||
}
|
||||
|
||||
// Import to op pool (may return `false` if there's a race).
|
||||
let imported =
|
||||
chain.import_bls_to_execution_change(verified_address_change);
|
||||
|
||||
info!(
|
||||
log,
|
||||
"Processed BLS to execution change";
|
||||
"validator_index" => validator_index,
|
||||
"address" => ?address,
|
||||
"published" => publish,
|
||||
"imported" => imported,
|
||||
);
|
||||
}
|
||||
Ok(ObservationOutcome::AlreadyKnown) => {
|
||||
debug!(
|
||||
@ -1697,11 +1720,12 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
error!(
|
||||
warn!(
|
||||
log,
|
||||
"Invalid BLS to execution change";
|
||||
"validator_index" => validator_index,
|
||||
"source" => "HTTP API",
|
||||
"reason" => ?e,
|
||||
"source" => "HTTP",
|
||||
);
|
||||
failures.push(api_types::Failure::new(
|
||||
index,
|
||||
|
@ -1,8 +1,8 @@
|
||||
//! Tests for API behaviour across fork boundaries.
|
||||
use crate::common::*;
|
||||
use beacon_chain::{test_utils::RelativeSyncCommittee, StateSkipConfig};
|
||||
use eth2::types::{StateId, SyncSubcommittee};
|
||||
use types::{ChainSpec, Epoch, EthSpec, MinimalEthSpec, Slot};
|
||||
use eth2::types::{IndexedErrorMessage, StateId, SyncSubcommittee};
|
||||
use types::{Address, ChainSpec, Epoch, EthSpec, MinimalEthSpec, Slot};
|
||||
|
||||
type E = MinimalEthSpec;
|
||||
|
||||
@ -12,6 +12,14 @@ fn altair_spec(altair_fork_epoch: Epoch) -> ChainSpec {
|
||||
spec
|
||||
}
|
||||
|
||||
fn capella_spec(capella_fork_epoch: Epoch) -> ChainSpec {
|
||||
let mut spec = E::default_spec();
|
||||
spec.altair_fork_epoch = Some(Epoch::new(0));
|
||||
spec.bellatrix_fork_epoch = Some(Epoch::new(0));
|
||||
spec.capella_fork_epoch = Some(capella_fork_epoch);
|
||||
spec
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn sync_committee_duties_across_fork() {
|
||||
let validator_count = E::sync_committee_size();
|
||||
@ -307,3 +315,171 @@ async fn sync_committee_indices_across_fork() {
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Assert that an HTTP API error has the given status code and indexed errors for the given indices.
|
||||
fn assert_server_indexed_error(error: eth2::Error, status_code: u16, indices: Vec<usize>) {
|
||||
let eth2::Error::ServerIndexedMessage(IndexedErrorMessage {
|
||||
code,
|
||||
failures,
|
||||
..
|
||||
}) = error else {
|
||||
panic!("wrong error, expected ServerIndexedMessage, got: {error:?}")
|
||||
};
|
||||
assert_eq!(code, status_code);
|
||||
assert_eq!(failures.len(), indices.len());
|
||||
for (index, failure) in indices.into_iter().zip(failures) {
|
||||
assert_eq!(failure.index, index as u64);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn bls_to_execution_changes_update_all_around_capella_fork() {
|
||||
let validator_count = 128;
|
||||
let fork_epoch = Epoch::new(2);
|
||||
let spec = capella_spec(fork_epoch);
|
||||
let max_bls_to_execution_changes = E::max_bls_to_execution_changes();
|
||||
let tester = InteractiveTester::<E>::new(Some(spec.clone()), validator_count).await;
|
||||
let harness = &tester.harness;
|
||||
let client = &tester.client;
|
||||
|
||||
let all_validators = harness.get_all_validators();
|
||||
let all_validators_u64 = all_validators.iter().map(|x| *x as u64).collect::<Vec<_>>();
|
||||
|
||||
// Create a bunch of valid address changes.
|
||||
let valid_address_changes = all_validators_u64
|
||||
.iter()
|
||||
.map(|&validator_index| {
|
||||
harness.make_bls_to_execution_change(
|
||||
validator_index,
|
||||
Address::from_low_u64_be(validator_index),
|
||||
)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Address changes which conflict with `valid_address_changes` on the address chosen.
|
||||
let conflicting_address_changes = all_validators_u64
|
||||
.iter()
|
||||
.map(|&validator_index| {
|
||||
harness.make_bls_to_execution_change(
|
||||
validator_index,
|
||||
Address::from_low_u64_be(validator_index + 1),
|
||||
)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Address changes signed with the wrong key.
|
||||
let wrong_key_address_changes = all_validators_u64
|
||||
.iter()
|
||||
.map(|&validator_index| {
|
||||
// Use the correct pubkey.
|
||||
let pubkey = &harness.get_withdrawal_keypair(validator_index).pk;
|
||||
// And the wrong secret key.
|
||||
let secret_key = &harness
|
||||
.get_withdrawal_keypair((validator_index + 1) % validator_count as u64)
|
||||
.sk;
|
||||
harness.make_bls_to_execution_change_with_keys(
|
||||
validator_index,
|
||||
Address::from_low_u64_be(validator_index),
|
||||
pubkey,
|
||||
secret_key,
|
||||
)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Submit some changes before Capella. Just enough to fill two blocks.
|
||||
let num_pre_capella = validator_count / 4;
|
||||
let blocks_filled_pre_capella = 2;
|
||||
assert_eq!(
|
||||
num_pre_capella,
|
||||
blocks_filled_pre_capella * max_bls_to_execution_changes
|
||||
);
|
||||
|
||||
client
|
||||
.post_beacon_pool_bls_to_execution_changes(&valid_address_changes[..num_pre_capella])
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Conflicting changes for the same validators should all fail.
|
||||
let error = client
|
||||
.post_beacon_pool_bls_to_execution_changes(&conflicting_address_changes[..num_pre_capella])
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert_server_indexed_error(error, 400, (0..num_pre_capella).collect());
|
||||
|
||||
// Re-submitting the same changes should be accepted.
|
||||
client
|
||||
.post_beacon_pool_bls_to_execution_changes(&valid_address_changes[..num_pre_capella])
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Invalid changes signed with the wrong keys should all be rejected without affecting the seen
|
||||
// indices filters (apply ALL of them).
|
||||
let error = client
|
||||
.post_beacon_pool_bls_to_execution_changes(&wrong_key_address_changes)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert_server_indexed_error(error, 400, all_validators.clone());
|
||||
|
||||
// Advance to right before Capella.
|
||||
let capella_slot = fork_epoch.start_slot(E::slots_per_epoch());
|
||||
harness.extend_to_slot(capella_slot - 1).await;
|
||||
assert_eq!(harness.head_slot(), capella_slot - 1);
|
||||
|
||||
// Add Capella blocks which should be full of BLS to execution changes.
|
||||
for i in 0..validator_count / max_bls_to_execution_changes {
|
||||
let head_block_root = harness.extend_slots(1).await;
|
||||
let head_block = harness
|
||||
.chain
|
||||
.get_block(&head_block_root)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
let bls_to_execution_changes = head_block
|
||||
.message()
|
||||
.body()
|
||||
.bls_to_execution_changes()
|
||||
.unwrap();
|
||||
|
||||
// Block should be full.
|
||||
assert_eq!(
|
||||
bls_to_execution_changes.len(),
|
||||
max_bls_to_execution_changes,
|
||||
"block not full on iteration {i}"
|
||||
);
|
||||
|
||||
// Included changes should be the ones from `valid_address_changes` in any order.
|
||||
for address_change in bls_to_execution_changes.iter() {
|
||||
assert!(valid_address_changes.contains(address_change));
|
||||
}
|
||||
|
||||
// After the initial 2 blocks, add the rest of the changes using a large
|
||||
// request containing all the valid, all the conflicting and all the invalid.
|
||||
// Despite the invalid and duplicate messages, the new ones should still get picked up by
|
||||
// the pool.
|
||||
if i == blocks_filled_pre_capella - 1 {
|
||||
let all_address_changes: Vec<_> = [
|
||||
valid_address_changes.clone(),
|
||||
conflicting_address_changes.clone(),
|
||||
wrong_key_address_changes.clone(),
|
||||
]
|
||||
.concat();
|
||||
|
||||
let error = client
|
||||
.post_beacon_pool_bls_to_execution_changes(&all_address_changes)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert_server_indexed_error(
|
||||
error,
|
||||
400,
|
||||
(validator_count..3 * validator_count).collect(),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Eventually all validators should have eth1 withdrawal credentials.
|
||||
let head_state = harness.get_current_state();
|
||||
for validator in head_state.validators() {
|
||||
assert!(validator.has_eth1_withdrawal_credential(&spec));
|
||||
}
|
||||
}
|
||||
|
@ -1231,7 +1231,7 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
"error" => ?e
|
||||
);
|
||||
// We ignore pre-capella messages without penalizing peers.
|
||||
if matches!(e, BeaconChainError::BlsToExecutionChangeBadFork(_)) {
|
||||
if matches!(e, BeaconChainError::BlsToExecutionPriorToCapella) {
|
||||
self.propagate_validation_result(
|
||||
message_id,
|
||||
peer_id,
|
||||
|
105
beacon_node/operation_pool/src/bls_to_execution_changes.rs
Normal file
105
beacon_node/operation_pool/src/bls_to_execution_changes.rs
Normal file
@ -0,0 +1,105 @@
|
||||
use state_processing::SigVerifiedOp;
|
||||
use std::collections::{hash_map::Entry, HashMap};
|
||||
use std::sync::Arc;
|
||||
use types::{
|
||||
AbstractExecPayload, BeaconState, ChainSpec, EthSpec, SignedBeaconBlock,
|
||||
SignedBlsToExecutionChange,
|
||||
};
|
||||
|
||||
/// Pool of BLS to execution changes that maintains a LIFO queue and an index by validator.
|
||||
///
|
||||
/// Using the LIFO queue for block production disincentivises spam on P2P at the Capella fork,
|
||||
/// and is less-relevant after that.
|
||||
#[derive(Debug, Default)]
|
||||
pub struct BlsToExecutionChanges<T: EthSpec> {
|
||||
/// Map from validator index to BLS to execution change.
|
||||
by_validator_index: HashMap<u64, Arc<SigVerifiedOp<SignedBlsToExecutionChange, T>>>,
|
||||
/// Last-in-first-out (LIFO) queue of verified messages.
|
||||
queue: Vec<Arc<SigVerifiedOp<SignedBlsToExecutionChange, T>>>,
|
||||
}
|
||||
|
||||
impl<T: EthSpec> BlsToExecutionChanges<T> {
|
||||
pub fn existing_change_equals(
|
||||
&self,
|
||||
address_change: &SignedBlsToExecutionChange,
|
||||
) -> Option<bool> {
|
||||
self.by_validator_index
|
||||
.get(&address_change.message.validator_index)
|
||||
.map(|existing| existing.as_inner() == address_change)
|
||||
}
|
||||
|
||||
pub fn insert(
|
||||
&mut self,
|
||||
verified_change: SigVerifiedOp<SignedBlsToExecutionChange, T>,
|
||||
) -> bool {
|
||||
// Wrap in an `Arc` once on insert.
|
||||
let verified_change = Arc::new(verified_change);
|
||||
match self
|
||||
.by_validator_index
|
||||
.entry(verified_change.as_inner().message.validator_index)
|
||||
{
|
||||
Entry::Vacant(entry) => {
|
||||
self.queue.push(verified_change.clone());
|
||||
entry.insert(verified_change);
|
||||
true
|
||||
}
|
||||
Entry::Occupied(_) => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// FIFO ordering, used for persistence to disk.
|
||||
pub fn iter_fifo(
|
||||
&self,
|
||||
) -> impl Iterator<Item = &Arc<SigVerifiedOp<SignedBlsToExecutionChange, T>>> {
|
||||
self.queue.iter()
|
||||
}
|
||||
|
||||
/// LIFO ordering, used for block packing.
|
||||
pub fn iter_lifo(
|
||||
&self,
|
||||
) -> impl Iterator<Item = &Arc<SigVerifiedOp<SignedBlsToExecutionChange, T>>> {
|
||||
self.queue.iter().rev()
|
||||
}
|
||||
|
||||
/// Prune BLS to execution changes that have been applied to the state more than 1 block ago.
|
||||
///
|
||||
/// The block check is necessary to avoid pruning too eagerly and losing the ability to include
|
||||
/// address changes during re-orgs. This is isn't *perfect* so some address changes could
|
||||
/// still get stuck if there are gnarly re-orgs and the changes can't be widely republished
|
||||
/// due to the gossip duplicate rules.
|
||||
pub fn prune<Payload: AbstractExecPayload<T>>(
|
||||
&mut self,
|
||||
head_block: &SignedBeaconBlock<T, Payload>,
|
||||
head_state: &BeaconState<T>,
|
||||
spec: &ChainSpec,
|
||||
) {
|
||||
let mut validator_indices_pruned = vec![];
|
||||
|
||||
self.queue.retain(|address_change| {
|
||||
let validator_index = address_change.as_inner().message.validator_index;
|
||||
head_state
|
||||
.validators()
|
||||
.get(validator_index as usize)
|
||||
.map_or(true, |validator| {
|
||||
let prune = validator.has_eth1_withdrawal_credential(spec)
|
||||
&& head_block
|
||||
.message()
|
||||
.body()
|
||||
.bls_to_execution_changes()
|
||||
.map_or(true, |recent_changes| {
|
||||
!recent_changes
|
||||
.iter()
|
||||
.any(|c| c.message.validator_index == validator_index)
|
||||
});
|
||||
if prune {
|
||||
validator_indices_pruned.push(validator_index);
|
||||
}
|
||||
!prune
|
||||
})
|
||||
});
|
||||
|
||||
for validator_index in validator_indices_pruned {
|
||||
self.by_validator_index.remove(&validator_index);
|
||||
}
|
||||
}
|
||||
}
|
@ -2,6 +2,7 @@ mod attestation;
|
||||
mod attestation_id;
|
||||
mod attestation_storage;
|
||||
mod attester_slashing;
|
||||
mod bls_to_execution_changes;
|
||||
mod max_cover;
|
||||
mod metrics;
|
||||
mod persistence;
|
||||
@ -18,6 +19,7 @@ pub use persistence::{
|
||||
pub use reward_cache::RewardCache;
|
||||
|
||||
use crate::attestation_storage::{AttestationMap, CheckpointKey};
|
||||
use crate::bls_to_execution_changes::BlsToExecutionChanges;
|
||||
use crate::sync_aggregate_id::SyncAggregateId;
|
||||
use attester_slashing::AttesterSlashingMaxCover;
|
||||
use max_cover::maximum_cover;
|
||||
@ -51,8 +53,8 @@ pub struct OperationPool<T: EthSpec + Default> {
|
||||
proposer_slashings: RwLock<HashMap<u64, SigVerifiedOp<ProposerSlashing, T>>>,
|
||||
/// Map from exiting validator to their exit data.
|
||||
voluntary_exits: RwLock<HashMap<u64, SigVerifiedOp<SignedVoluntaryExit, T>>>,
|
||||
/// Map from credential changing validator to their execution change data.
|
||||
bls_to_execution_changes: RwLock<HashMap<u64, SigVerifiedOp<SignedBlsToExecutionChange, T>>>,
|
||||
/// Map from credential changing validator to their position in the queue.
|
||||
bls_to_execution_changes: RwLock<BlsToExecutionChanges<T>>,
|
||||
/// Reward cache for accelerating attestation packing.
|
||||
reward_cache: RwLock<RewardCache>,
|
||||
_phantom: PhantomData<T>,
|
||||
@ -513,15 +515,28 @@ impl<T: EthSpec> OperationPool<T> {
|
||||
);
|
||||
}
|
||||
|
||||
/// Insert a BLS to execution change into the pool.
|
||||
/// Check if an address change equal to `address_change` is already in the pool.
|
||||
///
|
||||
/// Return `None` if no address change for the validator index exists in the pool.
|
||||
pub fn bls_to_execution_change_in_pool_equals(
|
||||
&self,
|
||||
address_change: &SignedBlsToExecutionChange,
|
||||
) -> Option<bool> {
|
||||
self.bls_to_execution_changes
|
||||
.read()
|
||||
.existing_change_equals(address_change)
|
||||
}
|
||||
|
||||
/// Insert a BLS to execution change into the pool, *only if* no prior change is known.
|
||||
///
|
||||
/// Return `true` if the change was inserted.
|
||||
pub fn insert_bls_to_execution_change(
|
||||
&self,
|
||||
verified_change: SigVerifiedOp<SignedBlsToExecutionChange, T>,
|
||||
) {
|
||||
self.bls_to_execution_changes.write().insert(
|
||||
verified_change.as_inner().message.validator_index,
|
||||
verified_change,
|
||||
);
|
||||
) -> bool {
|
||||
self.bls_to_execution_changes
|
||||
.write()
|
||||
.insert(verified_change)
|
||||
}
|
||||
|
||||
/// Get a list of execution changes for inclusion in a block.
|
||||
@ -533,7 +548,7 @@ impl<T: EthSpec> OperationPool<T> {
|
||||
spec: &ChainSpec,
|
||||
) -> Vec<SignedBlsToExecutionChange> {
|
||||
filter_limit_operations(
|
||||
self.bls_to_execution_changes.read().values(),
|
||||
self.bls_to_execution_changes.read().iter_lifo(),
|
||||
|address_change| {
|
||||
address_change.signature_is_still_valid(&state.fork())
|
||||
&& state
|
||||
@ -548,33 +563,15 @@ impl<T: EthSpec> OperationPool<T> {
|
||||
}
|
||||
|
||||
/// Prune BLS to execution changes that have been applied to the state more than 1 block ago.
|
||||
///
|
||||
/// The block check is necessary to avoid pruning too eagerly and losing the ability to include
|
||||
/// address changes during re-orgs. This is isn't *perfect* so some address changes could
|
||||
/// still get stuck if there are gnarly re-orgs and the changes can't be widely republished
|
||||
/// due to the gossip duplicate rules.
|
||||
pub fn prune_bls_to_execution_changes<Payload: AbstractExecPayload<T>>(
|
||||
&self,
|
||||
head_block: &SignedBeaconBlock<T, Payload>,
|
||||
head_state: &BeaconState<T>,
|
||||
spec: &ChainSpec,
|
||||
) {
|
||||
prune_validator_hash_map(
|
||||
&mut self.bls_to_execution_changes.write(),
|
||||
|validator_index, validator| {
|
||||
validator.has_eth1_withdrawal_credential(spec)
|
||||
&& head_block
|
||||
.message()
|
||||
.body()
|
||||
.bls_to_execution_changes()
|
||||
.map_or(true, |recent_changes| {
|
||||
!recent_changes
|
||||
.iter()
|
||||
.any(|c| c.message.validator_index == validator_index)
|
||||
})
|
||||
},
|
||||
head_state,
|
||||
);
|
||||
self.bls_to_execution_changes
|
||||
.write()
|
||||
.prune(head_block, head_state, spec)
|
||||
}
|
||||
|
||||
/// Prune all types of transactions given the latest head state and head fork.
|
||||
@ -663,8 +660,8 @@ impl<T: EthSpec> OperationPool<T> {
|
||||
pub fn get_all_bls_to_execution_changes(&self) -> Vec<SignedBlsToExecutionChange> {
|
||||
self.bls_to_execution_changes
|
||||
.read()
|
||||
.iter()
|
||||
.map(|(_, address_change)| address_change.as_inner().clone())
|
||||
.iter_fifo()
|
||||
.map(|address_change| address_change.as_inner().clone())
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
@ -1,5 +1,6 @@
|
||||
use crate::attestation_id::AttestationId;
|
||||
use crate::attestation_storage::AttestationMap;
|
||||
use crate::bls_to_execution_changes::BlsToExecutionChanges;
|
||||
use crate::sync_aggregate_id::SyncAggregateId;
|
||||
use crate::OpPoolError;
|
||||
use crate::OperationPool;
|
||||
@ -105,8 +106,8 @@ impl<T: EthSpec> PersistedOperationPool<T> {
|
||||
let bls_to_execution_changes = operation_pool
|
||||
.bls_to_execution_changes
|
||||
.read()
|
||||
.iter()
|
||||
.map(|(_, bls_to_execution_change)| bls_to_execution_change.clone())
|
||||
.iter_fifo()
|
||||
.map(|bls_to_execution_change| (**bls_to_execution_change).clone())
|
||||
.collect();
|
||||
|
||||
PersistedOperationPool::V14(PersistedOperationPoolV14 {
|
||||
@ -153,18 +154,13 @@ impl<T: EthSpec> PersistedOperationPool<T> {
|
||||
PersistedOperationPool::V5(_) | PersistedOperationPool::V12(_) => {
|
||||
return Err(OpPoolError::IncorrectOpPoolVariant)
|
||||
}
|
||||
PersistedOperationPool::V14(pool) => RwLock::new(
|
||||
pool.bls_to_execution_changes
|
||||
.iter()
|
||||
.cloned()
|
||||
.map(|bls_to_execution_change| {
|
||||
(
|
||||
bls_to_execution_change.as_inner().message.validator_index,
|
||||
bls_to_execution_change,
|
||||
)
|
||||
})
|
||||
.collect(),
|
||||
),
|
||||
PersistedOperationPool::V14(pool) => {
|
||||
let mut bls_to_execution_changes = BlsToExecutionChanges::default();
|
||||
for bls_to_execution_change in pool.bls_to_execution_changes {
|
||||
bls_to_execution_changes.insert(bls_to_execution_change);
|
||||
}
|
||||
RwLock::new(bls_to_execution_changes)
|
||||
}
|
||||
};
|
||||
let op_pool = OperationPool {
|
||||
attestations,
|
||||
|
@ -1042,6 +1042,24 @@ impl BeaconNodeHttpClient {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// `POST beacon/pool/bls_to_execution_changes`
|
||||
pub async fn post_beacon_pool_bls_to_execution_changes(
|
||||
&self,
|
||||
address_changes: &[SignedBlsToExecutionChange],
|
||||
) -> Result<(), Error> {
|
||||
let mut path = self.eth_path(V1)?;
|
||||
|
||||
path.path_segments_mut()
|
||||
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
|
||||
.push("beacon")
|
||||
.push("pool")
|
||||
.push("bls_to_execution_changes");
|
||||
|
||||
self.post(path, &address_changes).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// `GET beacon/deposit_snapshot`
|
||||
pub async fn get_deposit_snapshot(&self) -> Result<Option<types::DepositTreeSnapshot>, Error> {
|
||||
use ssz::Decode;
|
||||
|
@ -67,7 +67,7 @@ where
|
||||
fn new(op: T, state: &BeaconState<E>) -> Self {
|
||||
let verified_against = VerifiedAgainst {
|
||||
fork_versions: op
|
||||
.verification_epochs(state.current_epoch())
|
||||
.verification_epochs()
|
||||
.into_iter()
|
||||
.map(|epoch| state.fork().get_fork_version(epoch))
|
||||
.collect(),
|
||||
@ -89,13 +89,9 @@ where
|
||||
}
|
||||
|
||||
pub fn signature_is_still_valid(&self, current_fork: &Fork) -> bool {
|
||||
// Pass the fork's epoch as the effective current epoch. If the message is a current-epoch
|
||||
// style message like `SignedBlsToExecutionChange` then `get_fork_version` will return the
|
||||
// current fork version and we'll check it matches the fork version the message was checked
|
||||
// against.
|
||||
let effective_current_epoch = current_fork.epoch;
|
||||
// The .all() will return true if the iterator is empty.
|
||||
self.as_inner()
|
||||
.verification_epochs(effective_current_epoch)
|
||||
.verification_epochs()
|
||||
.into_iter()
|
||||
.zip(self.verified_against.fork_versions.iter())
|
||||
.all(|(epoch, verified_fork_version)| {
|
||||
@ -126,12 +122,8 @@ pub trait VerifyOperation<E: EthSpec>: Encode + Decode + Sized {
|
||||
///
|
||||
/// These need to map 1-to-1 to the `SigVerifiedOp::verified_against` for this type.
|
||||
///
|
||||
/// If the message contains no inherent epoch it should return the `current_epoch` that is
|
||||
/// passed in, as that's the epoch at which it was verified.
|
||||
fn verification_epochs(
|
||||
&self,
|
||||
current_epoch: Epoch,
|
||||
) -> SmallVec<[Epoch; MAX_FORKS_VERIFIED_AGAINST]>;
|
||||
/// If the message is valid across all forks it should return an empty smallvec.
|
||||
fn verification_epochs(&self) -> SmallVec<[Epoch; MAX_FORKS_VERIFIED_AGAINST]>;
|
||||
}
|
||||
|
||||
impl<E: EthSpec> VerifyOperation<E> for SignedVoluntaryExit {
|
||||
@ -147,7 +139,7 @@ impl<E: EthSpec> VerifyOperation<E> for SignedVoluntaryExit {
|
||||
}
|
||||
|
||||
#[allow(clippy::integer_arithmetic)]
|
||||
fn verification_epochs(&self, _: Epoch) -> SmallVec<[Epoch; MAX_FORKS_VERIFIED_AGAINST]> {
|
||||
fn verification_epochs(&self) -> SmallVec<[Epoch; MAX_FORKS_VERIFIED_AGAINST]> {
|
||||
smallvec![self.message.epoch]
|
||||
}
|
||||
}
|
||||
@ -165,7 +157,7 @@ impl<E: EthSpec> VerifyOperation<E> for AttesterSlashing<E> {
|
||||
}
|
||||
|
||||
#[allow(clippy::integer_arithmetic)]
|
||||
fn verification_epochs(&self, _: Epoch) -> SmallVec<[Epoch; MAX_FORKS_VERIFIED_AGAINST]> {
|
||||
fn verification_epochs(&self) -> SmallVec<[Epoch; MAX_FORKS_VERIFIED_AGAINST]> {
|
||||
smallvec![
|
||||
self.attestation_1.data.target.epoch,
|
||||
self.attestation_2.data.target.epoch
|
||||
@ -186,7 +178,7 @@ impl<E: EthSpec> VerifyOperation<E> for ProposerSlashing {
|
||||
}
|
||||
|
||||
#[allow(clippy::integer_arithmetic)]
|
||||
fn verification_epochs(&self, _: Epoch) -> SmallVec<[Epoch; MAX_FORKS_VERIFIED_AGAINST]> {
|
||||
fn verification_epochs(&self) -> SmallVec<[Epoch; MAX_FORKS_VERIFIED_AGAINST]> {
|
||||
// Only need a single epoch because the slots of the two headers must be equal.
|
||||
smallvec![self
|
||||
.signed_header_1
|
||||
@ -209,10 +201,7 @@ impl<E: EthSpec> VerifyOperation<E> for SignedBlsToExecutionChange {
|
||||
}
|
||||
|
||||
#[allow(clippy::integer_arithmetic)]
|
||||
fn verification_epochs(
|
||||
&self,
|
||||
current_epoch: Epoch,
|
||||
) -> SmallVec<[Epoch; MAX_FORKS_VERIFIED_AGAINST]> {
|
||||
smallvec![current_epoch]
|
||||
fn verification_epochs(&self) -> SmallVec<[Epoch; MAX_FORKS_VERIFIED_AGAINST]> {
|
||||
smallvec![]
|
||||
}
|
||||
}
|
||||
|
@ -28,6 +28,26 @@ pub struct BlsToExecutionChange {
|
||||
|
||||
impl SignedRoot for BlsToExecutionChange {}
|
||||
|
||||
impl BlsToExecutionChange {
|
||||
pub fn sign(
|
||||
self,
|
||||
secret_key: &SecretKey,
|
||||
genesis_validators_root: Hash256,
|
||||
spec: &ChainSpec,
|
||||
) -> SignedBlsToExecutionChange {
|
||||
let domain = spec.compute_domain(
|
||||
Domain::BlsToExecutionChange,
|
||||
spec.genesis_fork_version,
|
||||
genesis_validators_root,
|
||||
);
|
||||
let message = self.signing_root(domain);
|
||||
SignedBlsToExecutionChange {
|
||||
message: self,
|
||||
signature: secret_key.sign(message),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
Loading…
Reference in New Issue
Block a user