diff --git a/Cargo.lock b/Cargo.lock index 18426b9e5..5aa7a3923 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1062,8 +1062,10 @@ dependencies = [ "lazy_static", "lighthouse_metrics", "lighthouse_network", + "logging", "monitoring_api", "network", + "operation_pool", "parking_lot 0.12.1", "sensitive_url", "serde", @@ -1073,6 +1075,7 @@ dependencies = [ "slasher_service", "slog", "slot_clock", + "state_processing", "store", "task_executor", "time 0.3.17", @@ -3224,6 +3227,7 @@ dependencies = [ "logging", "lru 0.7.8", "network", + "operation_pool", "parking_lot 0.12.1", "proto_array", "safe_arith", @@ -5007,6 +5011,7 @@ dependencies = [ "lru_cache", "matches", "num_cpus", + "operation_pool", "rand 0.8.5", "rlp", "slog", @@ -5342,6 +5347,7 @@ dependencies = [ "lighthouse_metrics", "maplit", "parking_lot 0.12.1", + "rand 0.8.5", "rayon", "serde", "serde_derive", diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index bf48c32a6..2aae03b72 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -70,7 +70,7 @@ use fork_choice::{ use futures::channel::mpsc::Sender; use itertools::process_results; use itertools::Itertools; -use operation_pool::{AttestationRef, OperationPool, PersistedOperationPool}; +use operation_pool::{AttestationRef, OperationPool, PersistedOperationPool, ReceivedPreCapella}; use parking_lot::{Mutex, RwLock}; use proto_array::{CountUnrealizedFull, DoNotReOrg, ProposerHeadError}; use safe_arith::SafeArith; @@ -2289,10 +2289,11 @@ impl BeaconChain { pub fn import_bls_to_execution_change( &self, bls_to_execution_change: SigVerifiedOp, + received_pre_capella: ReceivedPreCapella, ) -> bool { if self.eth1_chain.is_some() { self.op_pool - .insert_bls_to_execution_change(bls_to_execution_change) + .insert_bls_to_execution_change(bls_to_execution_change, received_pre_capella) } else { false } diff --git a/beacon_node/beacon_chain/src/schema_change.rs b/beacon_node/beacon_chain/src/schema_change.rs index 8684bafe2..35202a3c5 100644 --- a/beacon_node/beacon_chain/src/schema_change.rs +++ b/beacon_node/beacon_chain/src/schema_change.rs @@ -2,6 +2,7 @@ mod migration_schema_v12; mod migration_schema_v13; mod migration_schema_v14; +mod migration_schema_v15; use crate::beacon_chain::{BeaconChainTypes, ETH1_CACHE_DB_KEY}; use crate::eth1_chain::SszEth1; @@ -123,6 +124,14 @@ pub fn migrate_schema( let ops = migration_schema_v14::downgrade_from_v14::(db.clone(), log)?; db.store_schema_version_atomically(to, ops) } + (SchemaVersion(14), SchemaVersion(15)) => { + let ops = migration_schema_v15::upgrade_to_v15::(db.clone(), log)?; + db.store_schema_version_atomically(to, ops) + } + (SchemaVersion(15), SchemaVersion(14)) => { + let ops = migration_schema_v15::downgrade_from_v15::(db.clone(), log)?; + db.store_schema_version_atomically(to, ops) + } // Anything else is an error. (_, _) => Err(HotColdDBError::UnsupportedSchemaVersion { target_version: to, diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v15.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v15.rs new file mode 100644 index 000000000..f4adc2cf4 --- /dev/null +++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v15.rs @@ -0,0 +1,78 @@ +use crate::beacon_chain::{BeaconChainTypes, OP_POOL_DB_KEY}; +use operation_pool::{ + PersistedOperationPool, PersistedOperationPoolV14, PersistedOperationPoolV15, +}; +use slog::{debug, info, Logger}; +use std::sync::Arc; +use store::{Error, HotColdDB, KeyValueStoreOp, StoreItem}; + +pub fn upgrade_to_v15( + db: Arc>, + log: Logger, +) -> Result, Error> { + // Load a V14 op pool and transform it to V15. + let PersistedOperationPoolV14:: { + attestations, + sync_contributions, + attester_slashings, + proposer_slashings, + voluntary_exits, + bls_to_execution_changes, + } = if let Some(op_pool_v14) = db.get_item(&OP_POOL_DB_KEY)? { + op_pool_v14 + } else { + debug!(log, "Nothing to do, no operation pool stored"); + return Ok(vec![]); + }; + + let v15 = PersistedOperationPool::V15(PersistedOperationPoolV15 { + attestations, + sync_contributions, + attester_slashings, + proposer_slashings, + voluntary_exits, + bls_to_execution_changes, + // Initialize with empty set + capella_bls_change_broadcast_indices: <_>::default(), + }); + Ok(vec![v15.as_kv_store_op(OP_POOL_DB_KEY)]) +} + +pub fn downgrade_from_v15( + db: Arc>, + log: Logger, +) -> Result, Error> { + // Load a V15 op pool and transform it to V14. + let PersistedOperationPoolV15 { + attestations, + sync_contributions, + attester_slashings, + proposer_slashings, + voluntary_exits, + bls_to_execution_changes, + capella_bls_change_broadcast_indices, + } = if let Some(PersistedOperationPool::::V15(op_pool)) = + db.get_item(&OP_POOL_DB_KEY)? + { + op_pool + } else { + debug!(log, "Nothing to do, no operation pool stored"); + return Ok(vec![]); + }; + + info!( + log, + "Forgetting address changes for Capella broadcast"; + "count" => capella_bls_change_broadcast_indices.len(), + ); + + let v14 = PersistedOperationPoolV14 { + attestations, + sync_contributions, + attester_slashings, + proposer_slashings, + voluntary_exits, + bls_to_execution_changes, + }; + Ok(vec![v14.as_kv_store_op(OP_POOL_DB_KEY)]) +} diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index daba7115e..f1b9bc83c 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -310,6 +310,21 @@ where self } + /// Initializes the BLS withdrawal keypairs for `num_keypairs` validators to + /// the "determistic" values, regardless of wether or not the validator has + /// a BLS or execution address in the genesis deposits. + /// + /// This aligns with the withdrawal commitments used in the "interop" + /// genesis states. + pub fn deterministic_withdrawal_keypairs(self, num_keypairs: usize) -> Self { + self.withdrawal_keypairs( + types::test_utils::generate_deterministic_keypairs(num_keypairs) + .into_iter() + .map(Option::Some) + .collect(), + ) + } + pub fn withdrawal_keypairs(mut self, withdrawal_keypairs: Vec>) -> Self { self.withdrawal_keypairs = withdrawal_keypairs; self diff --git a/beacon_node/client/Cargo.toml b/beacon_node/client/Cargo.toml index d01f2505c..9a49843a9 100644 --- a/beacon_node/client/Cargo.toml +++ b/beacon_node/client/Cargo.toml @@ -6,6 +6,10 @@ edition = "2021" [dev-dependencies] serde_yaml = "0.8.13" +logging = { path = "../../common/logging" } +state_processing = { path = "../../consensus/state_processing" } +operation_pool = { path = "../operation_pool" } +tokio = "1.14.0" [dependencies] beacon_chain = { path = "../beacon_chain" } diff --git a/beacon_node/client/src/address_change_broadcast.rs b/beacon_node/client/src/address_change_broadcast.rs new file mode 100644 index 000000000..272ee908f --- /dev/null +++ b/beacon_node/client/src/address_change_broadcast.rs @@ -0,0 +1,322 @@ +use crate::*; +use lighthouse_network::PubsubMessage; +use network::NetworkMessage; +use slog::{debug, info, warn, Logger}; +use slot_clock::SlotClock; +use std::cmp; +use std::collections::HashSet; +use std::mem; +use std::time::Duration; +use tokio::sync::mpsc::UnboundedSender; +use tokio::time::sleep; +use types::EthSpec; + +/// The size of each chunk of addresses changes to be broadcast at the Capella +/// fork. +const BROADCAST_CHUNK_SIZE: usize = 128; +/// The delay between broadcasting each chunk. +const BROADCAST_CHUNK_DELAY: Duration = Duration::from_millis(500); + +/// If the Capella fork has already been reached, `broadcast_address_changes` is +/// called immediately. +/// +/// If the Capella fork has not been reached, waits until the start of the fork +/// epoch and then calls `broadcast_address_changes`. +pub async fn broadcast_address_changes_at_capella( + chain: &BeaconChain, + network_send: UnboundedSender>, + log: &Logger, +) { + let spec = &chain.spec; + let slot_clock = &chain.slot_clock; + + let capella_fork_slot = if let Some(epoch) = spec.capella_fork_epoch { + epoch.start_slot(T::EthSpec::slots_per_epoch()) + } else { + // Exit now if Capella is not defined. + return; + }; + + // Wait until the Capella fork epoch. + while chain.slot().map_or(true, |slot| slot < capella_fork_slot) { + match slot_clock.duration_to_slot(capella_fork_slot) { + Some(duration) => { + // Sleep until the Capella fork. + sleep(duration).await; + break; + } + None => { + // We were unable to read the slot clock wait another slot + // and then try again. + sleep(slot_clock.slot_duration()).await; + } + } + } + + // The following function will be called in two scenarios: + // + // 1. The node has been running for some time and the Capella fork has just + // been reached. + // 2. The node has just started and it is *after* the Capella fork. + broadcast_address_changes(chain, network_send, log).await +} + +/// Broadcasts any address changes that are flagged for broadcasting at the +/// Capella fork epoch. +/// +/// Address changes are published in chunks, with a delay between each chunk. +/// This helps reduce the load on the P2P network and also helps prevent us from +/// clogging our `network_send` channel and being late to publish +/// blocks, attestations, etc. +pub async fn broadcast_address_changes( + chain: &BeaconChain, + network_send: UnboundedSender>, + log: &Logger, +) { + let head = chain.head_snapshot(); + let mut changes = chain + .op_pool + .get_bls_to_execution_changes_received_pre_capella(&head.beacon_state, &chain.spec); + + while !changes.is_empty() { + // This `split_off` approach is to allow us to have owned chunks of the + // `changes` vec. The `std::slice::Chunks` method uses references and + // the `itertools` iterator that achives this isn't `Send` so it doesn't + // work well with the `sleep` at the end of the loop. + let tail = changes.split_off(cmp::min(BROADCAST_CHUNK_SIZE, changes.len())); + let chunk = mem::replace(&mut changes, tail); + + let mut published_indices = HashSet::with_capacity(BROADCAST_CHUNK_SIZE); + let mut num_ok = 0; + let mut num_err = 0; + + // Publish each individual address change. + for address_change in chunk { + let validator_index = address_change.message.validator_index; + + let pubsub_message = PubsubMessage::BlsToExecutionChange(Box::new(address_change)); + let message = NetworkMessage::Publish { + messages: vec![pubsub_message], + }; + // It seems highly unlikely that this unbounded send will fail, but + // we handle the result nontheless. + if let Err(e) = network_send.send(message) { + debug!( + log, + "Failed to publish change message"; + "error" => ?e, + "validator_index" => validator_index + ); + num_err += 1; + } else { + debug!( + log, + "Published address change message"; + "validator_index" => validator_index + ); + num_ok += 1; + published_indices.insert(validator_index); + } + } + + // Remove any published indices from the list of indices that need to be + // published. + chain + .op_pool + .register_indices_broadcasted_at_capella(&published_indices); + + info!( + log, + "Published address change messages"; + "num_published" => num_ok, + ); + + if num_err > 0 { + warn!( + log, + "Failed to publish address changes"; + "info" => "failed messages will be retried", + "num_unable_to_publish" => num_err, + ); + } + + sleep(BROADCAST_CHUNK_DELAY).await; + } + + debug!( + log, + "Address change routine complete"; + ); +} + +#[cfg(not(debug_assertions))] // Tests run too slow in debug. +#[cfg(test)] +mod tests { + use super::*; + use beacon_chain::test_utils::{BeaconChainHarness, EphemeralHarnessType}; + use operation_pool::ReceivedPreCapella; + use state_processing::{SigVerifiedOp, VerifyOperation}; + use std::collections::HashSet; + use tokio::sync::mpsc; + use types::*; + + type E = MainnetEthSpec; + + pub const VALIDATOR_COUNT: usize = BROADCAST_CHUNK_SIZE * 3; + pub const EXECUTION_ADDRESS: Address = Address::repeat_byte(42); + + struct Tester { + harness: BeaconChainHarness>, + /// Changes which should be broadcast at the Capella fork. + received_pre_capella_changes: Vec>, + /// Changes which should *not* be broadcast at the Capella fork. + not_received_pre_capella_changes: Vec>, + } + + impl Tester { + fn new() -> Self { + let altair_fork_epoch = Epoch::new(0); + let bellatrix_fork_epoch = Epoch::new(0); + let capella_fork_epoch = Epoch::new(2); + + let mut spec = E::default_spec(); + spec.altair_fork_epoch = Some(altair_fork_epoch); + spec.bellatrix_fork_epoch = Some(bellatrix_fork_epoch); + spec.capella_fork_epoch = Some(capella_fork_epoch); + + let harness = BeaconChainHarness::builder(E::default()) + .spec(spec) + .logger(logging::test_logger()) + .deterministic_keypairs(VALIDATOR_COUNT) + .deterministic_withdrawal_keypairs(VALIDATOR_COUNT) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + Self { + harness, + received_pre_capella_changes: <_>::default(), + not_received_pre_capella_changes: <_>::default(), + } + } + + fn produce_verified_address_change( + &self, + validator_index: u64, + ) -> SigVerifiedOp { + let change = self + .harness + .make_bls_to_execution_change(validator_index, EXECUTION_ADDRESS); + let head = self.harness.chain.head_snapshot(); + + change + .validate(&head.beacon_state, &self.harness.spec) + .unwrap() + } + + fn produce_received_pre_capella_changes(mut self, indices: Vec) -> Self { + for validator_index in indices { + self.received_pre_capella_changes + .push(self.produce_verified_address_change(validator_index)); + } + self + } + + fn produce_not_received_pre_capella_changes(mut self, indices: Vec) -> Self { + for validator_index in indices { + self.not_received_pre_capella_changes + .push(self.produce_verified_address_change(validator_index)); + } + self + } + + async fn run(self) { + let harness = self.harness; + let chain = harness.chain.clone(); + + let mut broadcast_indices = HashSet::new(); + for change in self.received_pre_capella_changes { + broadcast_indices.insert(change.as_inner().message.validator_index); + chain + .op_pool + .insert_bls_to_execution_change(change, ReceivedPreCapella::Yes); + } + + let mut non_broadcast_indices = HashSet::new(); + for change in self.not_received_pre_capella_changes { + non_broadcast_indices.insert(change.as_inner().message.validator_index); + chain + .op_pool + .insert_bls_to_execution_change(change, ReceivedPreCapella::No); + } + + harness.set_current_slot( + chain + .spec + .capella_fork_epoch + .unwrap() + .start_slot(E::slots_per_epoch()), + ); + + let (sender, mut receiver) = mpsc::unbounded_channel(); + + broadcast_address_changes_at_capella(&chain, sender, &logging::test_logger()).await; + + let mut broadcasted_changes = vec![]; + while let Some(NetworkMessage::Publish { mut messages }) = receiver.recv().await { + match messages.pop().unwrap() { + PubsubMessage::BlsToExecutionChange(change) => broadcasted_changes.push(change), + _ => panic!("unexpected message"), + } + } + + assert_eq!( + broadcasted_changes.len(), + broadcast_indices.len(), + "all expected changes should have been broadcast" + ); + + for broadcasted in &broadcasted_changes { + assert!( + !non_broadcast_indices.contains(&broadcasted.message.validator_index), + "messages not flagged for broadcast should not have been broadcast" + ); + } + + let head = chain.head_snapshot(); + assert!( + chain + .op_pool + .get_bls_to_execution_changes_received_pre_capella( + &head.beacon_state, + &chain.spec, + ) + .is_empty(), + "there shouldn't be any capella broadcast changes left in the op pool" + ); + } + } + + // Useful for generating even-numbered indices. Required since only even + // numbered genesis validators have BLS credentials. + fn even_indices(start: u64, count: usize) -> Vec { + (start..).filter(|i| i % 2 == 0).take(count).collect() + } + + #[tokio::test] + async fn one_chunk() { + Tester::new() + .produce_received_pre_capella_changes(even_indices(0, 4)) + .produce_not_received_pre_capella_changes(even_indices(10, 4)) + .run() + .await; + } + + #[tokio::test] + async fn multiple_chunks() { + Tester::new() + .produce_received_pre_capella_changes(even_indices(0, BROADCAST_CHUNK_SIZE * 3 / 2)) + .run() + .await; + } +} diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 3b016ebda..5fa2fddc3 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -1,3 +1,4 @@ +use crate::address_change_broadcast::broadcast_address_changes_at_capella; use crate::config::{ClientGenesis, Config as ClientConfig}; use crate::notifier::spawn_notifier; use crate::Client; @@ -802,6 +803,25 @@ where // Spawns a routine that polls the `exchange_transition_configuration` endpoint. execution_layer.spawn_transition_configuration_poll(beacon_chain.spec.clone()); } + + // Spawn a service to publish BLS to execution changes at the Capella fork. + if let Some(network_senders) = self.network_senders { + let inner_chain = beacon_chain.clone(); + let broadcast_context = + runtime_context.service_context("addr_bcast".to_string()); + let log = broadcast_context.log().clone(); + broadcast_context.executor.spawn( + async move { + broadcast_address_changes_at_capella( + &inner_chain, + network_senders.network_send(), + &log, + ) + .await + }, + "addr_broadcast", + ); + } } start_proposer_prep_service(runtime_context.executor.clone(), beacon_chain.clone()); diff --git a/beacon_node/client/src/lib.rs b/beacon_node/client/src/lib.rs index 24df87408..b0184dc0f 100644 --- a/beacon_node/client/src/lib.rs +++ b/beacon_node/client/src/lib.rs @@ -1,5 +1,6 @@ extern crate slog; +mod address_change_broadcast; pub mod config; mod metrics; mod notifier; diff --git a/beacon_node/http_api/Cargo.toml b/beacon_node/http_api/Cargo.toml index 0dc918f42..5110a73ed 100644 --- a/beacon_node/http_api/Cargo.toml +++ b/beacon_node/http_api/Cargo.toml @@ -36,6 +36,7 @@ tree_hash = "0.4.1" sysinfo = "0.26.5" system_health = { path = "../../common/system_health" } directory = { path = "../../common/directory" } +operation_pool = { path = "../operation_pool" } [dev-dependencies] store = { path = "../store" } diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index e0f8bcf2a..0edbaf8f7 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -35,6 +35,7 @@ use eth2::types::{ use lighthouse_network::{types::SyncState, EnrExt, NetworkGlobals, PeerId, PubsubMessage}; use lighthouse_version::version_with_platform; use network::{NetworkMessage, NetworkSenders, ValidatorSubscriptionMessage}; +use operation_pool::ReceivedPreCapella; use parking_lot::RwLock; use serde::{Deserialize, Serialize}; use slog::{crit, debug, error, info, warn, Logger}; @@ -1696,8 +1697,12 @@ pub fn serve( .to_execution_address; // New to P2P *and* op pool, gossip immediately if post-Capella. - let publish = chain.current_slot_is_post_capella().unwrap_or(false); - if publish { + let received_pre_capella = if chain.current_slot_is_post_capella().unwrap_or(false) { + ReceivedPreCapella::No + } else { + ReceivedPreCapella::Yes + }; + if matches!(received_pre_capella, ReceivedPreCapella::No) { publish_pubsub_message( &network_tx, PubsubMessage::BlsToExecutionChange(Box::new( @@ -1708,14 +1713,14 @@ pub fn serve( // Import to op pool (may return `false` if there's a race). let imported = - chain.import_bls_to_execution_change(verified_address_change); + chain.import_bls_to_execution_change(verified_address_change, received_pre_capella); info!( log, "Processed BLS to execution change"; "validator_index" => validator_index, "address" => ?address, - "published" => publish, + "published" => matches!(received_pre_capella, ReceivedPreCapella::No), "imported" => imported, ); } diff --git a/beacon_node/http_api/tests/fork_tests.rs b/beacon_node/http_api/tests/fork_tests.rs index e61470fe9..614412356 100644 --- a/beacon_node/http_api/tests/fork_tests.rs +++ b/beacon_node/http_api/tests/fork_tests.rs @@ -6,6 +6,7 @@ use beacon_chain::{ }; use eth2::types::{IndexedErrorMessage, StateId, SyncSubcommittee}; use genesis::{bls_withdrawal_credentials, interop_genesis_state_with_withdrawal_credentials}; +use std::collections::HashSet; use types::{ test_utils::{generate_deterministic_keypair, generate_deterministic_keypairs}, Address, ChainSpec, Epoch, EthSpec, Hash256, MinimalEthSpec, Slot, @@ -438,6 +439,8 @@ async fn bls_to_execution_changes_update_all_around_capella_fork() { .await .unwrap(); + let expected_received_pre_capella_messages = valid_address_changes[..num_pre_capella].to_vec(); + // Conflicting changes for the same validators should all fail. let error = client .post_beacon_pool_bls_to_execution_changes(&conflicting_address_changes[..num_pre_capella]) @@ -464,6 +467,20 @@ async fn bls_to_execution_changes_update_all_around_capella_fork() { harness.extend_to_slot(capella_slot - 1).await; assert_eq!(harness.head_slot(), capella_slot - 1); + assert_eq!( + harness + .chain + .op_pool + .get_bls_to_execution_changes_received_pre_capella( + &harness.chain.head_snapshot().beacon_state, + &spec, + ) + .into_iter() + .collect::>(), + HashSet::from_iter(expected_received_pre_capella_messages.into_iter()), + "all pre-capella messages should be queued for capella broadcast" + ); + // Add Capella blocks which should be full of BLS to execution changes. for i in 0..validator_count / max_bls_to_execution_changes { let head_block_root = harness.extend_slots(1).await; diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index b1d928eec..5ce331169 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -45,6 +45,7 @@ tokio-util = { version = "0.6.3", features = ["time"] } derivative = "2.2.0" delay_map = "0.1.1" ethereum-types = { version = "0.14.1", optional = true } +operation_pool = { path = "../operation_pool" } [features] deterministic_long_lived_attnets = [ "ethereum-types" ] diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 44d611189..f2b1b3a26 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -12,6 +12,7 @@ use beacon_chain::{ GossipVerifiedBlock, NotifyExecutionLayer, }; use lighthouse_network::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource}; +use operation_pool::ReceivedPreCapella; use slog::{crit, debug, error, info, trace, warn}; use slot_clock::SlotClock; use ssz::Encode; @@ -1251,7 +1252,12 @@ impl Worker { self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); - self.chain.import_bls_to_execution_change(change); + // Address change messages from gossip are only processed *after* the + // Capella fork epoch. + let received_pre_capella = ReceivedPreCapella::No; + + self.chain + .import_bls_to_execution_change(change, received_pre_capella); debug!( self.log, diff --git a/beacon_node/operation_pool/Cargo.toml b/beacon_node/operation_pool/Cargo.toml index 848323358..cc4eacde8 100644 --- a/beacon_node/operation_pool/Cargo.toml +++ b/beacon_node/operation_pool/Cargo.toml @@ -19,6 +19,7 @@ serde = "1.0.116" serde_derive = "1.0.116" store = { path = "../store" } bitvec = "1" +rand = "0.8.5" [dev-dependencies] beacon_chain = { path = "../beacon_chain" } diff --git a/beacon_node/operation_pool/src/bls_to_execution_changes.rs b/beacon_node/operation_pool/src/bls_to_execution_changes.rs index 84513d466..c73666e14 100644 --- a/beacon_node/operation_pool/src/bls_to_execution_changes.rs +++ b/beacon_node/operation_pool/src/bls_to_execution_changes.rs @@ -1,11 +1,20 @@ use state_processing::SigVerifiedOp; -use std::collections::{hash_map::Entry, HashMap}; +use std::collections::{hash_map::Entry, HashMap, HashSet}; use std::sync::Arc; use types::{ AbstractExecPayload, BeaconState, ChainSpec, EthSpec, SignedBeaconBlock, SignedBlsToExecutionChange, }; +/// Indicates if a `BlsToExecutionChange` was received before or after the +/// Capella fork. This is used to know which messages we should broadcast at the +/// Capella fork epoch. +#[derive(Copy, Clone)] +pub enum ReceivedPreCapella { + Yes, + No, +} + /// Pool of BLS to execution changes that maintains a LIFO queue and an index by validator. /// /// Using the LIFO queue for block production disincentivises spam on P2P at the Capella fork, @@ -16,6 +25,9 @@ pub struct BlsToExecutionChanges { by_validator_index: HashMap>>, /// Last-in-first-out (LIFO) queue of verified messages. queue: Vec>>, + /// Contains a set of validator indices which need to have their changes + /// broadcast at the capella epoch. + received_pre_capella_indices: HashSet, } impl BlsToExecutionChanges { @@ -31,16 +43,18 @@ impl BlsToExecutionChanges { pub fn insert( &mut self, verified_change: SigVerifiedOp, + received_pre_capella: ReceivedPreCapella, ) -> bool { + let validator_index = verified_change.as_inner().message.validator_index; // Wrap in an `Arc` once on insert. let verified_change = Arc::new(verified_change); - match self - .by_validator_index - .entry(verified_change.as_inner().message.validator_index) - { + match self.by_validator_index.entry(validator_index) { Entry::Vacant(entry) => { self.queue.push(verified_change.clone()); entry.insert(verified_change); + if matches!(received_pre_capella, ReceivedPreCapella::Yes) { + self.received_pre_capella_indices.insert(validator_index); + } true } Entry::Occupied(_) => false, @@ -61,6 +75,24 @@ impl BlsToExecutionChanges { self.queue.iter().rev() } + /// Returns only those which are flagged for broadcasting at the Capella + /// fork. Uses FIFO ordering, although we expect this list to be shuffled by + /// the caller. + pub fn iter_received_pre_capella( + &self, + ) -> impl Iterator>> { + self.queue.iter().filter(|address_change| { + self.received_pre_capella_indices + .contains(&address_change.as_inner().message.validator_index) + }) + } + + /// Returns the set of indicies which should have their address changes + /// broadcast at the Capella fork. + pub fn iter_pre_capella_indices(&self) -> impl Iterator { + self.received_pre_capella_indices.iter() + } + /// Prune BLS to execution changes that have been applied to the state more than 1 block ago. /// /// The block check is necessary to avoid pruning too eagerly and losing the ability to include @@ -102,4 +134,14 @@ impl BlsToExecutionChanges { self.by_validator_index.remove(&validator_index); } } + + /// Removes `broadcasted` validators from the set of validators that should + /// have their BLS changes broadcast at the Capella fork boundary. + pub fn register_indices_broadcasted_at_capella(&mut self, broadcasted: &HashSet) { + self.received_pre_capella_indices = self + .received_pre_capella_indices + .difference(broadcasted) + .copied() + .collect(); + } } diff --git a/beacon_node/operation_pool/src/lib.rs b/beacon_node/operation_pool/src/lib.rs index 4643addad..d401deb89 100644 --- a/beacon_node/operation_pool/src/lib.rs +++ b/beacon_node/operation_pool/src/lib.rs @@ -9,12 +9,13 @@ mod persistence; mod reward_cache; mod sync_aggregate_id; +pub use crate::bls_to_execution_changes::ReceivedPreCapella; pub use attestation::AttMaxCover; pub use attestation_storage::{AttestationRef, SplitAttestation}; pub use max_cover::MaxCover; pub use persistence::{ PersistedOperationPool, PersistedOperationPoolV12, PersistedOperationPoolV14, - PersistedOperationPoolV5, + PersistedOperationPoolV15, PersistedOperationPoolV5, }; pub use reward_cache::RewardCache; @@ -24,6 +25,8 @@ use crate::sync_aggregate_id::SyncAggregateId; use attester_slashing::AttesterSlashingMaxCover; use max_cover::maximum_cover; use parking_lot::{RwLock, RwLockWriteGuard}; +use rand::seq::SliceRandom; +use rand::thread_rng; use state_processing::per_block_processing::errors::AttestationValidationError; use state_processing::per_block_processing::{ get_slashable_indices_modular, verify_exit, VerifySignatures, @@ -533,10 +536,11 @@ impl OperationPool { pub fn insert_bls_to_execution_change( &self, verified_change: SigVerifiedOp, + received_pre_capella: ReceivedPreCapella, ) -> bool { self.bls_to_execution_changes .write() - .insert(verified_change) + .insert(verified_change, received_pre_capella) } /// Get a list of execution changes for inclusion in a block. @@ -562,6 +566,42 @@ impl OperationPool { ) } + /// Get a list of execution changes to be broadcast at the Capella fork. + /// + /// The list that is returned will be shuffled to help provide a fair + /// broadcast of messages. + pub fn get_bls_to_execution_changes_received_pre_capella( + &self, + state: &BeaconState, + spec: &ChainSpec, + ) -> Vec { + let mut changes = filter_limit_operations( + self.bls_to_execution_changes + .read() + .iter_received_pre_capella(), + |address_change| { + address_change.signature_is_still_valid(&state.fork()) + && state + .get_validator(address_change.as_inner().message.validator_index as usize) + .map_or(false, |validator| { + !validator.has_eth1_withdrawal_credential(spec) + }) + }, + |address_change| address_change.as_inner().clone(), + usize::max_value(), + ); + changes.shuffle(&mut thread_rng()); + changes + } + + /// Removes `broadcasted` validators from the set of validators that should + /// have their BLS changes broadcast at the Capella fork boundary. + pub fn register_indices_broadcasted_at_capella(&self, broadcasted: &HashSet) { + self.bls_to_execution_changes + .write() + .register_indices_broadcasted_at_capella(broadcasted); + } + /// Prune BLS to execution changes that have been applied to the state more than 1 block ago. pub fn prune_bls_to_execution_changes>( &self, diff --git a/beacon_node/operation_pool/src/persistence.rs b/beacon_node/operation_pool/src/persistence.rs index 4948040ae..65354e01a 100644 --- a/beacon_node/operation_pool/src/persistence.rs +++ b/beacon_node/operation_pool/src/persistence.rs @@ -1,6 +1,6 @@ use crate::attestation_id::AttestationId; use crate::attestation_storage::AttestationMap; -use crate::bls_to_execution_changes::BlsToExecutionChanges; +use crate::bls_to_execution_changes::{BlsToExecutionChanges, ReceivedPreCapella}; use crate::sync_aggregate_id::SyncAggregateId; use crate::OpPoolError; use crate::OperationPool; @@ -9,6 +9,8 @@ use parking_lot::RwLock; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use state_processing::SigVerifiedOp; +use std::collections::HashSet; +use std::mem; use store::{DBColumn, Error as StoreError, StoreItem}; use types::*; @@ -19,7 +21,7 @@ type PersistedSyncContributions = Vec<(SyncAggregateId, Vec { #[superstruct(only(V5))] pub attestations_v5: Vec<(AttestationId, Vec>)>, /// Attestations and their attesting indices. - #[superstruct(only(V12, V14))] + #[superstruct(only(V12, V14, V15))] pub attestations: Vec<(Attestation, Vec)>, /// Mapping from sync contribution ID to sync contributions and aggregate. pub sync_contributions: PersistedSyncContributions, @@ -41,23 +43,27 @@ pub struct PersistedOperationPool { #[superstruct(only(V5))] pub attester_slashings_v5: Vec<(AttesterSlashing, ForkVersion)>, /// Attester slashings. - #[superstruct(only(V12, V14))] + #[superstruct(only(V12, V14, V15))] pub attester_slashings: Vec, T>>, /// [DEPRECATED] Proposer slashings. #[superstruct(only(V5))] pub proposer_slashings_v5: Vec, /// Proposer slashings with fork information. - #[superstruct(only(V12, V14))] + #[superstruct(only(V12, V14, V15))] pub proposer_slashings: Vec>, /// [DEPRECATED] Voluntary exits. #[superstruct(only(V5))] pub voluntary_exits_v5: Vec, /// Voluntary exits with fork information. - #[superstruct(only(V12, V14))] + #[superstruct(only(V12, V14, V15))] pub voluntary_exits: Vec>, /// BLS to Execution Changes - #[superstruct(only(V14))] + #[superstruct(only(V14, V15))] pub bls_to_execution_changes: Vec>, + /// Validator indices with BLS to Execution Changes to be broadcast at the + /// Capella fork. + #[superstruct(only(V15))] + pub capella_bls_change_broadcast_indices: Vec, } impl PersistedOperationPool { @@ -110,18 +116,26 @@ impl PersistedOperationPool { .map(|bls_to_execution_change| (**bls_to_execution_change).clone()) .collect(); - PersistedOperationPool::V14(PersistedOperationPoolV14 { + let capella_bls_change_broadcast_indices = operation_pool + .bls_to_execution_changes + .read() + .iter_pre_capella_indices() + .copied() + .collect(); + + PersistedOperationPool::V15(PersistedOperationPoolV15 { attestations, sync_contributions, attester_slashings, proposer_slashings, voluntary_exits, bls_to_execution_changes, + capella_bls_change_broadcast_indices, }) } /// Reconstruct an `OperationPool`. - pub fn into_operation_pool(self) -> Result, OpPoolError> { + pub fn into_operation_pool(mut self) -> Result, OpPoolError> { let attester_slashings = RwLock::new(self.attester_slashings()?.iter().cloned().collect()); let proposer_slashings = RwLock::new( self.proposer_slashings()? @@ -142,33 +156,43 @@ impl PersistedOperationPool { PersistedOperationPool::V5(_) | PersistedOperationPool::V12(_) => { return Err(OpPoolError::IncorrectOpPoolVariant) } - PersistedOperationPool::V14(ref pool) => { + PersistedOperationPool::V14(_) | PersistedOperationPool::V15(_) => { let mut map = AttestationMap::default(); - for (att, attesting_indices) in pool.attestations.clone() { + for (att, attesting_indices) in self.attestations()?.clone() { map.insert(att, attesting_indices); } RwLock::new(map) } }; - let bls_to_execution_changes = match self { - PersistedOperationPool::V5(_) | PersistedOperationPool::V12(_) => { - return Err(OpPoolError::IncorrectOpPoolVariant) + let mut bls_to_execution_changes = BlsToExecutionChanges::default(); + if let Ok(persisted_changes) = self.bls_to_execution_changes_mut() { + let persisted_changes = mem::take(persisted_changes); + + let broadcast_indices = + if let Ok(indices) = self.capella_bls_change_broadcast_indices_mut() { + mem::take(indices).into_iter().collect() + } else { + HashSet::new() + }; + + for bls_to_execution_change in persisted_changes { + let received_pre_capella = if broadcast_indices + .contains(&bls_to_execution_change.as_inner().message.validator_index) + { + ReceivedPreCapella::Yes + } else { + ReceivedPreCapella::No + }; + bls_to_execution_changes.insert(bls_to_execution_change, received_pre_capella); } - PersistedOperationPool::V14(pool) => { - let mut bls_to_execution_changes = BlsToExecutionChanges::default(); - for bls_to_execution_change in pool.bls_to_execution_changes { - bls_to_execution_changes.insert(bls_to_execution_change); - } - RwLock::new(bls_to_execution_changes) - } - }; + } let op_pool = OperationPool { attestations, sync_contributions, attester_slashings, proposer_slashings, voluntary_exits, - bls_to_execution_changes, + bls_to_execution_changes: RwLock::new(bls_to_execution_changes), reward_cache: Default::default(), _phantom: Default::default(), }; @@ -204,6 +228,20 @@ impl StoreItem for PersistedOperationPoolV12 { } } +impl StoreItem for PersistedOperationPoolV14 { + fn db_column() -> DBColumn { + DBColumn::OpPool + } + + fn as_store_bytes(&self) -> Vec { + self.as_ssz_bytes() + } + + fn from_store_bytes(bytes: &[u8]) -> Result { + PersistedOperationPoolV14::from_ssz_bytes(bytes).map_err(Into::into) + } +} + /// Deserialization for `PersistedOperationPool` defaults to `PersistedOperationPool::V12`. impl StoreItem for PersistedOperationPool { fn db_column() -> DBColumn { @@ -216,8 +254,8 @@ impl StoreItem for PersistedOperationPool { fn from_store_bytes(bytes: &[u8]) -> Result { // Default deserialization to the latest variant. - PersistedOperationPoolV14::from_ssz_bytes(bytes) - .map(Self::V14) + PersistedOperationPoolV15::from_ssz_bytes(bytes) + .map(Self::V15) .map_err(Into::into) } } diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs index fb5769635..729b36ff2 100644 --- a/beacon_node/store/src/metadata.rs +++ b/beacon_node/store/src/metadata.rs @@ -4,7 +4,7 @@ use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use types::{Checkpoint, Hash256, Slot}; -pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(14); +pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(15); // All the keys that get stored under the `BeaconMeta` column. // diff --git a/consensus/types/src/bls_to_execution_change.rs b/consensus/types/src/bls_to_execution_change.rs index cb73e43f9..b279515bd 100644 --- a/consensus/types/src/bls_to_execution_change.rs +++ b/consensus/types/src/bls_to_execution_change.rs @@ -10,6 +10,7 @@ use tree_hash_derive::TreeHash; arbitrary::Arbitrary, Debug, PartialEq, + Eq, Hash, Clone, Serialize, diff --git a/consensus/types/src/signed_bls_to_execution_change.rs b/consensus/types/src/signed_bls_to_execution_change.rs index 92b79fad3..2b17095ae 100644 --- a/consensus/types/src/signed_bls_to_execution_change.rs +++ b/consensus/types/src/signed_bls_to_execution_change.rs @@ -10,6 +10,7 @@ use tree_hash_derive::TreeHash; arbitrary::Arbitrary, Debug, PartialEq, + Eq, Hash, Clone, Serialize,