Initial Commit of Retrospective OTB Verification (#3372)

## Issue Addressed

* #2983 

## Proposed Changes

Basically followed the [instructions laid out here](https://github.com/sigp/lighthouse/issues/2983#issuecomment-1062494947)


Co-authored-by: Paul Hauner <paul@paulhauner.com>
Co-authored-by: ethDreamer <37123614+ethDreamer@users.noreply.github.com>
This commit is contained in:
ethDreamer 2022-07-30 00:22:38 +00:00
parent 6c2d8b2262
commit 034260bd99
12 changed files with 1013 additions and 7 deletions

View File

@ -137,6 +137,9 @@ const MAX_PER_SLOT_FORK_CHOICE_DISTANCE: u64 = 4;
pub const INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON: &str = pub const INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON: &str =
"Justified block has an invalid execution payload."; "Justified block has an invalid execution payload.";
pub const INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON: &str =
"Finalized merge transition block is invalid.";
/// Defines the behaviour when a block/block-root for a skipped slot is requested. /// Defines the behaviour when a block/block-root for a skipped slot is requested.
pub enum WhenSlotSkipped { pub enum WhenSlotSkipped {
/// If the slot is a skip slot, return `None`. /// If the slot is a skip slot, return `None`.
@ -528,6 +531,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// Even more efficient variant of `forwards_iter_block_roots` that will avoid cloning the head /// Even more efficient variant of `forwards_iter_block_roots` that will avoid cloning the head
/// state if it isn't required for the requested range of blocks. /// state if it isn't required for the requested range of blocks.
/// The range [start_slot, end_slot] is inclusive (ie `start_slot <= end_slot`)
pub fn forwards_iter_block_roots_until( pub fn forwards_iter_block_roots_until(
&self, &self,
start_slot: Slot, start_slot: Slot,

View File

@ -44,7 +44,7 @@
//! ``` //! ```
use crate::execution_payload::{ use crate::execution_payload::{
is_optimistic_candidate_block, validate_execution_payload_for_gossip, validate_merge_block, is_optimistic_candidate_block, validate_execution_payload_for_gossip, validate_merge_block,
PayloadNotifier, AllowOptimisticImport, PayloadNotifier,
}; };
use crate::snapshot_cache::PreProcessingSnapshot; use crate::snapshot_cache::PreProcessingSnapshot;
use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS; use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS;
@ -1199,7 +1199,7 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
// - Doing the check here means we can keep our fork-choice implementation "pure". I.e., no // - Doing the check here means we can keep our fork-choice implementation "pure". I.e., no
// calls to remote servers. // calls to remote servers.
if is_valid_merge_transition_block { if is_valid_merge_transition_block {
validate_merge_block(&chain, block.message()).await?; validate_merge_block(&chain, block.message(), AllowOptimisticImport::Yes).await?;
}; };
// The specification declares that this should be run *inside* `per_block_processing`, // The specification declares that this should be run *inside* `per_block_processing`,

View File

@ -7,6 +7,7 @@
//! So, this module contains functions that one might expect to find in other crates, but they live //! So, this module contains functions that one might expect to find in other crates, but they live
//! here for good reason. //! here for good reason.
use crate::otb_verification_service::OptimisticTransitionBlock;
use crate::{ use crate::{
BeaconChain, BeaconChainError, BeaconChainTypes, BlockError, BlockProductionError, BeaconChain, BeaconChainError, BeaconChainTypes, BlockError, BlockProductionError,
ExecutionPayloadError, ExecutionPayloadError,
@ -27,6 +28,12 @@ use types::*;
pub type PreparePayloadResult<Payload> = Result<Payload, BlockProductionError>; pub type PreparePayloadResult<Payload> = Result<Payload, BlockProductionError>;
pub type PreparePayloadHandle<Payload> = JoinHandle<Option<PreparePayloadResult<Payload>>>; pub type PreparePayloadHandle<Payload> = JoinHandle<Option<PreparePayloadResult<Payload>>>;
#[derive(PartialEq)]
pub enum AllowOptimisticImport {
Yes,
No,
}
/// Used to await the result of executing payload with a remote EE. /// Used to await the result of executing payload with a remote EE.
pub struct PayloadNotifier<T: BeaconChainTypes> { pub struct PayloadNotifier<T: BeaconChainTypes> {
pub chain: Arc<BeaconChain<T>>, pub chain: Arc<BeaconChain<T>>,
@ -146,6 +153,7 @@ async fn notify_new_payload<'a, T: BeaconChainTypes>(
pub async fn validate_merge_block<'a, T: BeaconChainTypes>( pub async fn validate_merge_block<'a, T: BeaconChainTypes>(
chain: &Arc<BeaconChain<T>>, chain: &Arc<BeaconChain<T>>,
block: BeaconBlockRef<'a, T::EthSpec>, block: BeaconBlockRef<'a, T::EthSpec>,
allow_optimistic_import: AllowOptimisticImport,
) -> Result<(), BlockError<T::EthSpec>> { ) -> Result<(), BlockError<T::EthSpec>> {
let spec = &chain.spec; let spec = &chain.spec;
let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch()); let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch());
@ -188,13 +196,18 @@ pub async fn validate_merge_block<'a, T: BeaconChainTypes>(
} }
.into()), .into()),
None => { None => {
if is_optimistic_candidate_block(chain, block.slot(), block.parent_root()).await? { if allow_optimistic_import == AllowOptimisticImport::Yes
&& is_optimistic_candidate_block(chain, block.slot(), block.parent_root()).await?
{
debug!( debug!(
chain.log, chain.log,
"Optimistically accepting terminal block"; "Optimistically importing merge transition block";
"block_hash" => ?execution_payload.parent_hash(), "block_hash" => ?execution_payload.parent_hash(),
"msg" => "the terminal block/parent was unavailable" "msg" => "the terminal block/parent was unavailable"
); );
// Store Optimistic Transition Block in Database for later Verification
OptimisticTransitionBlock::from_block(block)
.persist_in_store::<T, _>(&chain.store)?;
Ok(()) Ok(())
} else { } else {
Err(ExecutionPayloadError::UnverifiedNonOptimisticCandidate.into()) Err(ExecutionPayloadError::UnverifiedNonOptimisticCandidate.into())

View File

@ -28,6 +28,7 @@ mod observed_aggregates;
mod observed_attesters; mod observed_attesters;
mod observed_block_producers; mod observed_block_producers;
pub mod observed_operations; pub mod observed_operations;
pub mod otb_verification_service;
mod persisted_beacon_chain; mod persisted_beacon_chain;
mod persisted_fork_choice; mod persisted_fork_choice;
mod pre_finalization_cache; mod pre_finalization_cache;
@ -45,6 +46,7 @@ mod validator_pubkey_cache;
pub use self::beacon_chain::{ pub use self::beacon_chain::{
AttestationProcessingOutcome, BeaconChain, BeaconChainTypes, BeaconStore, ChainSegmentResult, AttestationProcessingOutcome, BeaconChain, BeaconChainTypes, BeaconStore, ChainSegmentResult,
CountUnrealized, ForkChoiceError, ProduceBlockVerification, StateSkipConfig, WhenSlotSkipped, CountUnrealized, ForkChoiceError, ProduceBlockVerification, StateSkipConfig, WhenSlotSkipped,
INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON,
INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON, MAXIMUM_GOSSIP_CLOCK_DISPARITY, INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON, MAXIMUM_GOSSIP_CLOCK_DISPARITY,
}; };
pub use self::beacon_snapshot::BeaconSnapshot; pub use self::beacon_snapshot::BeaconSnapshot;

View File

@ -0,0 +1,378 @@
use crate::execution_payload::{validate_merge_block, AllowOptimisticImport};
use crate::{
BeaconChain, BeaconChainError, BeaconChainTypes, BlockError, ExecutionPayloadError,
INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON,
};
use itertools::process_results;
use proto_array::InvalidationOperation;
use slog::{crit, debug, error, info, warn};
use slot_clock::SlotClock;
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use state_processing::per_block_processing::is_merge_transition_complete;
use std::sync::Arc;
use store::{DBColumn, Error as StoreError, HotColdDB, KeyValueStore, StoreItem};
use task_executor::{ShutdownReason, TaskExecutor};
use tokio::time::sleep;
use tree_hash::TreeHash;
use types::{BeaconBlockRef, EthSpec, Hash256, Slot};
use DBColumn::OptimisticTransitionBlock as OTBColumn;
#[derive(Clone, Debug, Decode, Encode, PartialEq)]
pub struct OptimisticTransitionBlock {
root: Hash256,
slot: Slot,
}
impl OptimisticTransitionBlock {
// types::BeaconBlockRef<'_, <T as BeaconChainTypes>::EthSpec>
pub fn from_block<E: EthSpec>(block: BeaconBlockRef<E>) -> Self {
Self {
root: block.tree_hash_root(),
slot: block.slot(),
}
}
pub fn root(&self) -> &Hash256 {
&self.root
}
pub fn slot(&self) -> &Slot {
&self.slot
}
pub fn persist_in_store<T, A>(&self, store: A) -> Result<(), StoreError>
where
T: BeaconChainTypes,
A: AsRef<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
{
if store
.as_ref()
.item_exists::<OptimisticTransitionBlock>(&self.root)?
{
Ok(())
} else {
store.as_ref().put_item(&self.root, self)
}
}
pub fn remove_from_store<T, A>(&self, store: A) -> Result<(), StoreError>
where
T: BeaconChainTypes,
A: AsRef<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
{
store
.as_ref()
.hot_db
.key_delete(OTBColumn.into(), self.root.as_bytes())
}
fn is_canonical<T: BeaconChainTypes>(
&self,
chain: &BeaconChain<T>,
) -> Result<bool, BeaconChainError> {
Ok(chain
.forwards_iter_block_roots_until(self.slot, self.slot)?
.next()
.transpose()?
.map(|(root, _)| root)
== Some(self.root))
}
}
impl StoreItem for OptimisticTransitionBlock {
fn db_column() -> DBColumn {
OTBColumn
}
fn as_store_bytes(&self) -> Vec<u8> {
self.as_ssz_bytes()
}
fn from_store_bytes(bytes: &[u8]) -> Result<Self, StoreError> {
Ok(Self::from_ssz_bytes(bytes)?)
}
}
/// The routine is expected to run once per epoch, 1/4th through the epoch.
pub const EPOCH_DELAY_FACTOR: u32 = 4;
/// Spawns a routine which checks the validity of any optimistically imported transition blocks
///
/// This routine will run once per epoch, at `epoch_duration / EPOCH_DELAY_FACTOR` after
/// the start of each epoch.
///
/// The service will not be started if there is no `execution_layer` on the `chain`.
pub fn start_otb_verification_service<T: BeaconChainTypes>(
executor: TaskExecutor,
chain: Arc<BeaconChain<T>>,
) {
// Avoid spawning the service if there's no EL, it'll just error anyway.
if chain.execution_layer.is_some() {
executor.spawn(
async move { otb_verification_service(chain).await },
"otb_verification_service",
);
}
}
pub fn load_optimistic_transition_blocks<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
) -> Result<Vec<OptimisticTransitionBlock>, StoreError> {
process_results(chain.store.hot_db.iter_column(OTBColumn), |iter| {
iter.map(|(_, bytes)| OptimisticTransitionBlock::from_store_bytes(&bytes))
.collect()
})?
}
#[derive(Debug)]
pub enum Error {
ForkChoice(String),
BeaconChain(BeaconChainError),
StoreError(StoreError),
NoBlockFound(OptimisticTransitionBlock),
}
pub async fn validate_optimistic_transition_blocks<T: BeaconChainTypes>(
chain: &Arc<BeaconChain<T>>,
otbs: Vec<OptimisticTransitionBlock>,
) -> Result<(), Error> {
let finalized_slot = chain
.canonical_head
.fork_choice_read_lock()
.get_finalized_block()
.map_err(|e| Error::ForkChoice(format!("{:?}", e)))?
.slot;
// separate otbs into
// non-canonical
// finalized canonical
// unfinalized canonical
let mut non_canonical_otbs = vec![];
let (finalized_canonical_otbs, unfinalized_canonical_otbs) = process_results(
otbs.into_iter().map(|otb| {
otb.is_canonical(chain)
.map(|is_canonical| (otb, is_canonical))
}),
|pair_iter| {
pair_iter
.filter_map(|(otb, is_canonical)| {
if is_canonical {
Some(otb)
} else {
non_canonical_otbs.push(otb);
None
}
})
.partition::<Vec<_>, _>(|otb| *otb.slot() <= finalized_slot)
},
)
.map_err(Error::BeaconChain)?;
// remove non-canonical blocks that conflict with finalized checkpoint from the database
for otb in non_canonical_otbs {
if *otb.slot() <= finalized_slot {
otb.remove_from_store::<T, _>(&chain.store)
.map_err(Error::StoreError)?;
}
}
// ensure finalized canonical otb are valid, otherwise kill client
for otb in finalized_canonical_otbs {
match chain.get_block(otb.root()).await {
Ok(Some(block)) => {
match validate_merge_block(chain, block.message(), AllowOptimisticImport::No).await
{
Ok(()) => {
// merge transition block is valid, remove it from OTB
otb.remove_from_store::<T, _>(&chain.store)
.map_err(Error::StoreError)?;
info!(
chain.log,
"Validated merge transition block";
"block_root" => ?otb.root(),
"type" => "finalized"
);
}
// The block was not able to be verified by the EL. Leave the OTB in the
// database since the EL is likely still syncing and may verify the block
// later.
Err(BlockError::ExecutionPayloadError(
ExecutionPayloadError::UnverifiedNonOptimisticCandidate,
)) => (),
Err(BlockError::ExecutionPayloadError(
ExecutionPayloadError::InvalidTerminalPoWBlock { .. },
)) => {
// Finalized Merge Transition Block is Invalid! Kill the Client!
crit!(
chain.log,
"Finalized merge transition block is invalid!";
"msg" => "You must use the `--purge-db` flag to clear the database and restart sync. \
You may be on a hostile network.",
"block_hash" => ?block.canonical_root()
);
let mut shutdown_sender = chain.shutdown_sender();
if let Err(e) = shutdown_sender.try_send(ShutdownReason::Failure(
INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON,
)) {
crit!(
chain.log,
"Failed to shut down client";
"error" => ?e,
"shutdown_reason" => INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON
);
}
}
_ => {}
}
}
Ok(None) => return Err(Error::NoBlockFound(otb)),
// Our database has pruned the payload and the payload was unavailable on the EL since
// the EL is still syncing or the payload is non-canonical.
Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => (),
Err(e) => return Err(Error::BeaconChain(e)),
}
}
// attempt to validate any non-finalized canonical otb blocks
for otb in unfinalized_canonical_otbs {
match chain.get_block(otb.root()).await {
Ok(Some(block)) => {
match validate_merge_block(chain, block.message(), AllowOptimisticImport::No).await
{
Ok(()) => {
// merge transition block is valid, remove it from OTB
otb.remove_from_store::<T, _>(&chain.store)
.map_err(Error::StoreError)?;
info!(
chain.log,
"Validated merge transition block";
"block_root" => ?otb.root(),
"type" => "not finalized"
);
}
// The block was not able to be verified by the EL. Leave the OTB in the
// database since the EL is likely still syncing and may verify the block
// later.
Err(BlockError::ExecutionPayloadError(
ExecutionPayloadError::UnverifiedNonOptimisticCandidate,
)) => (),
Err(BlockError::ExecutionPayloadError(
ExecutionPayloadError::InvalidTerminalPoWBlock { .. },
)) => {
// Unfinalized Merge Transition Block is Invalid -> Run process_invalid_execution_payload
warn!(
chain.log,
"Merge transition block invalid";
"block_root" => ?otb.root()
);
chain
.process_invalid_execution_payload(
&InvalidationOperation::InvalidateOne {
block_root: *otb.root(),
},
)
.await
.map_err(|e| {
warn!(
chain.log,
"Error checking merge transition block";
"error" => ?e,
"location" => "process_invalid_execution_payload"
);
Error::BeaconChain(e)
})?;
}
_ => {}
}
}
Ok(None) => return Err(Error::NoBlockFound(otb)),
// Our database has pruned the payload and the payload was unavailable on the EL since
// the EL is still syncing or the payload is non-canonical.
Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => (),
Err(e) => return Err(Error::BeaconChain(e)),
}
}
Ok(())
}
/// Loop until any optimistically imported merge transition blocks have been verified and
/// the merge has been finalized.
async fn otb_verification_service<T: BeaconChainTypes>(chain: Arc<BeaconChain<T>>) {
let epoch_duration = chain.slot_clock.slot_duration() * T::EthSpec::slots_per_epoch() as u32;
loop {
match chain
.slot_clock
.duration_to_next_epoch(T::EthSpec::slots_per_epoch())
{
Some(duration) => {
let additional_delay = epoch_duration / EPOCH_DELAY_FACTOR;
sleep(duration + additional_delay).await;
debug!(
chain.log,
"OTB verification service firing";
);
if !is_merge_transition_complete(
&chain.canonical_head.cached_head().snapshot.beacon_state,
) {
// We are pre-merge. Nothing to do yet.
continue;
}
// load all optimistically imported transition blocks from the database
match load_optimistic_transition_blocks(chain.as_ref()) {
Ok(otbs) => {
if otbs.is_empty() {
if chain
.canonical_head
.fork_choice_read_lock()
.get_finalized_block()
.map_or(false, |block| {
block.execution_status.is_execution_enabled()
})
{
// there are no optimistic blocks in the database, we can exit
// the service since the merge transition is finalized and we'll
// never see another transition block
break;
} else {
debug!(
chain.log,
"No optimistic transition blocks";
"info" => "waiting for the merge transition to finalize"
)
}
}
if let Err(e) = validate_optimistic_transition_blocks(&chain, otbs).await {
warn!(
chain.log,
"Error while validating optimistic transition blocks";
"error" => ?e
);
}
}
Err(e) => {
error!(
chain.log,
"Error loading optimistic transition blocks";
"error" => ?e
);
}
};
}
None => {
error!(chain.log, "Failed to read slot clock");
// If we can't read the slot clock, just wait another slot.
sleep(chain.slot_clock.slot_duration()).await;
}
};
}
debug!(
chain.log,
"No optimistic transition blocks in database";
"msg" => "shutting down OTB verification service"
);
}

View File

@ -1,13 +1,19 @@
#![cfg(not(debug_assertions))] #![cfg(not(debug_assertions))]
use beacon_chain::otb_verification_service::{
load_optimistic_transition_blocks, validate_optimistic_transition_blocks,
OptimisticTransitionBlock,
};
use beacon_chain::{ use beacon_chain::{
canonical_head::{CachedHead, CanonicalHead}, canonical_head::{CachedHead, CanonicalHead},
test_utils::{BeaconChainHarness, EphemeralHarnessType}, test_utils::{BeaconChainHarness, EphemeralHarnessType},
BeaconChainError, BlockError, ExecutionPayloadError, StateSkipConfig, WhenSlotSkipped, BeaconChainError, BlockError, ExecutionPayloadError, StateSkipConfig, WhenSlotSkipped,
INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON,
INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON, INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON,
}; };
use execution_layer::{ use execution_layer::{
json_structures::{JsonForkChoiceStateV1, JsonPayloadAttributesV1}, json_structures::{JsonForkChoiceStateV1, JsonPayloadAttributesV1},
test_utils::ExecutionBlockGenerator,
ExecutionLayer, ForkChoiceState, PayloadAttributes, ExecutionLayer, ForkChoiceState, PayloadAttributes,
}; };
use fork_choice::{ use fork_choice::{
@ -44,7 +50,11 @@ struct InvalidPayloadRig {
impl InvalidPayloadRig { impl InvalidPayloadRig {
fn new() -> Self { fn new() -> Self {
let mut spec = E::default_spec(); let spec = E::default_spec();
Self::new_with_spec(spec)
}
fn new_with_spec(mut spec: ChainSpec) -> Self {
spec.altair_fork_epoch = Some(Epoch::new(0)); spec.altair_fork_epoch = Some(Epoch::new(0));
spec.bellatrix_fork_epoch = Some(Epoch::new(0)); spec.bellatrix_fork_epoch = Some(Epoch::new(0));
@ -1203,6 +1213,548 @@ async fn attesting_to_optimistic_head() {
get_aggregated_by_slot_and_root().unwrap(); get_aggregated_by_slot_and_root().unwrap();
} }
/// A helper struct to build out a chain of some configurable length which undergoes the merge
/// transition.
struct OptimisticTransitionSetup {
blocks: Vec<Arc<SignedBeaconBlock<E>>>,
execution_block_generator: ExecutionBlockGenerator<E>,
}
impl OptimisticTransitionSetup {
async fn new(num_blocks: usize, ttd: u64) -> Self {
let mut spec = E::default_spec();
spec.terminal_total_difficulty = ttd.into();
let mut rig = InvalidPayloadRig::new_with_spec(spec).enable_attestations();
rig.move_to_terminal_block();
let mut blocks = Vec::with_capacity(num_blocks);
for _ in 0..num_blocks {
let root = rig.import_block(Payload::Valid).await;
let block = rig.harness.chain.get_block(&root).await.unwrap().unwrap();
blocks.push(Arc::new(block));
}
let execution_block_generator = rig
.harness
.mock_execution_layer
.as_ref()
.unwrap()
.server
.execution_block_generator()
.clone();
Self {
blocks,
execution_block_generator,
}
}
}
/// Build a chain which has optimistically imported a transition block.
///
/// The initial chain will be built with respect to `block_ttd`, whilst the `rig` which imports the
/// chain will operate with respect to `rig_ttd`. This allows for testing mismatched TTDs.
async fn build_optimistic_chain(
block_ttd: u64,
rig_ttd: u64,
num_blocks: usize,
) -> InvalidPayloadRig {
let OptimisticTransitionSetup {
blocks,
execution_block_generator,
} = OptimisticTransitionSetup::new(num_blocks, block_ttd).await;
// Build a brand-new testing harness. We will apply the blocks from the previous harness to
// this one.
let mut spec = E::default_spec();
spec.terminal_total_difficulty = rig_ttd.into();
let rig = InvalidPayloadRig::new_with_spec(spec);
let spec = &rig.harness.chain.spec;
let mock_execution_layer = rig.harness.mock_execution_layer.as_ref().unwrap();
// Ensure all the execution blocks from the first rig are available in the second rig.
*mock_execution_layer.server.execution_block_generator() = execution_block_generator;
// Make the execution layer respond `SYNCING` to all `newPayload` requests.
mock_execution_layer
.server
.all_payloads_syncing_on_new_payload(true);
// Make the execution layer respond `SYNCING` to all `forkchoiceUpdated` requests.
mock_execution_layer
.server
.all_payloads_syncing_on_forkchoice_updated();
// Make the execution layer respond `None` to all `getBlockByHash` requests.
mock_execution_layer
.server
.all_get_block_by_hash_requests_return_none();
let current_slot = std::cmp::max(
blocks[0].slot() + spec.safe_slots_to_import_optimistically,
num_blocks.into(),
);
rig.harness.set_current_slot(current_slot);
for block in blocks {
rig.harness
.chain
.process_block(block, CountUnrealized::True)
.await
.unwrap();
}
rig.harness.chain.recompute_head_at_current_slot().await;
// Make the execution layer respond normally to `getBlockByHash` requests.
mock_execution_layer
.server
.all_get_block_by_hash_requests_return_natural_value();
// Perform some sanity checks to ensure that the transition happened exactly where we expected.
let pre_transition_block_root = rig
.harness
.chain
.block_root_at_slot(Slot::new(0), WhenSlotSkipped::None)
.unwrap()
.unwrap();
let pre_transition_block = rig
.harness
.chain
.get_block(&pre_transition_block_root)
.await
.unwrap()
.unwrap();
let post_transition_block_root = rig
.harness
.chain
.block_root_at_slot(Slot::new(1), WhenSlotSkipped::None)
.unwrap()
.unwrap();
let post_transition_block = rig
.harness
.chain
.get_block(&post_transition_block_root)
.await
.unwrap()
.unwrap();
assert_eq!(
pre_transition_block_root,
post_transition_block.parent_root(),
"the blocks form a single chain"
);
assert!(
pre_transition_block
.message()
.body()
.execution_payload()
.unwrap()
.execution_payload
== <_>::default(),
"the block *has not* undergone the merge transition"
);
assert!(
post_transition_block
.message()
.body()
.execution_payload()
.unwrap()
.execution_payload
!= <_>::default(),
"the block *has* undergone the merge transition"
);
// Assert that the transition block was optimistically imported.
//
// Note: we're using the "fallback" check for optimistic status, so if the block was
// pre-finality then we'll just use the optimistic status of the finalized block.
assert!(
rig.harness
.chain
.canonical_head
.fork_choice_read_lock()
.is_optimistic_block(&post_transition_block_root)
.unwrap(),
"the transition block should be imported optimistically"
);
// Get the mock execution layer to respond to `getBlockByHash` requests normally again.
mock_execution_layer
.server
.all_get_block_by_hash_requests_return_natural_value();
return rig;
}
#[tokio::test]
async fn optimistic_transition_block_valid_unfinalized() {
let ttd = 42;
let num_blocks = 16 as usize;
let rig = build_optimistic_chain(ttd, ttd, num_blocks).await;
let post_transition_block_root = rig
.harness
.chain
.block_root_at_slot(Slot::new(1), WhenSlotSkipped::None)
.unwrap()
.unwrap();
let post_transition_block = rig
.harness
.chain
.get_block(&post_transition_block_root)
.await
.unwrap()
.unwrap();
assert!(
rig.cached_head()
.finalized_checkpoint()
.epoch
.start_slot(E::slots_per_epoch())
< post_transition_block.slot(),
"the transition block should not be finalized"
);
let otbs = load_optimistic_transition_blocks(&rig.harness.chain)
.expect("should load optimistic transition block from db");
assert_eq!(
otbs.len(),
1,
"There should be one optimistic transition block"
);
let valid_otb = OptimisticTransitionBlock::from_block(post_transition_block.message());
assert_eq!(
valid_otb, otbs[0],
"The optimistic transition block stored in the database should be what we expect",
);
validate_optimistic_transition_blocks(&rig.harness.chain, otbs)
.await
.expect("should validate fine");
// now that the transition block has been validated, it should have been removed from the database
let otbs = load_optimistic_transition_blocks(&rig.harness.chain)
.expect("should load optimistic transition block from db");
assert!(
otbs.is_empty(),
"The valid optimistic transition block should have been removed from the database",
);
}
#[tokio::test]
async fn optimistic_transition_block_valid_finalized() {
let ttd = 42;
let num_blocks = 130 as usize;
let rig = build_optimistic_chain(ttd, ttd, num_blocks).await;
let post_transition_block_root = rig
.harness
.chain
.block_root_at_slot(Slot::new(1), WhenSlotSkipped::None)
.unwrap()
.unwrap();
let post_transition_block = rig
.harness
.chain
.get_block(&post_transition_block_root)
.await
.unwrap()
.unwrap();
assert!(
rig.cached_head()
.finalized_checkpoint()
.epoch
.start_slot(E::slots_per_epoch())
> post_transition_block.slot(),
"the transition block should be finalized"
);
let otbs = load_optimistic_transition_blocks(&rig.harness.chain)
.expect("should load optimistic transition block from db");
assert_eq!(
otbs.len(),
1,
"There should be one optimistic transition block"
);
let valid_otb = OptimisticTransitionBlock::from_block(post_transition_block.message());
assert_eq!(
valid_otb, otbs[0],
"The optimistic transition block stored in the database should be what we expect",
);
validate_optimistic_transition_blocks(&rig.harness.chain, otbs)
.await
.expect("should validate fine");
// now that the transition block has been validated, it should have been removed from the database
let otbs = load_optimistic_transition_blocks(&rig.harness.chain)
.expect("should load optimistic transition block from db");
assert!(
otbs.is_empty(),
"The valid optimistic transition block should have been removed from the database",
);
}
#[tokio::test]
async fn optimistic_transition_block_invalid_unfinalized() {
let block_ttd = 42;
let rig_ttd = 1337;
let num_blocks = 22 as usize;
let rig = build_optimistic_chain(block_ttd, rig_ttd, num_blocks).await;
let post_transition_block_root = rig
.harness
.chain
.block_root_at_slot(Slot::new(1), WhenSlotSkipped::None)
.unwrap()
.unwrap();
let post_transition_block = rig
.harness
.chain
.get_block(&post_transition_block_root)
.await
.unwrap()
.unwrap();
assert!(
rig.cached_head()
.finalized_checkpoint()
.epoch
.start_slot(E::slots_per_epoch())
< post_transition_block.slot(),
"the transition block should not be finalized"
);
let otbs = load_optimistic_transition_blocks(&rig.harness.chain)
.expect("should load optimistic transition block from db");
assert_eq!(
otbs.len(),
1,
"There should be one optimistic transition block"
);
let invalid_otb = OptimisticTransitionBlock::from_block(post_transition_block.message());
assert_eq!(
invalid_otb, otbs[0],
"The optimistic transition block stored in the database should be what we expect",
);
// No shutdown should've been triggered.
assert_eq!(rig.harness.shutdown_reasons(), vec![]);
// It shouldn't be known as invalid yet
assert!(!rig
.execution_status(post_transition_block_root)
.is_invalid());
validate_optimistic_transition_blocks(&rig.harness.chain, otbs)
.await
.unwrap();
// Still no shutdown should've been triggered.
assert_eq!(rig.harness.shutdown_reasons(), vec![]);
// It should be marked invalid now
assert!(rig
.execution_status(post_transition_block_root)
.is_invalid());
// the invalid merge transition block should NOT have been removed from the database
let otbs = load_optimistic_transition_blocks(&rig.harness.chain)
.expect("should load optimistic transition block from db");
assert_eq!(
otbs.len(),
1,
"The invalid merge transition block should still be in the database",
);
assert_eq!(
invalid_otb, otbs[0],
"The optimistic transition block stored in the database should be what we expect",
);
}
#[tokio::test]
async fn optimistic_transition_block_invalid_unfinalized_syncing_ee() {
let block_ttd = 42;
let rig_ttd = 1337;
let num_blocks = 22 as usize;
let rig = build_optimistic_chain(block_ttd, rig_ttd, num_blocks).await;
let post_transition_block_root = rig
.harness
.chain
.block_root_at_slot(Slot::new(1), WhenSlotSkipped::None)
.unwrap()
.unwrap();
let post_transition_block = rig
.harness
.chain
.get_block(&post_transition_block_root)
.await
.unwrap()
.unwrap();
assert!(
rig.cached_head()
.finalized_checkpoint()
.epoch
.start_slot(E::slots_per_epoch())
< post_transition_block.slot(),
"the transition block should not be finalized"
);
let otbs = load_optimistic_transition_blocks(&rig.harness.chain)
.expect("should load optimistic transition block from db");
assert_eq!(
otbs.len(),
1,
"There should be one optimistic transition block"
);
let invalid_otb = OptimisticTransitionBlock::from_block(post_transition_block.message());
assert_eq!(
invalid_otb, otbs[0],
"The optimistic transition block stored in the database should be what we expect",
);
// No shutdown should've been triggered.
assert_eq!(rig.harness.shutdown_reasons(), vec![]);
// It shouldn't be known as invalid yet
assert!(!rig
.execution_status(post_transition_block_root)
.is_invalid());
// Make the execution layer respond `None` to all `getBlockByHash` requests to simulate a
// syncing EE.
let mock_execution_layer = rig.harness.mock_execution_layer.as_ref().unwrap();
mock_execution_layer
.server
.all_get_block_by_hash_requests_return_none();
validate_optimistic_transition_blocks(&rig.harness.chain, otbs)
.await
.unwrap();
// Still no shutdown should've been triggered.
assert_eq!(rig.harness.shutdown_reasons(), vec![]);
// It should still be marked as optimistic.
assert!(rig
.execution_status(post_transition_block_root)
.is_optimistic());
// the optimistic merge transition block should NOT have been removed from the database
let otbs = load_optimistic_transition_blocks(&rig.harness.chain)
.expect("should load optimistic transition block from db");
assert_eq!(
otbs.len(),
1,
"The optimistic merge transition block should still be in the database",
);
assert_eq!(
invalid_otb, otbs[0],
"The optimistic transition block stored in the database should be what we expect",
);
// Allow the EL to respond to `getBlockByHash`, as if it has finished syncing.
mock_execution_layer
.server
.all_get_block_by_hash_requests_return_natural_value();
validate_optimistic_transition_blocks(&rig.harness.chain, otbs)
.await
.unwrap();
// Still no shutdown should've been triggered.
assert_eq!(rig.harness.shutdown_reasons(), vec![]);
// It should be marked invalid now
assert!(rig
.execution_status(post_transition_block_root)
.is_invalid());
// the invalid merge transition block should NOT have been removed from the database
let otbs = load_optimistic_transition_blocks(&rig.harness.chain)
.expect("should load optimistic transition block from db");
assert_eq!(
otbs.len(),
1,
"The invalid merge transition block should still be in the database",
);
assert_eq!(
invalid_otb, otbs[0],
"The optimistic transition block stored in the database should be what we expect",
);
}
#[tokio::test]
async fn optimistic_transition_block_invalid_finalized() {
let block_ttd = 42;
let rig_ttd = 1337;
let num_blocks = 130 as usize;
let rig = build_optimistic_chain(block_ttd, rig_ttd, num_blocks).await;
let post_transition_block_root = rig
.harness
.chain
.block_root_at_slot(Slot::new(1), WhenSlotSkipped::None)
.unwrap()
.unwrap();
let post_transition_block = rig
.harness
.chain
.get_block(&post_transition_block_root)
.await
.unwrap()
.unwrap();
assert!(
rig.cached_head()
.finalized_checkpoint()
.epoch
.start_slot(E::slots_per_epoch())
> post_transition_block.slot(),
"the transition block should be finalized"
);
let otbs = load_optimistic_transition_blocks(&rig.harness.chain)
.expect("should load optimistic transition block from db");
assert_eq!(
otbs.len(),
1,
"There should be one optimistic transition block"
);
let invalid_otb = OptimisticTransitionBlock::from_block(post_transition_block.message());
assert_eq!(
invalid_otb, otbs[0],
"The optimistic transition block stored in the database should be what we expect",
);
// No shutdown should've been triggered yet.
assert_eq!(rig.harness.shutdown_reasons(), vec![]);
validate_optimistic_transition_blocks(&rig.harness.chain, otbs)
.await
.expect("should invalidate merge transition block and shutdown the client");
// The beacon chain should have triggered a shutdown.
assert_eq!(
rig.harness.shutdown_reasons(),
vec![ShutdownReason::Failure(
INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON
)]
);
// the invalid merge transition block should NOT have been removed from the database
let otbs = load_optimistic_transition_blocks(&rig.harness.chain)
.expect("should load optimistic transition block from db");
assert_eq!(
otbs.len(),
1,
"The invalid merge transition block should still be in the database",
);
assert_eq!(
invalid_otb, otbs[0],
"The optimistic transition block stored in the database should be what we expect",
);
}
/// Helper for running tests where we generate a chain with an invalid head and then some /// Helper for running tests where we generate a chain with an invalid head and then some
/// `fork_blocks` to recover it. /// `fork_blocks` to recover it.
struct InvalidHeadSetup { struct InvalidHeadSetup {

View File

@ -1,6 +1,7 @@
use crate::config::{ClientGenesis, Config as ClientConfig}; use crate::config::{ClientGenesis, Config as ClientConfig};
use crate::notifier::spawn_notifier; use crate::notifier::spawn_notifier;
use crate::Client; use crate::Client;
use beacon_chain::otb_verification_service::start_otb_verification_service;
use beacon_chain::proposer_prep_service::start_proposer_prep_service; use beacon_chain::proposer_prep_service::start_proposer_prep_service;
use beacon_chain::schema_change::migrate_schema; use beacon_chain::schema_change::migrate_schema;
use beacon_chain::{ use beacon_chain::{
@ -728,6 +729,7 @@ where
} }
start_proposer_prep_service(runtime_context.executor.clone(), beacon_chain.clone()); start_proposer_prep_service(runtime_context.executor.clone(), beacon_chain.clone());
start_otb_verification_service(runtime_context.executor.clone(), beacon_chain.clone());
} }
Ok(Client { Ok(Client {

View File

@ -105,6 +105,7 @@ pub struct PoWBlock {
pub timestamp: u64, pub timestamp: u64,
} }
#[derive(Clone)]
pub struct ExecutionBlockGenerator<T: EthSpec> { pub struct ExecutionBlockGenerator<T: EthSpec> {
/* /*
* Common database * Common database

View File

@ -48,6 +48,12 @@ pub async fn handle_rpc<T: EthSpec>(
s.parse() s.parse()
.map_err(|e| format!("unable to parse hash: {:?}", e)) .map_err(|e| format!("unable to parse hash: {:?}", e))
})?; })?;
// If we have a static response set, just return that.
if let Some(response) = *ctx.static_get_block_by_hash_response.lock() {
return Ok(serde_json::to_value(response).unwrap());
}
let full_tx = params let full_tx = params
.get(1) .get(1)
.and_then(JsonValue::as_bool) .and_then(JsonValue::as_bool)

View File

@ -2,7 +2,7 @@
use crate::engine_api::auth::JwtKey; use crate::engine_api::auth::JwtKey;
use crate::engine_api::{ use crate::engine_api::{
auth::Auth, http::JSONRPC_VERSION, PayloadStatusV1, PayloadStatusV1Status, auth::Auth, http::JSONRPC_VERSION, ExecutionBlock, PayloadStatusV1, PayloadStatusV1Status,
}; };
use bytes::Bytes; use bytes::Bytes;
use environment::null_logger; use environment::null_logger;
@ -96,6 +96,7 @@ impl<T: EthSpec> MockServer<T> {
preloaded_responses, preloaded_responses,
static_new_payload_response: <_>::default(), static_new_payload_response: <_>::default(),
static_forkchoice_updated_response: <_>::default(), static_forkchoice_updated_response: <_>::default(),
static_get_block_by_hash_response: <_>::default(),
_phantom: PhantomData, _phantom: PhantomData,
}); });
@ -317,6 +318,16 @@ impl<T: EthSpec> MockServer<T> {
self.set_forkchoice_updated_response(Self::invalid_terminal_block_status()); self.set_forkchoice_updated_response(Self::invalid_terminal_block_status());
} }
/// This will make the node appear like it is syncing.
pub fn all_get_block_by_hash_requests_return_none(&self) {
*self.ctx.static_get_block_by_hash_response.lock() = Some(None);
}
/// The node will respond "naturally"; it will return blocks if they're known to it.
pub fn all_get_block_by_hash_requests_return_natural_value(&self) {
*self.ctx.static_get_block_by_hash_response.lock() = None;
}
/// Disables any static payload responses so the execution block generator will do its own /// Disables any static payload responses so the execution block generator will do its own
/// verification. /// verification.
pub fn full_payload_verification(&self) { pub fn full_payload_verification(&self) {
@ -406,6 +417,7 @@ pub struct Context<T: EthSpec> {
pub previous_request: Arc<Mutex<Option<serde_json::Value>>>, pub previous_request: Arc<Mutex<Option<serde_json::Value>>>,
pub static_new_payload_response: Arc<Mutex<Option<StaticNewPayloadResponse>>>, pub static_new_payload_response: Arc<Mutex<Option<StaticNewPayloadResponse>>>,
pub static_forkchoice_updated_response: Arc<Mutex<Option<PayloadStatusV1>>>, pub static_forkchoice_updated_response: Arc<Mutex<Option<PayloadStatusV1>>>,
pub static_get_block_by_hash_response: Arc<Mutex<Option<Option<ExecutionBlock>>>>,
pub _phantom: PhantomData<T>, pub _phantom: PhantomData<T>,
} }

View File

@ -208,6 +208,9 @@ pub enum DBColumn {
BeaconRandaoMixes, BeaconRandaoMixes,
#[strum(serialize = "dht")] #[strum(serialize = "dht")]
DhtEnrs, DhtEnrs,
/// For Optimistically Imported Merge Transition Blocks
#[strum(serialize = "otb")]
OptimisticTransitionBlock,
} }
/// A block from the database, which might have an execution payload or not. /// A block from the database, which might have an execution payload or not.

View File

@ -1,14 +1,17 @@
use super::{Error, ItemStore, KeyValueStore, KeyValueStoreOp}; use super::{Error, ItemStore, KeyValueStore, KeyValueStoreOp};
use crate::{ColumnIter, DBColumn};
use parking_lot::{Mutex, MutexGuard, RwLock}; use parking_lot::{Mutex, MutexGuard, RwLock};
use std::collections::HashMap; use std::collections::{HashMap, HashSet};
use std::marker::PhantomData; use std::marker::PhantomData;
use types::*; use types::*;
type DBHashMap = HashMap<Vec<u8>, Vec<u8>>; type DBHashMap = HashMap<Vec<u8>, Vec<u8>>;
type DBKeyMap = HashMap<Vec<u8>, HashSet<Vec<u8>>>;
/// A thread-safe `HashMap` wrapper. /// A thread-safe `HashMap` wrapper.
pub struct MemoryStore<E: EthSpec> { pub struct MemoryStore<E: EthSpec> {
db: RwLock<DBHashMap>, db: RwLock<DBHashMap>,
col_keys: RwLock<DBKeyMap>,
transaction_mutex: Mutex<()>, transaction_mutex: Mutex<()>,
_phantom: PhantomData<E>, _phantom: PhantomData<E>,
} }
@ -18,6 +21,7 @@ impl<E: EthSpec> MemoryStore<E> {
pub fn open() -> Self { pub fn open() -> Self {
Self { Self {
db: RwLock::new(HashMap::new()), db: RwLock::new(HashMap::new()),
col_keys: RwLock::new(HashMap::new()),
transaction_mutex: Mutex::new(()), transaction_mutex: Mutex::new(()),
_phantom: PhantomData, _phantom: PhantomData,
} }
@ -41,6 +45,11 @@ impl<E: EthSpec> KeyValueStore<E> for MemoryStore<E> {
fn put_bytes(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error> { fn put_bytes(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error> {
let column_key = Self::get_key_for_col(col, key); let column_key = Self::get_key_for_col(col, key);
self.db.write().insert(column_key, val.to_vec()); self.db.write().insert(column_key, val.to_vec());
self.col_keys
.write()
.entry(col.as_bytes().to_vec())
.or_insert_with(HashSet::new)
.insert(key.to_vec());
Ok(()) Ok(())
} }
@ -63,6 +72,10 @@ impl<E: EthSpec> KeyValueStore<E> for MemoryStore<E> {
fn key_delete(&self, col: &str, key: &[u8]) -> Result<(), Error> { fn key_delete(&self, col: &str, key: &[u8]) -> Result<(), Error> {
let column_key = Self::get_key_for_col(col, key); let column_key = Self::get_key_for_col(col, key);
self.db.write().remove(&column_key); self.db.write().remove(&column_key);
self.col_keys
.write()
.get_mut(&col.as_bytes().to_vec())
.map(|set| set.remove(key));
Ok(()) Ok(())
} }
@ -81,6 +94,26 @@ impl<E: EthSpec> KeyValueStore<E> for MemoryStore<E> {
Ok(()) Ok(())
} }
// pub type ColumnIter<'a> = Box<dyn Iterator<Item = Result<(Hash256, Vec<u8>), Error>> + 'a>;
fn iter_column(&self, column: DBColumn) -> ColumnIter {
let col = column.as_str();
if let Some(keys) = self
.col_keys
.read()
.get(col.as_bytes())
.map(|set| set.iter().cloned().collect::<Vec<_>>())
{
Box::new(keys.into_iter().filter_map(move |key| {
let hash = Hash256::from_slice(&key);
self.get_bytes(col, &key)
.transpose()
.map(|res| res.map(|bytes| (hash, bytes)))
}))
} else {
Box::new(std::iter::empty())
}
}
fn begin_rw_transaction(&self) -> MutexGuard<()> { fn begin_rw_transaction(&self) -> MutexGuard<()> {
self.transaction_mutex.lock() self.transaction_mutex.lock()
} }