Reorg events (#2090)

## Issue Addressed

Resolves #2088

## Proposed Changes

Add the `chain_reorg` SSE event topic

## Additional Info


Co-authored-by: realbigsean <seananderson33@gmail.com>
Co-authored-by: Paul Hauner <paul@paulhauner.com>
This commit is contained in:
realbigsean 2021-06-17 02:10:46 +00:00
parent 3261eff0bf
commit b1657a60e9
10 changed files with 412 additions and 9 deletions

View File

@ -32,7 +32,7 @@ use crate::validator_pubkey_cache::ValidatorPubkeyCache;
use crate::BeaconForkChoiceStore;
use crate::BeaconSnapshot;
use crate::{metrics, BeaconChainError};
use eth2::types::{EventKind, SseBlock, SseFinalizedCheckpoint, SseHead};
use eth2::types::{EventKind, SseBlock, SseChainReorg, SseFinalizedCheckpoint, SseHead};
use fork_choice::ForkChoice;
use futures::channel::mpsc::Sender;
use itertools::process_results;
@ -455,6 +455,77 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map(|result| result.map_err(|e| e.into())))
}
/// Iterate through the current chain to find the slot intersecting with the given beacon state.
/// The maximum depth this will search is `SLOTS_PER_HISTORICAL_ROOT`, and if that depth is reached
/// and no intersection is found, the finalized slot will be returned.
pub fn find_reorg_slot(
&self,
new_state: &BeaconState<T::EthSpec>,
new_block_root: Hash256,
) -> Result<Slot, Error> {
self.with_head(|snapshot| {
let old_state = &snapshot.beacon_state;
let old_block_root = snapshot.beacon_block_root;
// The earliest slot for which the two chains may have a common history.
let lowest_slot = std::cmp::min(new_state.slot, old_state.slot);
// Create an iterator across `$state`, assuming that the block at `$state.slot` has the
// block root of `$block_root`.
//
// The iterator will be skipped until the next value returns `lowest_slot`.
//
// This is a macro instead of a function or closure due to the complex types invloved
// in all the iterator wrapping.
macro_rules! aligned_roots_iter {
($state: ident, $block_root: ident) => {
std::iter::once(Ok(($state.slot, $block_root)))
.chain($state.rev_iter_block_roots(&self.spec))
.skip_while(|result| {
result
.as_ref()
.map_or(false, |(slot, _)| *slot > lowest_slot)
})
};
}
// Create iterators across old/new roots where iterators both start at the same slot.
let mut new_roots = aligned_roots_iter!(new_state, new_block_root);
let mut old_roots = aligned_roots_iter!(old_state, old_block_root);
// Whilst *both* of the iterators are still returning values, try and find a common
// ancestor between them.
while let (Some(old), Some(new)) = (old_roots.next(), new_roots.next()) {
let (old_slot, old_root) = old?;
let (new_slot, new_root) = new?;
// Sanity check to detect programming errors.
if old_slot != new_slot {
return Err(Error::InvalidReorgSlotIter { new_slot, old_slot });
}
if old_root == new_root {
// A common ancestor has been found.
return Ok(old_slot);
}
}
// If no common ancestor is found, declare that the re-org happened at the previous
// finalized slot.
//
// Sometimes this will result in the return slot being *lower* than the actual reorg
// slot. However, assuming we don't re-org through a finalized slot, it will never be
// *higher*.
//
// We provide this potentially-inaccurate-but-safe information to avoid onerous
// database reads during times of deep reorgs.
Ok(old_state
.finalized_checkpoint
.epoch
.start_slot(T::EthSpec::slots_per_epoch()))
})
}
/// Iterates across all `(state_root, slot)` pairs from the head of the chain (inclusive) to
/// the earliest reachable ancestor (may or may not be genesis).
///
@ -2270,14 +2341,25 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// Note: this will declare a re-org if we skip `SLOTS_PER_HISTORICAL_ROOT` blocks
// between calls to fork choice without swapping between chains. This seems like an
// extreme-enough scenario that a warning is fine.
let is_reorg = current_head.block_root
!= new_head
.beacon_state
.get_block_root(current_head.slot)
.map(|root| *root)
.unwrap_or_else(|_| Hash256::random());
let is_reorg = new_head
.beacon_state
.get_block_root(current_head.slot)
.map_or(true, |root| *root != current_head.block_root);
let mut reorg_distance = Slot::new(0);
if is_reorg {
match self.find_reorg_slot(&new_head.beacon_state, new_head.beacon_block_root) {
Ok(slot) => reorg_distance = current_head.slot.saturating_sub(slot),
Err(e) => {
warn!(
self.log,
"Could not find re-org depth";
"error" => format!("{:?}", e),
);
}
}
metrics::inc_counter(&metrics::FORK_CHOICE_REORG_COUNT);
warn!(
self.log,
@ -2287,6 +2369,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
"new_head_parent" => %new_head.beacon_block.parent_root(),
"new_head" => %beacon_block_root,
"new_slot" => new_head.beacon_block.slot(),
"reorg_distance" => reorg_distance,
);
} else {
debug!(
@ -2452,6 +2535,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
);
}
}
if is_reorg && event_handler.has_reorg_subscribers() {
event_handler.register(EventKind::ChainReorg(SseChainReorg {
slot: head_slot,
depth: reorg_distance.as_u64(),
old_head_block: current_head.block_root,
old_head_state: current_head.state_root,
new_head_block: beacon_block_root,
new_head_state: state_root,
epoch: head_slot.epoch(T::EthSpec::slots_per_epoch()),
}));
}
}
Ok(())

