Merge remote-tracking branch 'origin/unstable' into capella
Commit c76a1971cc
@@ -162,6 +162,7 @@ pub enum BeaconChainError {
     BlockRewardSlotError,
     BlockRewardAttestationError,
     BlockRewardSyncError,
+    SyncCommitteeRewardsSyncError,
     HeadMissingFromForkChoice(Hash256),
     FinalizedBlockMissingFromForkChoice(Hash256),
     HeadBlockMissingFromForkChoice(Hash256),
@@ -41,6 +41,7 @@ pub mod schema_change;
 mod shuffling_cache;
 mod snapshot_cache;
 pub mod state_advance_timer;
+pub mod sync_committee_rewards;
 pub mod sync_committee_verification;
 pub mod test_utils;
 mod timeout_rw_lock;
@@ -2,6 +2,7 @@ use crate::{
     beacon_chain::MAXIMUM_GOSSIP_CLOCK_DISPARITY, BeaconChain, BeaconChainError, BeaconChainTypes,
 };
 use derivative::Derivative;
+use eth2::types::Hash256;
 use slot_clock::SlotClock;
 use std::time::Duration;
 use strum::AsRefStr;
@@ -36,6 +37,8 @@ pub enum Error {
     SigSlotStartIsNone,
     /// Failed to construct a LightClientOptimisticUpdate from state.
     FailedConstructingUpdate,
+    /// Unknown block with parent root.
+    UnknownBlockParentRoot(Hash256),
     /// Beacon chain error occured.
     BeaconChainError(BeaconChainError),
     LightClientUpdateError(LightClientUpdateError),
@@ -58,6 +61,7 @@ impl From<LightClientUpdateError> for Error {
 #[derivative(Clone(bound = "T: BeaconChainTypes"))]
 pub struct VerifiedLightClientOptimisticUpdate<T: BeaconChainTypes> {
     light_client_optimistic_update: LightClientOptimisticUpdate<T::EthSpec>,
+    pub parent_root: Hash256,
     seen_timestamp: Duration,
 }

@@ -107,6 +111,16 @@ impl<T: BeaconChainTypes> VerifiedLightClientOptimisticUpdate<T> {
             None => return Err(Error::SigSlotStartIsNone),
         }

+        // check if we can process the optimistic update immediately
+        // otherwise queue
+        let canonical_root = light_client_optimistic_update
+            .attested_header
+            .canonical_root();
+
+        if canonical_root != head_block.message().parent_root() {
+            return Err(Error::UnknownBlockParentRoot(canonical_root));
+        }
+
         let optimistic_update =
             LightClientOptimisticUpdate::new(&chain.spec, head_block, &attested_state)?;

@@ -119,6 +133,7 @@ impl<T: BeaconChainTypes> VerifiedLightClientOptimisticUpdate<T> {

         Ok(Self {
             light_client_optimistic_update,
+            parent_root: canonical_root,
             seen_timestamp,
         })
     }
beacon_node/beacon_chain/src/sync_committee_rewards.rs (new file, 87 lines)
@@ -0,0 +1,87 @@
+use crate::{BeaconChain, BeaconChainError, BeaconChainTypes};
+
+use eth2::lighthouse::SyncCommitteeReward;
+use safe_arith::SafeArith;
+use slog::error;
+use state_processing::per_block_processing::altair::sync_committee::compute_sync_aggregate_rewards;
+use std::collections::HashMap;
+use store::RelativeEpoch;
+use types::{BeaconBlockRef, BeaconState, ExecPayload};
+
+impl<T: BeaconChainTypes> BeaconChain<T> {
+    pub fn compute_sync_committee_rewards<Payload: ExecPayload<T::EthSpec>>(
+        &self,
+        block: BeaconBlockRef<'_, T::EthSpec, Payload>,
+        state: &mut BeaconState<T::EthSpec>,
+    ) -> Result<Vec<SyncCommitteeReward>, BeaconChainError> {
+        if block.slot() != state.slot() {
+            return Err(BeaconChainError::BlockRewardSlotError);
+        }
+
+        let spec = &self.spec;
+
+        state.build_committee_cache(RelativeEpoch::Current, spec)?;
+
+        let sync_aggregate = block.body().sync_aggregate()?;
+
+        let sync_committee = state.current_sync_committee()?.clone();
+
+        let sync_committee_indices = state.get_sync_committee_indices(&sync_committee)?;
+
+        let (participant_reward_value, proposer_reward_per_bit) =
+            compute_sync_aggregate_rewards(state, spec).map_err(|e| {
+                error!(
+                    self.log, "Error calculating sync aggregate rewards";
+                    "error" => ?e
+                );
+                BeaconChainError::SyncCommitteeRewardsSyncError
+            })?;
+
+        let mut balances = HashMap::<usize, u64>::new();
+
+        let mut total_proposer_rewards = 0;
+        let proposer_index = state.get_beacon_proposer_index(block.slot(), spec)?;
+
+        // Apply rewards to participant balances. Keep track of proposer rewards
+        for (validator_index, participant_bit) in sync_committee_indices
+            .iter()
+            .zip(sync_aggregate.sync_committee_bits.iter())
+        {
+            let participant_balance = balances
+                .entry(*validator_index)
+                .or_insert_with(|| state.balances()[*validator_index]);
+
+            if participant_bit {
+                participant_balance.safe_add_assign(participant_reward_value)?;
+
+                balances
+                    .entry(proposer_index)
+                    .or_insert_with(|| state.balances()[proposer_index])
+                    .safe_add_assign(proposer_reward_per_bit)?;
+
+                total_proposer_rewards.safe_add_assign(proposer_reward_per_bit)?;
+            } else {
+                *participant_balance = participant_balance.saturating_sub(participant_reward_value);
+            }
+        }
+
+        Ok(balances
+            .iter()
+            .filter_map(|(i, new_balance)| {
+                let reward = if *i != proposer_index {
+                    *new_balance as i64 - state.balances()[*i] as i64
+                } else if sync_committee_indices.contains(i) {
+                    *new_balance as i64
+                        - state.balances()[*i] as i64
+                        - total_proposer_rewards as i64
+                } else {
+                    return None;
+                };
+                Some(SyncCommitteeReward {
+                    validator_index: *i as u64,
+                    reward,
+                })
+            })
+            .collect())
+    }
+}
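Note: the rewards returned by the new method above are signed balance deltas. As a rough standalone sketch of the same arithmetic (illustrative only, not part of this commit), the per-validator figure reduces to:

// Illustrative sketch, not part of the commit: the reported reward is the
// post-block balance minus the pre-block balance; for a proposer who is also
// in the sync committee, the accumulated sync-aggregate proposer rewards are
// subtracted so only the participation component remains. Negative = penalty.
fn sync_committee_reward_delta(
    pre_balance: u64,
    post_balance: u64,
    is_proposer: bool,
    total_proposer_rewards: u64,
) -> i64 {
    let delta = post_balance as i64 - pre_balance as i64;
    if is_proposer {
        delta - total_proposer_rewards as i64
    } else {
        delta
    }
}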
@@ -2,6 +2,7 @@ pub use crate::persisted_beacon_chain::PersistedBeaconChain;
 pub use crate::{
     beacon_chain::{BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, FORK_CHOICE_DB_KEY, OP_POOL_DB_KEY},
     migrate::MigratorConfig,
+    sync_committee_verification::Error as SyncCommitteeError,
     validator_monitor::DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD,
     BeaconChainError, NotifyExecutionLayer, ProduceBlockVerification,
 };
@@ -2073,6 +2074,30 @@ where

         (honest_head, faulty_head)
     }
+
+    pub fn process_sync_contributions(
+        &self,
+        sync_contributions: HarnessSyncContributions<E>,
+    ) -> Result<(), SyncCommitteeError> {
+        let mut verified_contributions = Vec::with_capacity(sync_contributions.len());
+
+        for (_, contribution_and_proof) in sync_contributions {
+            let signed_contribution_and_proof = contribution_and_proof.unwrap();
+
+            let verified_contribution = self
+                .chain
+                .verify_sync_contribution_for_gossip(signed_contribution_and_proof)?;
+
+            verified_contributions.push(verified_contribution);
+        }
+
+        for verified_contribution in verified_contributions {
+            self.chain
+                .add_contribution_to_block_inclusion_pool(verified_contribution)?;
+        }
+
+        Ok(())
+    }
 }

 // Junk `Debug` impl to satistfy certain trait bounds during testing.
@@ -5,6 +5,7 @@ mod capella;
 mod merge;
 mod op_verification;
 mod payload_invalidation;
+mod rewards;
 mod store_tests;
 mod sync_committee_verification;
 mod tests;
beacon_node/beacon_chain/tests/rewards.rs (new file, 121 lines)
@@ -0,0 +1,121 @@
+#![cfg(test)]
+
+use std::collections::HashMap;
+
+use beacon_chain::test_utils::{
+    generate_deterministic_keypairs, BeaconChainHarness, EphemeralHarnessType,
+};
+use beacon_chain::{
+    test_utils::{AttestationStrategy, BlockStrategy, RelativeSyncCommittee},
+    types::{Epoch, EthSpec, Keypair, MinimalEthSpec},
+};
+use lazy_static::lazy_static;
+
+pub const VALIDATOR_COUNT: usize = 64;
+
+lazy_static! {
+    static ref KEYPAIRS: Vec<Keypair> = generate_deterministic_keypairs(VALIDATOR_COUNT);
+}
+
+fn get_harness<E: EthSpec>() -> BeaconChainHarness<EphemeralHarnessType<E>> {
+    let mut spec = E::default_spec();
+
+    spec.altair_fork_epoch = Some(Epoch::new(0)); // We use altair for all tests
+
+    let harness = BeaconChainHarness::builder(E::default())
+        .spec(spec)
+        .keypairs(KEYPAIRS.to_vec())
+        .fresh_ephemeral_store()
+        .build();
+
+    harness.advance_slot();
+
+    harness
+}
+
+#[tokio::test]
+async fn test_sync_committee_rewards() {
+    let num_block_produced = MinimalEthSpec::slots_per_epoch();
+    let harness = get_harness::<MinimalEthSpec>();
+
+    let latest_block_root = harness
+        .extend_chain(
+            num_block_produced as usize,
+            BlockStrategy::OnCanonicalHead,
+            AttestationStrategy::AllValidators,
+        )
+        .await;
+
+    // Create and add sync committee message to op_pool
+    let sync_contributions = harness.make_sync_contributions(
+        &harness.get_current_state(),
+        latest_block_root,
+        harness.get_current_slot(),
+        RelativeSyncCommittee::Current,
+    );
+
+    harness
+        .process_sync_contributions(sync_contributions)
+        .unwrap();
+
+    // Add block
+    let chain = &harness.chain;
+    let (head_state, head_state_root) = harness.get_current_state_and_root();
+    let target_slot = harness.get_current_slot() + 1;
+
+    let (block_root, mut state) = harness
+        .add_attested_block_at_slot(target_slot, head_state, head_state_root, &[])
+        .await
+        .unwrap();
+
+    let block = harness.get_block(block_root).unwrap();
+    let parent_block = chain
+        .get_blinded_block(&block.parent_root())
+        .unwrap()
+        .unwrap();
+    let parent_state = chain
+        .get_state(&parent_block.state_root(), Some(parent_block.slot()))
+        .unwrap()
+        .unwrap();
+
+    let reward_payload = chain
+        .compute_sync_committee_rewards(block.message(), &mut state)
+        .unwrap();
+
+    let rewards = reward_payload
+        .iter()
+        .map(|reward| (reward.validator_index, reward.reward))
+        .collect::<HashMap<_, _>>();
+
+    let proposer_index = state
+        .get_beacon_proposer_index(target_slot, &MinimalEthSpec::default_spec())
+        .unwrap();
+
+    let mut mismatches = vec![];
+
+    for validator in state.validators() {
+        let validator_index = state
+            .clone()
+            .get_validator_index(&validator.pubkey)
+            .unwrap()
+            .unwrap();
+        let pre_state_balance = parent_state.balances()[validator_index];
+        let post_state_balance = state.balances()[validator_index];
+        let sync_committee_reward = rewards.get(&(validator_index as u64)).unwrap_or(&0);
+
+        if validator_index == proposer_index {
+            continue; // Ignore proposer
+        }
+
+        if pre_state_balance as i64 + *sync_committee_reward != post_state_balance as i64 {
+            mismatches.push(validator_index.to_string());
+        }
+    }
+
+    assert_eq!(
+        mismatches.len(),
+        0,
+        "Expect 0 mismatches, but these validators have mismatches on balance: {} ",
+        mismatches.join(",")
+    );
+}
@@ -16,6 +16,7 @@ mod metrics;
 mod proposer_duties;
 mod publish_blocks;
 mod state_id;
+mod sync_committee_rewards;
 mod sync_committees;
 mod ui;
 mod validator_inclusion;
@@ -1800,6 +1801,41 @@ pub fn serve<T: BeaconChainTypes>(
             },
         );
+
+    /*
+     * beacon/rewards
+     */
+
+    let beacon_rewards_path = eth_v1
+        .and(warp::path("beacon"))
+        .and(warp::path("rewards"))
+        .and(chain_filter.clone());
+
+    // POST beacon/rewards/sync_committee/{block_id}
+    let post_beacon_rewards_sync_committee = beacon_rewards_path
+        .clone()
+        .and(warp::path("sync_committee"))
+        .and(block_id_or_err)
+        .and(warp::path::end())
+        .and(warp::body::json())
+        .and(log_filter.clone())
+        .and_then(
+            |chain: Arc<BeaconChain<T>>,
+             block_id: BlockId,
+             validators: Vec<ValidatorId>,
+             log: Logger| {
+                blocking_json_task(move || {
+                    let (rewards, execution_optimistic) =
+                        sync_committee_rewards::compute_sync_committee_rewards(
+                            chain, block_id, validators, log,
+                        )?;
+
+                    Ok(rewards)
+                        .map(api_types::GenericResponse::from)
+                        .map(|resp| resp.add_execution_optimistic(execution_optimistic))
+                })
+            },
+        );

     /*
      * config
      */
@@ -3499,6 +3535,7 @@ pub fn serve<T: BeaconChainTypes>(
                 .or(post_beacon_pool_voluntary_exits.boxed())
                 .or(post_beacon_pool_sync_committees.boxed())
                 .or(post_beacon_pool_bls_to_execution_changes.boxed())
+                .or(post_beacon_rewards_sync_committee.boxed())
                 .or(post_validator_duties_attester.boxed())
                 .or(post_validator_duties_sync.boxed())
                 .or(post_validator_aggregate_and_proofs.boxed())
beacon_node/http_api/src/sync_committee_rewards.rs (new file, 77 lines)
@@ -0,0 +1,77 @@
+use crate::{BlockId, ExecutionOptimistic};
+use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes};
+use eth2::lighthouse::SyncCommitteeReward;
+use eth2::types::ValidatorId;
+use slog::{debug, Logger};
+use state_processing::BlockReplayer;
+use std::sync::Arc;
+use types::{BeaconState, SignedBlindedBeaconBlock};
+use warp_utils::reject::{beacon_chain_error, custom_not_found};
+
+pub fn compute_sync_committee_rewards<T: BeaconChainTypes>(
+    chain: Arc<BeaconChain<T>>,
+    block_id: BlockId,
+    validators: Vec<ValidatorId>,
+    log: Logger,
+) -> Result<(Option<Vec<SyncCommitteeReward>>, ExecutionOptimistic), warp::Rejection> {
+    let (block, execution_optimistic) = block_id.blinded_block(&chain)?;
+
+    let mut state = get_state_before_applying_block(chain.clone(), &block)?;
+
+    let reward_payload = chain
+        .compute_sync_committee_rewards(block.message(), &mut state)
+        .map_err(beacon_chain_error)?;
+
+    let data = if reward_payload.is_empty() {
+        debug!(log, "compute_sync_committee_rewards returned empty");
+        None
+    } else if validators.is_empty() {
+        Some(reward_payload)
+    } else {
+        Some(
+            reward_payload
+                .into_iter()
+                .filter(|reward| {
+                    validators.iter().any(|validator| match validator {
+                        ValidatorId::Index(i) => reward.validator_index == *i,
+                        ValidatorId::PublicKey(pubkey) => match state.get_validator_index(pubkey) {
+                            Ok(Some(i)) => reward.validator_index == i as u64,
+                            _ => false,
+                        },
+                    })
+                })
+                .collect::<Vec<SyncCommitteeReward>>(),
+        )
+    };
+
+    Ok((data, execution_optimistic))
+}
+
+fn get_state_before_applying_block<T: BeaconChainTypes>(
+    chain: Arc<BeaconChain<T>>,
+    block: &SignedBlindedBeaconBlock<T::EthSpec>,
+) -> Result<BeaconState<T::EthSpec>, warp::reject::Rejection> {
+    let parent_block: SignedBlindedBeaconBlock<T::EthSpec> = chain
+        .get_blinded_block(&block.parent_root())
+        .and_then(|maybe_block| {
+            maybe_block.ok_or_else(|| BeaconChainError::MissingBeaconBlock(block.parent_root()))
+        })
+        .map_err(|e| custom_not_found(format!("Parent block is not available! {:?}", e)))?;
+
+    let parent_state = chain
+        .get_state(&parent_block.state_root(), Some(parent_block.slot()))
+        .and_then(|maybe_state| {
+            maybe_state
+                .ok_or_else(|| BeaconChainError::MissingBeaconState(parent_block.state_root()))
+        })
+        .map_err(|e| custom_not_found(format!("Parent state is not available! {:?}", e)))?;
+
+    let replayer = BlockReplayer::new(parent_state, &chain.spec)
+        .no_signature_verification()
+        .state_root_iter([Ok((parent_block.state_root(), parent_block.slot()))].into_iter())
+        .minimal_block_root_verification()
+        .apply_blocks(vec![], Some(block.slot()))
+        .map_err(beacon_chain_error)?;
+
+    Ok(replayer.into_state())
+}
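Note: a minimal client-side sketch of exercising the new route (assuming a node serving the HTTP API on localhost:5052; the block id and validator indices below are hypothetical, and an empty list returns every sync committee member) could look like the following, using the reqwest crate with its blocking and json features enabled:

// Hypothetical sketch, not part of the commit: POST a JSON array of validator
// indices (or public keys) to the new sync committee rewards route.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical node URL and block id ("head"); adjust as needed.
    let url = "http://localhost:5052/eth/v1/beacon/rewards/sync_committee/head";
    // Validators to filter by; an empty list means "all sync committee members".
    let validators = vec!["0", "1", "2"];

    let body = reqwest::blocking::Client::new()
        .post(url)
        .json(&validators)
        .send()?
        .text()?;

    // Roughly: {"execution_optimistic":false,"data":[{"validator_index":"0","reward":-502},...]}
    println!("{body}");
    Ok(())
}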
@@ -69,7 +69,8 @@ use types::{
     SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncCommitteeMessage, SyncSubnetId,
 };
 use work_reprocessing_queue::{
-    spawn_reprocess_scheduler, QueuedAggregate, QueuedRpcBlock, QueuedUnaggregate, ReadyWork,
+    spawn_reprocess_scheduler, QueuedAggregate, QueuedLightClientUpdate, QueuedRpcBlock,
+    QueuedUnaggregate, ReadyWork,
 };

 use worker::{Toolbox, Worker};
@@ -143,6 +144,10 @@ const MAX_GOSSIP_FINALITY_UPDATE_QUEUE_LEN: usize = 1_024;
 /// before we start dropping them.
 const MAX_GOSSIP_OPTIMISTIC_UPDATE_QUEUE_LEN: usize = 1_024;
+
+/// The maximum number of queued `LightClientOptimisticUpdate` objects received on gossip that will be stored
+/// for reprocessing before we start dropping them.
+const MAX_GOSSIP_OPTIMISTIC_UPDATE_REPROCESS_QUEUE_LEN: usize = 128;

 /// The maximum number of queued `SyncCommitteeMessage` objects that will be stored before we start dropping
 /// them.
 const MAX_SYNC_MESSAGE_QUEUE_LEN: usize = 2048;
@@ -229,6 +234,7 @@ pub const BLOBS_BY_RANGE_REQUEST: &str = "blobs_by_range_request";
 pub const LIGHT_CLIENT_BOOTSTRAP_REQUEST: &str = "light_client_bootstrap";
 pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation";
 pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate";
+pub const UNKNOWN_LIGHT_CLIENT_UPDATE: &str = "unknown_light_client_update";
 pub const GOSSIP_BLS_TO_EXECUTION_CHANGE: &str = "gossip_bls_to_execution_change";

 /// A simple first-in-first-out queue with a maximum length.
@@ -762,6 +768,21 @@ impl<T: BeaconChainTypes> std::convert::From<ReadyWork<T>> for WorkEvent<T> {
                     seen_timestamp,
                 },
             },
+            ReadyWork::LightClientUpdate(QueuedLightClientUpdate {
+                peer_id,
+                message_id,
+                light_client_optimistic_update,
+                seen_timestamp,
+                ..
+            }) => Self {
+                drop_during_sync: true,
+                work: Work::UnknownLightClientOptimisticUpdate {
+                    message_id,
+                    peer_id,
+                    light_client_optimistic_update,
+                    seen_timestamp,
+                },
+            },
         }
     }
 }
@@ -801,6 +822,12 @@ pub enum Work<T: BeaconChainTypes> {
         aggregate: Box<SignedAggregateAndProof<T::EthSpec>>,
         seen_timestamp: Duration,
     },
+    UnknownLightClientOptimisticUpdate {
+        message_id: MessageId,
+        peer_id: PeerId,
+        light_client_optimistic_update: Box<LightClientOptimisticUpdate<T::EthSpec>>,
+        seen_timestamp: Duration,
+    },
     GossipAggregateBatch {
         packages: Vec<GossipAggregatePackage<T::EthSpec>>,
     },
@@ -933,6 +960,7 @@ impl<T: BeaconChainTypes> Work<T> {
             Work::UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION,
             Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE,
             Work::GossipBlsToExecutionChange { .. } => GOSSIP_BLS_TO_EXECUTION_CHANGE,
+            Work::UnknownLightClientOptimisticUpdate { .. } => UNKNOWN_LIGHT_CLIENT_UPDATE,
         }
     }
 }
@@ -1067,6 +1095,8 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
         // Using a FIFO queue for light client updates to maintain sequence order.
         let mut finality_update_queue = FifoQueue::new(MAX_GOSSIP_FINALITY_UPDATE_QUEUE_LEN);
         let mut optimistic_update_queue = FifoQueue::new(MAX_GOSSIP_OPTIMISTIC_UPDATE_QUEUE_LEN);
+        let mut unknown_light_client_update_queue =
+            FifoQueue::new(MAX_GOSSIP_OPTIMISTIC_UPDATE_REPROCESS_QUEUE_LEN);

         // Using a FIFO queue since blocks need to be imported sequentially.
         let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN);
@@ -1455,6 +1485,9 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
                         Work::GossipBlsToExecutionChange { .. } => {
                             gossip_bls_to_execution_change_queue.push(work, work_id, &self.log)
                         }
+                        Work::UnknownLightClientOptimisticUpdate { .. } => {
+                            unknown_light_client_update_queue.push(work, work_id, &self.log)
+                        }
                     }
                 }
             }
@@ -1798,6 +1831,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
                     message_id,
                     peer_id,
                     *light_client_optimistic_update,
+                    Some(work_reprocessing_tx),
                     seen_timestamp,
                 )
             }),
@@ -1923,6 +1957,20 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
                     seen_timestamp,
                 )
             }),
+            Work::UnknownLightClientOptimisticUpdate {
+                message_id,
+                peer_id,
+                light_client_optimistic_update,
+                seen_timestamp,
+            } => task_spawner.spawn_blocking(move || {
+                worker.process_gossip_optimistic_update(
+                    message_id,
+                    peer_id,
+                    *light_client_optimistic_update,
+                    None,
+                    seen_timestamp,
+                )
+            }),
         };
     }
 }
@@ -19,7 +19,7 @@ use futures::task::Poll;
 use futures::{Stream, StreamExt};
 use lighthouse_network::{MessageId, PeerId};
 use logging::TimeLatch;
-use slog::{crit, debug, error, warn, Logger};
+use slog::{crit, debug, error, trace, warn, Logger};
 use slot_clock::SlotClock;
 use std::collections::{HashMap, HashSet};
 use std::pin::Pin;
@@ -30,12 +30,16 @@ use task_executor::TaskExecutor;
 use tokio::sync::mpsc::{self, Receiver, Sender};
 use tokio::time::error::Error as TimeError;
 use tokio_util::time::delay_queue::{DelayQueue, Key as DelayKey};
-use types::{Attestation, EthSpec, Hash256, SignedAggregateAndProof, SignedBeaconBlock, SubnetId};
+use types::{
+    Attestation, EthSpec, Hash256, LightClientOptimisticUpdate, SignedAggregateAndProof,
+    SignedBeaconBlock, SubnetId,
+};

 const TASK_NAME: &str = "beacon_processor_reprocess_queue";
 const GOSSIP_BLOCKS: &str = "gossip_blocks";
 const RPC_BLOCKS: &str = "rpc_blocks";
 const ATTESTATIONS: &str = "attestations";
+const LIGHT_CLIENT_UPDATES: &str = "lc_updates";

 /// Queue blocks for re-processing with an `ADDITIONAL_QUEUED_BLOCK_DELAY` after the slot starts.
 /// This is to account for any slight drift in the system clock.
@@ -44,6 +48,9 @@ const ADDITIONAL_QUEUED_BLOCK_DELAY: Duration = Duration::from_millis(5);
 /// For how long to queue aggregated and unaggregated attestations for re-processing.
 pub const QUEUED_ATTESTATION_DELAY: Duration = Duration::from_secs(12);
+
+/// For how long to queue light client updates for re-processing.
+pub const QUEUED_LIGHT_CLIENT_UPDATE_DELAY: Duration = Duration::from_secs(12);

 /// For how long to queue rpc blocks before sending them back for reprocessing.
 pub const QUEUED_RPC_BLOCK_DELAY: Duration = Duration::from_secs(3);
@@ -55,6 +62,9 @@ const MAXIMUM_QUEUED_BLOCKS: usize = 16;
 /// How many attestations we keep before new ones get dropped.
 const MAXIMUM_QUEUED_ATTESTATIONS: usize = 16_384;
+
+/// How many light client updates we keep before new ones get dropped.
+const MAXIMUM_QUEUED_LIGHT_CLIENT_UPDATES: usize = 128;

 /// Messages that the scheduler can receive.
 pub enum ReprocessQueueMessage<T: BeaconChainTypes> {
     /// A block that has been received early and we should queue for later processing.
@@ -62,13 +72,18 @@ pub enum ReprocessQueueMessage<T: BeaconChainTypes> {
     /// A gossip block for hash `X` is being imported, we should queue the rpc block for the same
     /// hash until the gossip block is imported.
     RpcBlock(QueuedRpcBlock<T::EthSpec>),
-    /// A block that was successfully processed. We use this to handle attestations for unknown
-    /// blocks.
-    BlockImported(Hash256),
+    /// A block that was successfully processed. We use this to handle attestations and light client updates
+    /// for unknown blocks.
+    BlockImported {
+        block_root: Hash256,
+        parent_root: Hash256,
+    },
     /// An unaggregated attestation that references an unknown block.
     UnknownBlockUnaggregate(QueuedUnaggregate<T::EthSpec>),
     /// An aggregated attestation that references an unknown block.
     UnknownBlockAggregate(QueuedAggregate<T::EthSpec>),
+    /// A light client optimistic update that references a parent root that has not been seen as a parent.
+    UnknownLightClientOptimisticUpdate(QueuedLightClientUpdate<T::EthSpec>),
 }

 /// Events sent by the scheduler once they are ready for re-processing.
@@ -77,6 +92,7 @@ pub enum ReadyWork<T: BeaconChainTypes> {
     RpcBlock(QueuedRpcBlock<T::EthSpec>),
     Unaggregate(QueuedUnaggregate<T::EthSpec>),
     Aggregate(QueuedAggregate<T::EthSpec>),
+    LightClientUpdate(QueuedLightClientUpdate<T::EthSpec>),
 }

 /// An Attestation for which the corresponding block was not seen while processing, queued for
@@ -99,6 +115,16 @@ pub struct QueuedAggregate<T: EthSpec> {
     pub seen_timestamp: Duration,
 }
+
+/// A light client update for which the corresponding parent block was not seen while processing,
+/// queued for later.
+pub struct QueuedLightClientUpdate<T: EthSpec> {
+    pub peer_id: PeerId,
+    pub message_id: MessageId,
+    pub light_client_optimistic_update: Box<LightClientOptimisticUpdate<T>>,
+    pub parent_root: Hash256,
+    pub seen_timestamp: Duration,
+}

 /// A block that arrived early and has been queued for later import.
 pub struct QueuedGossipBlock<T: BeaconChainTypes> {
     pub peer_id: PeerId,
@@ -127,6 +153,8 @@ enum InboundEvent<T: BeaconChainTypes> {
     ReadyRpcBlock(QueuedRpcBlock<T::EthSpec>),
     /// An aggregated or unaggregated attestation is ready for re-processing.
     ReadyAttestation(QueuedAttestationId),
+    /// A light client update that is ready for re-processing.
+    ReadyLightClientUpdate(QueuedLightClientUpdateId),
     /// A `DelayQueue` returned an error.
     DelayQueueError(TimeError, &'static str),
     /// A message sent to the `ReprocessQueue`
@@ -147,6 +175,8 @@ struct ReprocessQueue<T: BeaconChainTypes> {
     rpc_block_delay_queue: DelayQueue<QueuedRpcBlock<T::EthSpec>>,
     /// Queue to manage scheduled attestations.
     attestations_delay_queue: DelayQueue<QueuedAttestationId>,
+    /// Queue to manage scheduled light client updates.
+    lc_updates_delay_queue: DelayQueue<QueuedLightClientUpdateId>,

     /* Queued items */
     /// Queued blocks.
@@ -157,15 +187,23 @@ struct ReprocessQueue<T: BeaconChainTypes> {
     queued_unaggregates: FnvHashMap<usize, (QueuedUnaggregate<T::EthSpec>, DelayKey)>,
     /// Attestations (aggregated and unaggregated) per root.
     awaiting_attestations_per_root: HashMap<Hash256, Vec<QueuedAttestationId>>,
+    /// Queued Light Client Updates.
+    queued_lc_updates: FnvHashMap<usize, (QueuedLightClientUpdate<T::EthSpec>, DelayKey)>,
+    /// Light Client Updates per parent_root.
+    awaiting_lc_updates_per_parent_root: HashMap<Hash256, Vec<QueuedLightClientUpdateId>>,

     /* Aux */
     /// Next attestation id, used for both aggregated and unaggregated attestations
     next_attestation: usize,
+    next_lc_update: usize,
     early_block_debounce: TimeLatch,
     rpc_block_debounce: TimeLatch,
     attestation_delay_debounce: TimeLatch,
+    lc_update_delay_debounce: TimeLatch,
 }
+
+pub type QueuedLightClientUpdateId = usize;

 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 enum QueuedAttestationId {
     Aggregate(usize),
@@ -235,6 +273,20 @@ impl<T: BeaconChainTypes> Stream for ReprocessQueue<T> {
             Poll::Ready(None) | Poll::Pending => (),
         }
+
+        match self.lc_updates_delay_queue.poll_expired(cx) {
+            Poll::Ready(Some(Ok(lc_id))) => {
+                return Poll::Ready(Some(InboundEvent::ReadyLightClientUpdate(
+                    lc_id.into_inner(),
+                )));
+            }
+            Poll::Ready(Some(Err(e))) => {
+                return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "lc_updates_queue")));
+            }
+            // `Poll::Ready(None)` means that there are no more entries in the delay queue and we
+            // will continue to get this result until something else is added into the queue.
+            Poll::Ready(None) | Poll::Pending => (),
+        }

         // Last empty the messages channel.
         match self.work_reprocessing_rx.poll_recv(cx) {
             Poll::Ready(Some(message)) => return Poll::Ready(Some(InboundEvent::Msg(message))),
@@ -264,14 +316,19 @@ pub fn spawn_reprocess_scheduler<T: BeaconChainTypes>(
         gossip_block_delay_queue: DelayQueue::new(),
         rpc_block_delay_queue: DelayQueue::new(),
         attestations_delay_queue: DelayQueue::new(),
+        lc_updates_delay_queue: DelayQueue::new(),
         queued_gossip_block_roots: HashSet::new(),
+        queued_lc_updates: FnvHashMap::default(),
         queued_aggregates: FnvHashMap::default(),
         queued_unaggregates: FnvHashMap::default(),
         awaiting_attestations_per_root: HashMap::new(),
+        awaiting_lc_updates_per_parent_root: HashMap::new(),
         next_attestation: 0,
+        next_lc_update: 0,
         early_block_debounce: TimeLatch::default(),
         rpc_block_debounce: TimeLatch::default(),
         attestation_delay_debounce: TimeLatch::default(),
+        lc_update_delay_debounce: TimeLatch::default(),
     };

     executor.spawn(
@@ -473,9 +530,49 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {

                 self.next_attestation += 1;
             }
-            InboundEvent::Msg(BlockImported(root)) => {
+            InboundEvent::Msg(UnknownLightClientOptimisticUpdate(
+                queued_light_client_optimistic_update,
+            )) => {
+                if self.lc_updates_delay_queue.len() >= MAXIMUM_QUEUED_LIGHT_CLIENT_UPDATES {
+                    if self.lc_update_delay_debounce.elapsed() {
+                        error!(
+                            log,
+                            "Light client updates delay queue is full";
+                            "queue_size" => MAXIMUM_QUEUED_LIGHT_CLIENT_UPDATES,
+                            "msg" => "check system clock"
+                        );
+                    }
+                    // Drop the light client update.
+                    return;
+                }
+
+                let lc_id: QueuedLightClientUpdateId = self.next_lc_update;
+
+                // Register the delay.
+                let delay_key = self
+                    .lc_updates_delay_queue
+                    .insert(lc_id, QUEUED_LIGHT_CLIENT_UPDATE_DELAY);
+
+                // Register the light client update for the corresponding root.
+                self.awaiting_lc_updates_per_parent_root
+                    .entry(queued_light_client_optimistic_update.parent_root)
+                    .or_default()
+                    .push(lc_id);
+
+                // Store the light client update and its info.
+                self.queued_lc_updates.insert(
+                    self.next_lc_update,
+                    (queued_light_client_optimistic_update, delay_key),
+                );
+
+                self.next_lc_update += 1;
+            }
+            InboundEvent::Msg(BlockImported {
+                block_root,
+                parent_root,
+            }) => {
                 // Unqueue the attestations we have for this root, if any.
-                if let Some(queued_ids) = self.awaiting_attestations_per_root.remove(&root) {
+                if let Some(queued_ids) = self.awaiting_attestations_per_root.remove(&block_root) {
                     for id in queued_ids {
                         metrics::inc_counter(
                             &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_ATTESTATIONS,
@@ -511,12 +608,62 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
                             error!(
                                 log,
                                 "Unknown queued attestation for block root";
-                                "block_root" => ?root,
+                                "block_root" => ?block_root,
                                 "att_id" => ?id,
                             );
                         }
                     }
                 }
+                // Unqueue the light client optimistic updates we have for this root, if any.
+                if let Some(queued_lc_id) = self
+                    .awaiting_lc_updates_per_parent_root
+                    .remove(&parent_root)
+                {
+                    debug!(
+                        log,
+                        "Dequeuing light client optimistic updates";
+                        "parent_root" => %parent_root,
+                        "count" => queued_lc_id.len(),
+                    );
+
+                    for lc_id in queued_lc_id {
+                        metrics::inc_counter(
+                            &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_OPTIMISTIC_UPDATES,
+                        );
+                        if let Some((work, delay_key)) = self.queued_lc_updates.remove(&lc_id).map(
+                            |(light_client_optimistic_update, delay_key)| {
+                                (
+                                    ReadyWork::LightClientUpdate(light_client_optimistic_update),
+                                    delay_key,
+                                )
+                            },
+                        ) {
+                            // Remove the delay
+                            self.lc_updates_delay_queue.remove(&delay_key);
+
+                            // Send the work
+                            match self.ready_work_tx.try_send(work) {
+                                Ok(_) => trace!(
+                                    log,
+                                    "reprocessing light client update sent";
+                                ),
+                                Err(_) => error!(
+                                    log,
+                                    "Failed to send scheduled light client update";
+                                ),
+                            }
+                        } else {
+                            // There is a mismatch between the light client update ids registered for this
+                            // root and the queued light client updates. This should never happen.
+                            error!(
+                                log,
+                                "Unknown queued light client update for parent root";
+                                "parent_root" => ?parent_root,
+                                "lc_id" => ?lc_id,
+                            );
+                        }
+                    }
+                }
             }
             // A block that was queued for later processing is now ready to be processed.
             InboundEvent::ReadyGossipBlock(ready_block) => {
@@ -591,6 +738,38 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
                     }
                 }
             }
+            InboundEvent::ReadyLightClientUpdate(queued_id) => {
+                metrics::inc_counter(
+                    &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_OPTIMISTIC_UPDATES,
+                );
+
+                if let Some((parent_root, work)) = self.queued_lc_updates.remove(&queued_id).map(
+                    |(queued_lc_update, _delay_key)| {
+                        (
+                            queued_lc_update.parent_root,
+                            ReadyWork::LightClientUpdate(queued_lc_update),
+                        )
+                    },
+                ) {
+                    if self.ready_work_tx.try_send(work).is_err() {
+                        error!(
+                            log,
+                            "Failed to send scheduled light client optimistic update";
+                        );
+                    }
+
+                    if let Some(queued_lc_updates) = self
+                        .awaiting_lc_updates_per_parent_root
+                        .get_mut(&parent_root)
+                    {
+                        if let Some(index) =
+                            queued_lc_updates.iter().position(|&id| id == queued_id)
+                        {
+                            queued_lc_updates.swap_remove(index);
+                        }
+                    }
+                }
+            }
         }

         metrics::set_gauge_vec(
@@ -608,5 +787,10 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
             &[ATTESTATIONS],
             self.attestations_delay_queue.len() as i64,
         );
+        metrics::set_gauge_vec(
+            &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL,
+            &[LIGHT_CLIENT_UPDATES],
+            self.lc_updates_delay_queue.len() as i64,
+        );
     }
 }
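Note: taken together, the reprocessing-queue changes above park an optimistic update under the parent root it references and release it when a block with that parent root is imported (or drop it after QUEUED_LIGHT_CLIENT_UPDATE_DELAY). A simplified, self-contained model of that bookkeeping — a sketch for illustration only, not the actual ReprocessQueue, with delays, ids, and metrics omitted — is:

use std::collections::HashMap;

// Sketch: updates wait under their parent root and are released when a block
// whose parent root matches is imported.
struct PendingOptimisticUpdates<U> {
    awaiting_per_parent_root: HashMap<[u8; 32], Vec<U>>,
}

impl<U> PendingOptimisticUpdates<U> {
    fn new() -> Self {
        Self { awaiting_per_parent_root: HashMap::new() }
    }

    // Called when gossip verification fails with UnknownBlockParentRoot.
    fn queue(&mut self, parent_root: [u8; 32], update: U) {
        self.awaiting_per_parent_root
            .entry(parent_root)
            .or_default()
            .push(update);
    }

    // Called on BlockImported { parent_root, .. }: returns the updates that
    // are now ready to be re-verified.
    fn on_block_imported(&mut self, parent_root: [u8; 32]) -> Vec<U> {
        self.awaiting_per_parent_root
            .remove(&parent_root)
            .unwrap_or_default()
    }
}

fn main() {
    let mut pending = PendingOptimisticUpdates::new();
    pending.queue([0u8; 32], "optimistic update A");
    assert_eq!(pending.on_block_imported([0u8; 32]).len(), 1);
    assert!(pending.on_block_imported([1u8; 32]).is_empty());
}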
@@ -28,7 +28,8 @@ use types::{

 use super::{
     super::work_reprocessing_queue::{
-        QueuedAggregate, QueuedGossipBlock, QueuedUnaggregate, ReprocessQueueMessage,
+        QueuedAggregate, QueuedGossipBlock, QueuedLightClientUpdate, QueuedUnaggregate,
+        ReprocessQueueMessage,
     },
     Worker,
 };
@@ -954,7 +955,10 @@ impl<T: BeaconChainTypes> Worker<T> {
                 metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL);

                 if reprocess_tx
-                    .try_send(ReprocessQueueMessage::BlockImported(block_root))
+                    .try_send(ReprocessQueueMessage::BlockImported {
+                        block_root,
+                        parent_root: block.message().parent_root(),
+                    })
                     .is_err()
                 {
                     error!(
@@ -1403,7 +1407,7 @@ impl<T: BeaconChainTypes> Worker<T> {
             LightClientFinalityUpdateError::InvalidLightClientFinalityUpdate => {
                 debug!(
                     self.log,
-                    "LC invalid finality update";
+                    "Light client invalid finality update";
                     "peer" => %peer_id,
                     "error" => ?e,
                 );
@@ -1417,7 +1421,7 @@ impl<T: BeaconChainTypes> Worker<T> {
             LightClientFinalityUpdateError::TooEarly => {
                 debug!(
                     self.log,
-                    "LC finality update too early";
+                    "Light client finality update too early";
                     "peer" => %peer_id,
                     "error" => ?e,
                 );
@@ -1430,7 +1434,7 @@ impl<T: BeaconChainTypes> Worker<T> {
             }
             LightClientFinalityUpdateError::FinalityUpdateAlreadySeen => debug!(
                 self.log,
-                "LC finality update already seen";
+                "Light client finality update already seen";
                 "peer" => %peer_id,
                 "error" => ?e,
             ),
@@ -1439,7 +1443,7 @@ impl<T: BeaconChainTypes> Worker<T> {
             | LightClientFinalityUpdateError::SigSlotStartIsNone
             | LightClientFinalityUpdateError::FailedConstructingUpdate => debug!(
                 self.log,
-                "LC error constructing finality update";
+                "Light client error constructing finality update";
                 "peer" => %peer_id,
                 "error" => ?e,
             ),
@@ -1454,22 +1458,77 @@ impl<T: BeaconChainTypes> Worker<T> {
         message_id: MessageId,
         peer_id: PeerId,
         light_client_optimistic_update: LightClientOptimisticUpdate<T::EthSpec>,
+        reprocess_tx: Option<mpsc::Sender<ReprocessQueueMessage<T>>>,
         seen_timestamp: Duration,
     ) {
-        match self
-            .chain
-            .verify_optimistic_update_for_gossip(light_client_optimistic_update, seen_timestamp)
-        {
-            Ok(_verified_light_client_optimistic_update) => {
+        match self.chain.verify_optimistic_update_for_gossip(
+            light_client_optimistic_update.clone(),
+            seen_timestamp,
+        ) {
+            Ok(verified_light_client_optimistic_update) => {
+                debug!(
+                    self.log,
+                    "Light client successful optimistic update";
+                    "peer" => %peer_id,
+                    "parent_root" => %verified_light_client_optimistic_update.parent_root,
+                );
+
                 self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);
             }
             Err(e) => {
-                metrics::register_optimistic_update_error(&e);
                 match e {
-                    LightClientOptimisticUpdateError::InvalidLightClientOptimisticUpdate => {
+                    LightClientOptimisticUpdateError::UnknownBlockParentRoot(parent_root) => {
+                        metrics::inc_counter(
+                            &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_SENT_OPTIMISTIC_UPDATES,
+                        );
                         debug!(
                             self.log,
-                            "LC invalid optimistic update";
+                            "Optimistic update for unknown block";
+                            "peer_id" => %peer_id,
+                            "parent_root" => ?parent_root
+                        );
+
+                        if let Some(sender) = reprocess_tx {
+                            let msg = ReprocessQueueMessage::UnknownLightClientOptimisticUpdate(
+                                QueuedLightClientUpdate {
+                                    peer_id,
+                                    message_id,
+                                    light_client_optimistic_update: Box::new(
+                                        light_client_optimistic_update,
+                                    ),
+                                    parent_root,
+                                    seen_timestamp,
+                                },
+                            );

+                            if sender.try_send(msg).is_err() {
+                                error!(
+                                    self.log,
+                                    "Failed to send optimistic update for re-processing";
+                                )
+                            }
+                        } else {
+                            debug!(
+                                self.log,
+                                "Not sending light client update because it had been reprocessed";
+                                "peer_id" => %peer_id,
+                                "parent_root" => ?parent_root
+                            );
+
+                            self.propagate_validation_result(
+                                message_id,
+                                peer_id,
+                                MessageAcceptance::Ignore,
+                            );
+                        }
+                        return;
+                    }
+                    LightClientOptimisticUpdateError::InvalidLightClientOptimisticUpdate => {
+                        metrics::register_optimistic_update_error(&e);
+
+                        debug!(
+                            self.log,
+                            "Light client invalid optimistic update";
                             "peer" => %peer_id,
                             "error" => ?e,
                         );
@@ -1481,9 +1540,10 @@ impl<T: BeaconChainTypes> Worker<T> {
                         )
                     }
                     LightClientOptimisticUpdateError::TooEarly => {
+                        metrics::register_optimistic_update_error(&e);
                         debug!(
                             self.log,
-                            "LC optimistic update too early";
+                            "Light client optimistic update too early";
                             "peer" => %peer_id,
                             "error" => ?e,
                         );
@@ -1494,21 +1554,29 @@ impl<T: BeaconChainTypes> Worker<T> {
                             "light_client_gossip_error",
                         );
                     }
-                    LightClientOptimisticUpdateError::OptimisticUpdateAlreadySeen => debug!(
-                        self.log,
-                        "LC optimistic update already seen";
-                        "peer" => %peer_id,
-                        "error" => ?e,
-                    ),
+                    LightClientOptimisticUpdateError::OptimisticUpdateAlreadySeen => {
+                        metrics::register_optimistic_update_error(&e);
+
+                        debug!(
+                            self.log,
+                            "Light client optimistic update already seen";
+                            "peer" => %peer_id,
+                            "error" => ?e,
+                        )
+                    }
                     LightClientOptimisticUpdateError::BeaconChainError(_)
                     | LightClientOptimisticUpdateError::LightClientUpdateError(_)
                     | LightClientOptimisticUpdateError::SigSlotStartIsNone
-                    | LightClientOptimisticUpdateError::FailedConstructingUpdate => debug!(
-                        self.log,
-                        "LC error constructing optimistic update";
-                        "peer" => %peer_id,
-                        "error" => ?e,
-                    ),
+                    | LightClientOptimisticUpdateError::FailedConstructingUpdate => {
+                        metrics::register_optimistic_update_error(&e);
+
+                        debug!(
+                            self.log,
+                            "Light client error constructing optimistic update";
+                            "peer" => %peer_id,
+                            "error" => ?e,
+                        )
+                    }
                 }
                 self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
             }
@@ -84,6 +84,7 @@ impl<T: BeaconChainTypes> Worker<T> {
             }
         };
         let slot = block.slot();
+        let parent_root = block.message().parent_root();
         let result = self
             .chain
             .process_block(
@@ -101,7 +102,10 @@ impl<T: BeaconChainTypes> Worker<T> {
                 info!(self.log, "New RPC block received"; "slot" => slot, "hash" => %hash);

                 // Trigger processing for work referencing this block.
-                let reprocess_msg = ReprocessQueueMessage::BlockImported(hash);
+                let reprocess_msg = ReprocessQueueMessage::BlockImported {
+                    block_root: hash,
+                    parent_root,
+                };
                 if reprocess_tx.try_send(reprocess_msg).is_err() {
                     error!(self.log, "Failed to inform block import"; "source" => "rpc", "block_root" => %hash)
                 };
@@ -392,6 +392,21 @@ lazy_static! {
         "Number of queued attestations where as matching block has been imported."
     );
+
+    /*
+     * Light client update reprocessing queue metrics.
+     */
+    pub static ref BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_OPTIMISTIC_UPDATES: Result<IntCounter> = try_create_int_counter(
+        "beacon_processor_reprocessing_queue_expired_optimistic_updates",
+        "Number of queued light client optimistic updates which have expired before a matching block has been found."
+    );
+    pub static ref BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_OPTIMISTIC_UPDATES: Result<IntCounter> = try_create_int_counter(
+        "beacon_processor_reprocessing_queue_matched_optimistic_updates",
+        "Number of queued light client optimistic updates where as matching block has been imported."
+    );
+    pub static ref BEACON_PROCESSOR_REPROCESSING_QUEUE_SENT_OPTIMISTIC_UPDATES: Result<IntCounter> = try_create_int_counter(
+        "beacon_processor_reprocessing_queue_sent_optimistic_updates",
+        "Number of queued light client optimistic updates where as matching block has been imported."
+    );
 }

 pub fn update_bandwidth_metrics(bandwidth: Arc<BandwidthSinks>) {
@@ -1044,6 +1044,24 @@ impl BeaconNodeHttpClient {
             .transpose()
     }
+
+    /// `POST beacon/rewards/sync_committee`
+    pub async fn post_beacon_rewards_sync_committee(
+        &self,
+        rewards: &[Option<Vec<lighthouse::SyncCommitteeReward>>],
+    ) -> Result<(), Error> {
+        let mut path = self.eth_path(V1)?;
+
+        path.path_segments_mut()
+            .map_err(|()| Error::InvalidUrl(self.server.clone()))?
+            .push("beacon")
+            .push("rewards")
+            .push("sync_committee");
+
+        self.post(path, &rewards).await?;
+
+        Ok(())
+    }

     /// `POST validator/contribution_and_proofs`
     pub async fn post_validator_contribution_and_proofs<T: EthSpec>(
         &self,
@@ -3,6 +3,7 @@
 mod attestation_performance;
 mod block_packing_efficiency;
 mod block_rewards;
+mod sync_committee_rewards;

 use crate::{
     ok_or_error,
@@ -27,6 +28,7 @@ pub use block_packing_efficiency::{
 };
 pub use block_rewards::{AttestationRewards, BlockReward, BlockRewardMeta, BlockRewardsQuery};
 pub use lighthouse_network::{types::SyncState, PeerInfo};
+pub use sync_committee_rewards::SyncCommitteeReward;

 // Define "legacy" implementations of `Option<T>` which use four bytes for encoding the union
 // selector.
common/eth2/src/lighthouse/sync_committee_rewards.rs (new file, 12 lines)
@@ -0,0 +1,12 @@
+use serde::{Deserialize, Serialize};
+
+// Details about the rewards paid to sync committee members for attesting headers
+// All rewards in GWei
+
+#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
+pub struct SyncCommitteeReward {
+    #[serde(with = "eth2_serde_utils::quoted_u64")]
+    pub validator_index: u64,
+    // sync committee reward in gwei for the validator
+    pub reward: i64,
+}
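Note: for reference, serializing this struct with serde_json (a sketch, not part of the commit) shows the wire format of one reward entry: the validator index is a quoted u64 while the reward is a plain signed integer.

// Sketch, not part of the commit: demonstrate the JSON shape of one entry.
use serde::{Deserialize, Serialize};

#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct SyncCommitteeReward {
    #[serde(with = "eth2_serde_utils::quoted_u64")]
    pub validator_index: u64,
    pub reward: i64,
}

fn main() {
    let entry = SyncCommitteeReward { validator_index: 5, reward: -502 };
    // Prints: {"validator_index":"5","reward":-502}
    println!("{}", serde_json::to_string(&entry).unwrap());
}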