diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 17f58b223..24ea07833 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -160,6 +160,7 @@ pub enum BeaconChainError { BlockRewardSlotError, BlockRewardAttestationError, BlockRewardSyncError, + SyncCommitteeRewardsSyncError, HeadMissingFromForkChoice(Hash256), FinalizedBlockMissingFromForkChoice(Hash256), HeadBlockMissingFromForkChoice(Hash256), diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index ae1c5e4b7..ae3e98f91 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -40,6 +40,7 @@ pub mod schema_change; mod shuffling_cache; mod snapshot_cache; pub mod state_advance_timer; +pub mod sync_committee_rewards; pub mod sync_committee_verification; pub mod test_utils; mod timeout_rw_lock; diff --git a/beacon_node/beacon_chain/src/sync_committee_rewards.rs b/beacon_node/beacon_chain/src/sync_committee_rewards.rs new file mode 100644 index 000000000..561fed1a8 --- /dev/null +++ b/beacon_node/beacon_chain/src/sync_committee_rewards.rs @@ -0,0 +1,87 @@ +use crate::{BeaconChain, BeaconChainError, BeaconChainTypes}; + +use eth2::lighthouse::SyncCommitteeReward; +use safe_arith::SafeArith; +use slog::error; +use state_processing::per_block_processing::altair::sync_committee::compute_sync_aggregate_rewards; +use std::collections::HashMap; +use store::RelativeEpoch; +use types::{BeaconBlockRef, BeaconState, ExecPayload}; + +impl BeaconChain { + pub fn compute_sync_committee_rewards>( + &self, + block: BeaconBlockRef<'_, T::EthSpec, Payload>, + state: &mut BeaconState, + ) -> Result, BeaconChainError> { + if block.slot() != state.slot() { + return Err(BeaconChainError::BlockRewardSlotError); + } + + let spec = &self.spec; + + state.build_committee_cache(RelativeEpoch::Current, spec)?; + + let sync_aggregate = block.body().sync_aggregate()?; + + let sync_committee = state.current_sync_committee()?.clone(); + + let sync_committee_indices = state.get_sync_committee_indices(&sync_committee)?; + + let (participant_reward_value, proposer_reward_per_bit) = + compute_sync_aggregate_rewards(state, spec).map_err(|e| { + error!( + self.log, "Error calculating sync aggregate rewards"; + "error" => ?e + ); + BeaconChainError::SyncCommitteeRewardsSyncError + })?; + + let mut balances = HashMap::::new(); + + let mut total_proposer_rewards = 0; + let proposer_index = state.get_beacon_proposer_index(block.slot(), spec)?; + + // Apply rewards to participant balances. Keep track of proposer rewards + for (validator_index, participant_bit) in sync_committee_indices + .iter() + .zip(sync_aggregate.sync_committee_bits.iter()) + { + let participant_balance = balances + .entry(*validator_index) + .or_insert_with(|| state.balances()[*validator_index]); + + if participant_bit { + participant_balance.safe_add_assign(participant_reward_value)?; + + balances + .entry(proposer_index) + .or_insert_with(|| state.balances()[proposer_index]) + .safe_add_assign(proposer_reward_per_bit)?; + + total_proposer_rewards.safe_add_assign(proposer_reward_per_bit)?; + } else { + *participant_balance = participant_balance.saturating_sub(participant_reward_value); + } + } + + Ok(balances + .iter() + .filter_map(|(i, new_balance)| { + let reward = if *i != proposer_index { + *new_balance as i64 - state.balances()[*i] as i64 + } else if sync_committee_indices.contains(i) { + *new_balance as i64 + - state.balances()[*i] as i64 + - total_proposer_rewards as i64 + } else { + return None; + }; + Some(SyncCommitteeReward { + validator_index: *i as u64, + reward, + }) + }) + .collect()) + } +} diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 66de3f02d..749487dc5 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -2,6 +2,7 @@ pub use crate::persisted_beacon_chain::PersistedBeaconChain; pub use crate::{ beacon_chain::{BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, FORK_CHOICE_DB_KEY, OP_POOL_DB_KEY}, migrate::MigratorConfig, + sync_committee_verification::Error as SyncCommitteeError, validator_monitor::DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD, BeaconChainError, NotifyExecutionLayer, ProduceBlockVerification, }; @@ -1980,6 +1981,30 @@ where (honest_head, faulty_head) } + + pub fn process_sync_contributions( + &self, + sync_contributions: HarnessSyncContributions, + ) -> Result<(), SyncCommitteeError> { + let mut verified_contributions = Vec::with_capacity(sync_contributions.len()); + + for (_, contribution_and_proof) in sync_contributions { + let signed_contribution_and_proof = contribution_and_proof.unwrap(); + + let verified_contribution = self + .chain + .verify_sync_contribution_for_gossip(signed_contribution_and_proof)?; + + verified_contributions.push(verified_contribution); + } + + for verified_contribution in verified_contributions { + self.chain + .add_contribution_to_block_inclusion_pool(verified_contribution)?; + } + + Ok(()) + } } // Junk `Debug` impl to satistfy certain trait bounds during testing. diff --git a/beacon_node/beacon_chain/tests/main.rs b/beacon_node/beacon_chain/tests/main.rs index 1c61e9927..eceb4f2e8 100644 --- a/beacon_node/beacon_chain/tests/main.rs +++ b/beacon_node/beacon_chain/tests/main.rs @@ -4,6 +4,7 @@ mod block_verification; mod merge; mod op_verification; mod payload_invalidation; +mod rewards; mod store_tests; mod sync_committee_verification; mod tests; diff --git a/beacon_node/beacon_chain/tests/rewards.rs b/beacon_node/beacon_chain/tests/rewards.rs new file mode 100644 index 000000000..b61bea124 --- /dev/null +++ b/beacon_node/beacon_chain/tests/rewards.rs @@ -0,0 +1,121 @@ +#![cfg(test)] + +use std::collections::HashMap; + +use beacon_chain::test_utils::{ + generate_deterministic_keypairs, BeaconChainHarness, EphemeralHarnessType, +}; +use beacon_chain::{ + test_utils::{AttestationStrategy, BlockStrategy, RelativeSyncCommittee}, + types::{Epoch, EthSpec, Keypair, MinimalEthSpec}, +}; +use lazy_static::lazy_static; + +pub const VALIDATOR_COUNT: usize = 64; + +lazy_static! { + static ref KEYPAIRS: Vec = generate_deterministic_keypairs(VALIDATOR_COUNT); +} + +fn get_harness() -> BeaconChainHarness> { + let mut spec = E::default_spec(); + + spec.altair_fork_epoch = Some(Epoch::new(0)); // We use altair for all tests + + let harness = BeaconChainHarness::builder(E::default()) + .spec(spec) + .keypairs(KEYPAIRS.to_vec()) + .fresh_ephemeral_store() + .build(); + + harness.advance_slot(); + + harness +} + +#[tokio::test] +async fn test_sync_committee_rewards() { + let num_block_produced = MinimalEthSpec::slots_per_epoch(); + let harness = get_harness::(); + + let latest_block_root = harness + .extend_chain( + num_block_produced as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + // Create and add sync committee message to op_pool + let sync_contributions = harness.make_sync_contributions( + &harness.get_current_state(), + latest_block_root, + harness.get_current_slot(), + RelativeSyncCommittee::Current, + ); + + harness + .process_sync_contributions(sync_contributions) + .unwrap(); + + // Add block + let chain = &harness.chain; + let (head_state, head_state_root) = harness.get_current_state_and_root(); + let target_slot = harness.get_current_slot() + 1; + + let (block_root, mut state) = harness + .add_attested_block_at_slot(target_slot, head_state, head_state_root, &[]) + .await + .unwrap(); + + let block = harness.get_block(block_root).unwrap(); + let parent_block = chain + .get_blinded_block(&block.parent_root()) + .unwrap() + .unwrap(); + let parent_state = chain + .get_state(&parent_block.state_root(), Some(parent_block.slot())) + .unwrap() + .unwrap(); + + let reward_payload = chain + .compute_sync_committee_rewards(block.message(), &mut state) + .unwrap(); + + let rewards = reward_payload + .iter() + .map(|reward| (reward.validator_index, reward.reward)) + .collect::>(); + + let proposer_index = state + .get_beacon_proposer_index(target_slot, &MinimalEthSpec::default_spec()) + .unwrap(); + + let mut mismatches = vec![]; + + for validator in state.validators() { + let validator_index = state + .clone() + .get_validator_index(&validator.pubkey) + .unwrap() + .unwrap(); + let pre_state_balance = parent_state.balances()[validator_index]; + let post_state_balance = state.balances()[validator_index]; + let sync_committee_reward = rewards.get(&(validator_index as u64)).unwrap_or(&0); + + if validator_index == proposer_index { + continue; // Ignore proposer + } + + if pre_state_balance as i64 + *sync_committee_reward != post_state_balance as i64 { + mismatches.push(validator_index.to_string()); + } + } + + assert_eq!( + mismatches.len(), + 0, + "Expect 0 mismatches, but these validators have mismatches on balance: {} ", + mismatches.join(",") + ); +} diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 8cd0b856b..1399bb99a 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -16,6 +16,7 @@ mod metrics; mod proposer_duties; mod publish_blocks; mod state_id; +mod sync_committee_rewards; mod sync_committees; mod ui; mod validator_inclusion; @@ -1699,6 +1700,41 @@ pub fn serve( }, ); + /* + * beacon/rewards + */ + + let beacon_rewards_path = eth_v1 + .and(warp::path("beacon")) + .and(warp::path("rewards")) + .and(chain_filter.clone()); + + // POST beacon/rewards/sync_committee/{block_id} + let post_beacon_rewards_sync_committee = beacon_rewards_path + .clone() + .and(warp::path("sync_committee")) + .and(block_id_or_err) + .and(warp::path::end()) + .and(warp::body::json()) + .and(log_filter.clone()) + .and_then( + |chain: Arc>, + block_id: BlockId, + validators: Vec, + log: Logger| { + blocking_json_task(move || { + let (rewards, execution_optimistic) = + sync_committee_rewards::compute_sync_committee_rewards( + chain, block_id, validators, log, + )?; + + Ok(rewards) + .map(api_types::GenericResponse::from) + .map(|resp| resp.add_execution_optimistic(execution_optimistic)) + }) + }, + ); + /* * config */ @@ -3396,6 +3432,7 @@ pub fn serve( .or(post_beacon_pool_proposer_slashings.boxed()) .or(post_beacon_pool_voluntary_exits.boxed()) .or(post_beacon_pool_sync_committees.boxed()) + .or(post_beacon_rewards_sync_committee.boxed()) .or(post_validator_duties_attester.boxed()) .or(post_validator_duties_sync.boxed()) .or(post_validator_aggregate_and_proofs.boxed()) diff --git a/beacon_node/http_api/src/sync_committee_rewards.rs b/beacon_node/http_api/src/sync_committee_rewards.rs new file mode 100644 index 000000000..ae369115d --- /dev/null +++ b/beacon_node/http_api/src/sync_committee_rewards.rs @@ -0,0 +1,77 @@ +use crate::{BlockId, ExecutionOptimistic}; +use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes}; +use eth2::lighthouse::SyncCommitteeReward; +use eth2::types::ValidatorId; +use slog::{debug, Logger}; +use state_processing::BlockReplayer; +use std::sync::Arc; +use types::{BeaconState, SignedBlindedBeaconBlock}; +use warp_utils::reject::{beacon_chain_error, custom_not_found}; + +pub fn compute_sync_committee_rewards( + chain: Arc>, + block_id: BlockId, + validators: Vec, + log: Logger, +) -> Result<(Option>, ExecutionOptimistic), warp::Rejection> { + let (block, execution_optimistic) = block_id.blinded_block(&chain)?; + + let mut state = get_state_before_applying_block(chain.clone(), &block)?; + + let reward_payload = chain + .compute_sync_committee_rewards(block.message(), &mut state) + .map_err(beacon_chain_error)?; + + let data = if reward_payload.is_empty() { + debug!(log, "compute_sync_committee_rewards returned empty"); + None + } else if validators.is_empty() { + Some(reward_payload) + } else { + Some( + reward_payload + .into_iter() + .filter(|reward| { + validators.iter().any(|validator| match validator { + ValidatorId::Index(i) => reward.validator_index == *i, + ValidatorId::PublicKey(pubkey) => match state.get_validator_index(pubkey) { + Ok(Some(i)) => reward.validator_index == i as u64, + _ => false, + }, + }) + }) + .collect::>(), + ) + }; + + Ok((data, execution_optimistic)) +} + +fn get_state_before_applying_block( + chain: Arc>, + block: &SignedBlindedBeaconBlock, +) -> Result, warp::reject::Rejection> { + let parent_block: SignedBlindedBeaconBlock = chain + .get_blinded_block(&block.parent_root()) + .and_then(|maybe_block| { + maybe_block.ok_or_else(|| BeaconChainError::MissingBeaconBlock(block.parent_root())) + }) + .map_err(|e| custom_not_found(format!("Parent block is not available! {:?}", e)))?; + + let parent_state = chain + .get_state(&parent_block.state_root(), Some(parent_block.slot())) + .and_then(|maybe_state| { + maybe_state + .ok_or_else(|| BeaconChainError::MissingBeaconState(parent_block.state_root())) + }) + .map_err(|e| custom_not_found(format!("Parent state is not available! {:?}", e)))?; + + let replayer = BlockReplayer::new(parent_state, &chain.spec) + .no_signature_verification() + .state_root_iter([Ok((parent_block.state_root(), parent_block.slot()))].into_iter()) + .minimal_block_root_verification() + .apply_blocks(vec![], Some(block.slot())) + .map_err(beacon_chain_error)?; + + Ok(replayer.into_state()) +} diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 58b4c88b3..00b664446 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -1026,6 +1026,24 @@ impl BeaconNodeHttpClient { .transpose() } + /// `POST beacon/rewards/sync_committee` + pub async fn post_beacon_rewards_sync_committee( + &self, + rewards: &[Option>], + ) -> Result<(), Error> { + let mut path = self.eth_path(V1)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("rewards") + .push("sync_committee"); + + self.post(path, &rewards).await?; + + Ok(()) + } + /// `POST validator/contribution_and_proofs` pub async fn post_validator_contribution_and_proofs( &self, diff --git a/common/eth2/src/lighthouse.rs b/common/eth2/src/lighthouse.rs index 2dced1c44..068abd693 100644 --- a/common/eth2/src/lighthouse.rs +++ b/common/eth2/src/lighthouse.rs @@ -3,6 +3,7 @@ mod attestation_performance; mod block_packing_efficiency; mod block_rewards; +mod sync_committee_rewards; use crate::{ ok_or_error, @@ -27,6 +28,7 @@ pub use block_packing_efficiency::{ }; pub use block_rewards::{AttestationRewards, BlockReward, BlockRewardMeta, BlockRewardsQuery}; pub use lighthouse_network::{types::SyncState, PeerInfo}; +pub use sync_committee_rewards::SyncCommitteeReward; // Define "legacy" implementations of `Option` which use four bytes for encoding the union // selector. diff --git a/common/eth2/src/lighthouse/sync_committee_rewards.rs b/common/eth2/src/lighthouse/sync_committee_rewards.rs new file mode 100644 index 000000000..cdd685065 --- /dev/null +++ b/common/eth2/src/lighthouse/sync_committee_rewards.rs @@ -0,0 +1,12 @@ +use serde::{Deserialize, Serialize}; + +// Details about the rewards paid to sync committee members for attesting headers +// All rewards in GWei + +#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] +pub struct SyncCommitteeReward { + #[serde(with = "eth2_serde_utils::quoted_u64")] + pub validator_index: u64, + // sync committee reward in gwei for the validator + pub reward: i64, +}