View File

@ -117,6 +117,10 @@ pub enum BeaconChainError {
request_slot: Slot,
slot: Slot,
},
InvalidReorgSlotIter {
old_slot: Slot,
new_slot: Slot,
},
}
easy_from_to!(SlotProcessingError, BeaconChainError);

View File

@ -12,6 +12,7 @@ pub struct ServerSentEventHandler<T: EthSpec> {
finalized_tx: Sender<EventKind<T>>,
head_tx: Sender<EventKind<T>>,
exit_tx: Sender<EventKind<T>>,
chain_reorg: Sender<EventKind<T>>,
log: Logger,
}
@ -22,6 +23,7 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
let (finalized_tx, _) = broadcast::channel(DEFAULT_CHANNEL_CAPACITY);
let (head_tx, _) = broadcast::channel(DEFAULT_CHANNEL_CAPACITY);
let (exit_tx, _) = broadcast::channel(DEFAULT_CHANNEL_CAPACITY);
let (chain_reorg, _) = broadcast::channel(DEFAULT_CHANNEL_CAPACITY);
Self {
attestation_tx,
@ -29,6 +31,7 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
finalized_tx,
head_tx,
exit_tx,
chain_reorg,
log,
}
}
@ -39,6 +42,7 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
let (finalized_tx, _) = broadcast::channel(capacity);
let (head_tx, _) = broadcast::channel(capacity);
let (exit_tx, _) = broadcast::channel(capacity);
let (chain_reorg, _) = broadcast::channel(capacity);
Self {
attestation_tx,
@ -46,6 +50,7 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
finalized_tx,
head_tx,
exit_tx,
chain_reorg,
log,
}
}
@ -65,6 +70,8 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
.map(|count| trace!(self.log, "Registering server-sent head event"; "receiver_count" => count)),
EventKind::VoluntaryExit(exit) => self.exit_tx.send(EventKind::VoluntaryExit(exit))
.map(|count| trace!(self.log, "Registering server-sent voluntary exit event"; "receiver_count" => count)),
EventKind::ChainReorg(reorg) => self.chain_reorg.send(EventKind::ChainReorg(reorg))
.map(|count| trace!(self.log, "Registering server-sent chain reorg event"; "receiver_count" => count)),
};
if let Err(SendError(event)) = result {
trace!(self.log, "No receivers registered to listen for event"; "event" => ?event);
@ -91,6 +98,10 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
self.exit_tx.subscribe()
}
pub fn subscribe_reorgs(&self) -> Receiver<EventKind<T>> {
self.chain_reorg.subscribe()
}
pub fn has_attestation_subscribers(&self) -> bool {
self.attestation_tx.receiver_count() > 0
}
@ -110,4 +121,8 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
pub fn has_exit_subscribers(&self) -> bool {
self.exit_tx.receiver_count() > 0
}
pub fn has_reorg_subscribers(&self) -> bool {
self.chain_reorg.receiver_count() > 0
}
}

View File

@ -9,7 +9,7 @@ use beacon_chain::{
AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType,
OP_POOL_DB_KEY,
},
WhenSlotSkipped,
StateSkipConfig, WhenSlotSkipped,
};
use operation_pool::PersistedOperationPool;
use state_processing::{
@ -139,6 +139,71 @@ fn iterators() {
);
}
#[test]
fn find_reorgs() {
let num_blocks_produced = MinimalEthSpec::slots_per_historical_root() + 1;
let harness = get_harness(VALIDATOR_COUNT);
harness.extend_chain(
num_blocks_produced as usize,
BlockStrategy::OnCanonicalHead,
// No need to produce attestations for this test.
AttestationStrategy::SomeValidators(vec![]),
);
let head_state = harness.chain.head_beacon_state().unwrap();
let head_slot = head_state.slot;
let genesis_state = harness
.chain
.state_at_slot(Slot::new(0), StateSkipConfig::WithStateRoots)
.unwrap();
// because genesis is more than `SLOTS_PER_HISTORICAL_ROOT` away, this should return with the
// finalized slot.
assert_eq!(
harness
.chain
.find_reorg_slot(&genesis_state, harness.chain.genesis_block_root)
.unwrap(),
head_state
.finalized_checkpoint
.epoch
.start_slot(MinimalEthSpec::slots_per_epoch())
);
// test head
assert_eq!(
harness
.chain
.find_reorg_slot(
&head_state,
harness.chain.head_beacon_block().unwrap().canonical_root()
)
.unwrap(),
head_slot
);
// Re-org back to the slot prior to the head.
let prev_slot = head_slot - Slot::new(1);
let prev_state = harness
.chain
.state_at_slot(prev_slot, StateSkipConfig::WithStateRoots)
.unwrap();
let prev_block_root = harness
.chain
.block_root_at_slot(prev_slot, WhenSlotSkipped::None)
.unwrap()
.unwrap();
assert_eq!(
harness
.chain
.find_reorg_slot(&prev_state, prev_block_root)
.unwrap(),
prev_slot
);
}
#[test]
fn chooses_fork() {
let harness = get_harness(VALIDATOR_COUNT);

View File

@ -2153,6 +2153,9 @@ pub fn serve<T: BeaconChainTypes>(
api_types::EventTopic::FinalizedCheckpoint => {
event_handler.subscribe_finalized()
}
api_types::EventTopic::ChainReorg => {
event_handler.subscribe_reorgs()
}
};
receivers.push(BroadcastStream::new(receiver).map(|msg| {

View File

@ -60,6 +60,7 @@ struct ApiTester {
chain: Arc<BeaconChain<EphemeralHarnessType<E>>>,
client: BeaconNodeHttpClient,
next_block: SignedBeaconBlock<E>,
reorg_block: SignedBeaconBlock<E>,
attestations: Vec<Attestation<E>>,
attester_slashing: AttesterSlashing<E>,
proposer_slashing: ProposerSlashing,
@ -105,6 +106,10 @@ impl ApiTester {
let (next_block, _next_state) =
harness.make_block(head.beacon_state.clone(), harness.chain.slot().unwrap());
// `make_block` adds random graffiti, so this will produce an alternate block
let (reorg_block, _reorg_state) =
harness.make_block(head.beacon_state.clone(), harness.chain.slot().unwrap());
let head_state_root = head.beacon_state_root();
let attestations = harness
.get_unaggregated_attestations(
@ -213,6 +218,7 @@ impl ApiTester {
chain,
client,
next_block,
reorg_block,
attestations,
attester_slashing,
proposer_slashing,
@ -238,6 +244,10 @@ impl ApiTester {
let (next_block, _next_state) =
harness.make_block(head.beacon_state.clone(), harness.chain.slot().unwrap());
// `make_block` adds random graffiti, so this will produce an alternate block
let (reorg_block, _reorg_state) =
harness.make_block(head.beacon_state.clone(), harness.chain.slot().unwrap());
let head_state_root = head.beacon_state_root();
let attestations = harness
.get_unaggregated_attestations(
@ -320,6 +330,7 @@ impl ApiTester {
chain,
client,
next_block,
reorg_block,
attestations,
attester_slashing,
proposer_slashing,
@ -2233,6 +2244,36 @@ impl ApiTester {
&[expected_block, expected_finalized, expected_head]
);
// Test a reorg event
let mut chain_reorg_event_future = self
.client
.get_events::<E>(&[EventTopic::ChainReorg])
.await
.unwrap();
let expected_reorg = EventKind::ChainReorg(SseChainReorg {
slot: self.next_block.slot(),
depth: 1,
old_head_block: self.next_block.canonical_root(),
old_head_state: self.next_block.state_root(),
new_head_block: self.reorg_block.canonical_root(),
new_head_state: self.reorg_block.state_root(),
epoch: self.next_block.slot().epoch(E::slots_per_epoch()),
});
self.client
.post_beacon_blocks(&self.reorg_block)
.await
.unwrap();
let reorg_event = poll_events(
&mut chain_reorg_event_future,
1,
Duration::from_millis(10000),
)
.await;
assert_eq!(reorg_event.as_slice(), &[expected_reorg]);
self
}

View File

@ -689,6 +689,18 @@ pub struct SseHead {
pub epoch_transition: bool,
}
#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)]
pub struct SseChainReorg {
pub slot: Slot,
#[serde(with = "serde_utils::quoted_u64")]
pub depth: u64,
pub old_head_block: Hash256,
pub old_head_state: Hash256,
pub new_head_block: Hash256,
pub new_head_state: Hash256,
pub epoch: Epoch,
}
#[derive(PartialEq, Debug, Serialize, Clone)]
#[serde(bound = "T: EthSpec", untagged)]
pub enum EventKind<T: EthSpec> {
@ -697,6 +709,7 @@ pub enum EventKind<T: EthSpec> {
FinalizedCheckpoint(SseFinalizedCheckpoint),
Head(SseHead),
VoluntaryExit(SignedVoluntaryExit),
ChainReorg(SseChainReorg),
}
impl<T: EthSpec> EventKind<T> {
@ -707,6 +720,7 @@ impl<T: EthSpec> EventKind<T> {
EventKind::Attestation(_) => "attestation",
EventKind::VoluntaryExit(_) => "voluntary_exit",
EventKind::FinalizedCheckpoint(_) => "finalized_checkpoint",
EventKind::ChainReorg(_) => "chain_reorg",
}
}
@ -735,6 +749,9 @@ impl<T: EthSpec> EventKind<T> {
"block" => Ok(EventKind::Block(serde_json::from_str(data).map_err(
|e| ServerError::InvalidServerSentEvent(format!("Block: {:?}", e)),
)?)),
"chain_reorg" => Ok(EventKind::ChainReorg(serde_json::from_str(data).map_err(
|e| ServerError::InvalidServerSentEvent(format!("Chain Reorg: {:?}", e)),
)?)),
"finalized_checkpoint" => Ok(EventKind::FinalizedCheckpoint(
serde_json::from_str(data).map_err(|e| {
ServerError::InvalidServerSentEvent(format!("Finalized Checkpoint: {:?}", e))
@ -768,6 +785,7 @@ pub enum EventTopic {
Attestation,
VoluntaryExit,
FinalizedCheckpoint,
ChainReorg,
}
impl FromStr for EventTopic {
@ -780,6 +798,7 @@ impl FromStr for EventTopic {
"attestation" => Ok(EventTopic::Attestation),
"voluntary_exit" => Ok(EventTopic::VoluntaryExit),
"finalized_checkpoint" => Ok(EventTopic::FinalizedCheckpoint),
"chain_reorg" => Ok(EventTopic::ChainReorg),
_ => Err("event topic cannot be parsed.".to_string()),
}
}
@ -793,6 +812,7 @@ impl fmt::Display for EventTopic {
EventTopic::Attestation => write!(f, "attestation"),
EventTopic::VoluntaryExit => write!(f, "voluntary_exit"),
EventTopic::FinalizedCheckpoint => write!(f, "finalized_checkpoint"),
EventTopic::ChainReorg => write!(f, "chain_reorg"),
}
}
}

View File

@ -55,7 +55,7 @@ fn cache_state<T: EthSpec>(
// Note: increment the state slot here to allow use of our `state_root` and `block_root`
// getter/setter functions.
//
// This is a bit hacky, however it gets the job safely without lots of code.
// This is a bit hacky, however it gets the job done safely without lots of code.
let previous_slot = state.slot;
state.slot.safe_add_assign(1)?;

View File

@ -23,12 +23,14 @@ use tree_hash_derive::TreeHash;
pub use self::committee_cache::CommitteeCache;
pub use clone_config::CloneConfig;
pub use eth_spec::*;
pub use iter::BlockRootsIter;
pub use tree_hash_cache::BeaconTreeHashCache;
#[macro_use]
mod committee_cache;
mod clone_config;
mod exit_cache;
mod iter;
mod pubkey_cache;
mod tests;
mod tree_hash_cache;
@ -640,6 +642,13 @@ impl<T: EthSpec> BeaconState<T> {
}
}
/// Returns an iterator across the past block roots of `state` in descending slot-order.
///
/// See the docs for `BlockRootsIter` for more detail.
pub fn rev_iter_block_roots<'a>(&'a self, spec: &ChainSpec) -> BlockRootsIter<'a, T> {
BlockRootsIter::new(self, spec.genesis_slot)
}
/// Return the block root at a recent `slot`.
///
/// Spec v0.12.1

View File

@ -0,0 +1,151 @@
use crate::*;
/// Returns an iterator across the past block roots of `state` in descending slot-order.
///
/// The iterator has the following characteristics:
///
/// - Will only return *at most* `state.block_roots.len()` entries.
/// - Will not return slots prior to the genesis_slot.
/// - Each call to next will result in a slot one less than the prior one (or `None`).
/// - Skipped slots will contain the block root from the prior non-skipped slot.
pub struct BlockRootsIter<'a, T: EthSpec> {
state: &'a BeaconState<T>,
genesis_slot: Slot,
prev: Slot,
}
impl<'a, T: EthSpec> BlockRootsIter<'a, T> {
/// Instantiates a new iterator, returning roots for slots earlier that `state.slot`.
///
/// See the struct-level documentation for more details.
pub fn new(state: &'a BeaconState<T>, genesis_slot: Slot) -> Self {
Self {
state,
genesis_slot,
prev: state.slot,
}
}
}
impl<'a, T: EthSpec> Iterator for BlockRootsIter<'a, T> {
type Item = Result<(Slot, Hash256), Error>;
fn next(&mut self) -> Option<Self::Item> {
if self.prev > self.genesis_slot
&& self.prev
> self
.state
.slot
.saturating_sub(self.state.block_roots.len() as u64)
{
self.prev = self.prev.saturating_sub(1_u64);
Some(
self.state
.get_block_root(self.prev)
.map(|root| (self.prev, *root)),
)
} else {
None
}
}
}
#[cfg(test)]
mod test {
use crate::*;
type E = MinimalEthSpec;
fn root_slot(i: usize) -> (Slot, Hash256) {
(Slot::from(i), Hash256::from_low_u64_be(i as u64))
}
fn all_roots(state: &BeaconState<E>, spec: &ChainSpec) -> Vec<(Slot, Hash256)> {
state
.rev_iter_block_roots(spec)
.collect::<Result<_, _>>()
.unwrap()
}
#[test]
fn block_roots_iter() {
let spec = E::default_spec();
let mut state: BeaconState<E> = BeaconState::new(0, <_>::default(), &spec);
for i in 0..state.block_roots.len() {
state.block_roots[i] = root_slot(i).1;
}
assert_eq!(
state.slot, spec.genesis_slot,
"test assume a genesis slot state"
);
assert_eq!(
all_roots(&state, &spec),
vec![],
"state at genesis slot has no history"
);
state.slot = Slot::new(1);
assert_eq!(
all_roots(&state, &spec),
vec![root_slot(0)],
"first slot after genesis has one slot history"
);
state.slot = Slot::new(2);
assert_eq!(
all_roots(&state, &spec),
vec![root_slot(1), root_slot(0)],
"second slot after genesis has two slot history"
);
state.slot = Slot::from(state.block_roots.len() + 2);
let expected = (2..state.block_roots.len() + 2)
.rev()
.map(|i| (Slot::from(i), *state.get_block_root(Slot::from(i)).unwrap()))
.collect::<Vec<_>>();
assert_eq!(
all_roots(&state, &spec),
expected,
"slot higher than the block roots history"
);
}
#[test]
fn block_roots_iter_non_zero_genesis() {
let mut spec = E::default_spec();
spec.genesis_slot = Slot::new(4);
let mut state: BeaconState<E> = BeaconState::new(0, <_>::default(), &spec);
for i in 0..state.block_roots.len() {
state.block_roots[i] = root_slot(i).1;
}
assert_eq!(
state.slot, spec.genesis_slot,
"test assume a genesis slot state"
);
assert_eq!(
all_roots(&state, &spec),
vec![],
"state at genesis slot has no history"
);
state.slot = Slot::new(5);
assert_eq!(
all_roots(&state, &spec),
vec![root_slot(4)],
"first slot after genesis has one slot history"
);
state.slot = Slot::new(6);
assert_eq!(
all_roots(&state, &spec),
vec![root_slot(5), root_slot(4)],
"second slot after genesis has two slot history"
);
}
}