Add contribution and proof event (#2527)

## Issue Addressed

N/A

## Proposed Changes

Add the new ContributionAndProof event: https://github.com/ethereum/beacon-APIs/pull/158

## Additional Info

N/A

Co-authored-by: realbigsean <seananderson33@gmail.com>
This commit is contained in:
realbigsean 2021-09-25 07:53:58 +00:00
parent 440badd973
commit 113ef74ef6
6 changed files with 116 additions and 24 deletions

View File

@ -1615,6 +1615,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
metrics::inc_counter(&metrics::SYNC_CONTRIBUTION_PROCESSING_REQUESTS);
let _timer = metrics::start_timer(&metrics::SYNC_CONTRIBUTION_GOSSIP_VERIFICATION_TIMES);
VerifiedSyncContribution::verify(sync_contribution, self).map(|v| {
if let Some(event_handler) = self.event_handler.as_ref() {
if event_handler.has_contribution_subscribers() {
event_handler.register(EventKind::ContributionAndProof(Box::new(
v.aggregate().clone(),
)));
}
}
metrics::inc_counter(&metrics::SYNC_CONTRIBUTION_PROCESSING_SUCCESSES);
v
})

View File

@ -12,28 +12,14 @@ pub struct ServerSentEventHandler<T: EthSpec> {
finalized_tx: Sender<EventKind<T>>,
head_tx: Sender<EventKind<T>>,
exit_tx: Sender<EventKind<T>>,
chain_reorg: Sender<EventKind<T>>,
chain_reorg_tx: Sender<EventKind<T>>,
contribution_tx: Sender<EventKind<T>>,
log: Logger,
}
impl<T: EthSpec> ServerSentEventHandler<T> {
pub fn new(log: Logger) -> Self {
let (attestation_tx, _) = broadcast::channel(DEFAULT_CHANNEL_CAPACITY);
let (block_tx, _) = broadcast::channel(DEFAULT_CHANNEL_CAPACITY);
let (finalized_tx, _) = broadcast::channel(DEFAULT_CHANNEL_CAPACITY);
let (head_tx, _) = broadcast::channel(DEFAULT_CHANNEL_CAPACITY);
let (exit_tx, _) = broadcast::channel(DEFAULT_CHANNEL_CAPACITY);
let (chain_reorg, _) = broadcast::channel(DEFAULT_CHANNEL_CAPACITY);
Self {
attestation_tx,
block_tx,
finalized_tx,
head_tx,
exit_tx,
chain_reorg,
log,
}
Self::new_with_capacity(log, DEFAULT_CHANNEL_CAPACITY)
}
pub fn new_with_capacity(log: Logger, capacity: usize) -> Self {
@ -42,7 +28,8 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
let (finalized_tx, _) = broadcast::channel(capacity);
let (head_tx, _) = broadcast::channel(capacity);
let (exit_tx, _) = broadcast::channel(capacity);
let (chain_reorg, _) = broadcast::channel(capacity);
let (chain_reorg_tx, _) = broadcast::channel(capacity);
let (contribution_tx, _) = broadcast::channel(capacity);
Self {
attestation_tx,
@ -50,7 +37,8 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
finalized_tx,
head_tx,
exit_tx,
chain_reorg,
chain_reorg_tx,
contribution_tx,
log,
}
}
@ -70,8 +58,10 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
.map(|count| trace!(self.log, "Registering server-sent head event"; "receiver_count" => count)),
EventKind::VoluntaryExit(exit) => self.exit_tx.send(EventKind::VoluntaryExit(exit))
.map(|count| trace!(self.log, "Registering server-sent voluntary exit event"; "receiver_count" => count)),
EventKind::ChainReorg(reorg) => self.chain_reorg.send(EventKind::ChainReorg(reorg))
EventKind::ChainReorg(reorg) => self.chain_reorg_tx.send(EventKind::ChainReorg(reorg))
.map(|count| trace!(self.log, "Registering server-sent chain reorg event"; "receiver_count" => count)),
EventKind::ContributionAndProof(contribution_and_proof) => self.contribution_tx.send(EventKind::ContributionAndProof(contribution_and_proof))
.map(|count| trace!(self.log, "Registering server-sent contribution and proof event"; "receiver_count" => count)),
};
if let Err(SendError(event)) = result {
trace!(self.log, "No receivers registered to listen for event"; "event" => ?event);
@ -99,7 +89,11 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
}
pub fn subscribe_reorgs(&self) -> Receiver<EventKind<T>> {
self.chain_reorg.subscribe()
self.chain_reorg_tx.subscribe()
}
pub fn subscribe_contributions(&self) -> Receiver<EventKind<T>> {
self.contribution_tx.subscribe()
}
pub fn has_attestation_subscribers(&self) -> bool {
@ -123,6 +117,10 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
}
pub fn has_reorg_subscribers(&self) -> bool {
self.chain_reorg.receiver_count() > 0
self.chain_reorg_tx.receiver_count() > 0
}
pub fn has_contribution_subscribers(&self) -> bool {
self.contribution_tx.receiver_count() > 0
}
}

View File

@ -351,7 +351,7 @@ where
.chain_config(chain_config)
.event_handler(Some(ServerSentEventHandler::new_with_capacity(
log.clone(),
1,
5,
)))
.monitor_validators(true, vec![], log);

View File

@ -2472,6 +2472,9 @@ pub fn serve<T: BeaconChainTypes>(
api_types::EventTopic::ChainReorg => {
event_handler.subscribe_reorgs()
}
api_types::EventTopic::ContributionAndProof => {
event_handler.subscribe_contributions()
}
};
receivers.push(BroadcastStream::new(receiver).map(|msg| {

View File

@ -1,4 +1,5 @@
use crate::common::{create_api_server, ApiServer};
use beacon_chain::test_utils::RelativeSyncCommittee;
use beacon_chain::{
test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType},
BeaconChain, StateSkipConfig, WhenSlotSkipped, MAXIMUM_GOSSIP_CLOCK_DISPARITY,
@ -50,6 +51,7 @@ struct ApiTester {
next_block: SignedBeaconBlock<E>,
reorg_block: SignedBeaconBlock<E>,
attestations: Vec<Attestation<E>>,
contribution_and_proofs: Vec<SignedContributionAndProof<E>>,
attester_slashing: AttesterSlashing<E>,
proposer_slashing: ProposerSlashing,
voluntary_exit: SignedVoluntaryExit,
@ -65,10 +67,13 @@ impl ApiTester {
// This allows for testing voluntary exits without building out a massive chain.
let mut spec = E::default_spec();
spec.shard_committee_period = 2;
Self::new_from_spec(spec)
}
pub fn new_from_spec(spec: ChainSpec) -> Self {
let harness = BeaconChainHarness::new(
MainnetEthSpec,
Some(spec),
Some(spec.clone()),
generate_deterministic_keypairs(VALIDATOR_COUNT),
);
@ -122,6 +127,30 @@ impl ApiTester {
"precondition: attestations for testing"
);
let current_epoch = harness
.chain
.slot()
.expect("should get current slot")
.epoch(E::slots_per_epoch());
let is_altair = spec
.altair_fork_epoch
.map(|epoch| epoch <= current_epoch)
.unwrap_or(false);
let contribution_and_proofs = if is_altair {
harness
.make_sync_contributions(
&head.beacon_state,
head_state_root,
harness.chain.slot().unwrap(),
RelativeSyncCommittee::Current,
)
.into_iter()
.filter_map(|(_, contribution)| contribution)
.collect::<Vec<_>>()
} else {
vec![]
};
let attester_slashing = harness.make_attester_slashing(vec![0, 1]);
let proposer_slashing = harness.make_proposer_slashing(2);
let voluntary_exit = harness.make_voluntary_exit(3, harness.chain.epoch().unwrap());
@ -172,6 +201,7 @@ impl ApiTester {
next_block,
reorg_block,
attestations,
contribution_and_proofs,
attester_slashing,
proposer_slashing,
voluntary_exit,
@ -250,6 +280,7 @@ impl ApiTester {
next_block,
reorg_block,
attestations,
contribution_and_proofs: vec![],
attester_slashing,
proposer_slashing,
voluntary_exit,
@ -2325,6 +2356,40 @@ impl ApiTester {
self
}
pub async fn test_get_events_altair(self) -> Self {
let topics = vec![EventTopic::ContributionAndProof];
let mut events_future = self
.client
.get_events::<E>(topics.as_slice())
.await
.unwrap();
let expected_contribution_len = self.contribution_and_proofs.len();
self.client
.post_validator_contribution_and_proofs(self.contribution_and_proofs.as_slice())
.await
.unwrap();
let contribution_events = poll_events(
&mut events_future,
expected_contribution_len,
Duration::from_millis(10000),
)
.await;
assert_eq!(
contribution_events.as_slice(),
self.contribution_and_proofs
.clone()
.into_iter()
.map(|contribution| EventKind::ContributionAndProof(Box::new(contribution)))
.collect::<Vec<_>>()
.as_slice()
);
self
}
pub async fn test_get_events_from_genesis(self) -> Self {
let topics = vec![EventTopic::Block, EventTopic::Head];
let mut events_future = self
@ -2391,6 +2456,15 @@ async fn get_events() {
ApiTester::new().test_get_events().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_events_altair() {
let mut spec = E::default_spec();
spec.altair_fork_epoch = Some(Epoch::new(0));
ApiTester::new_from_spec(spec)
.test_get_events_altair()
.await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_events_from_genesis() {
ApiTester::new_from_genesis()

View File

@ -770,6 +770,7 @@ pub enum EventKind<T: EthSpec> {
Head(SseHead),
VoluntaryExit(SignedVoluntaryExit),
ChainReorg(SseChainReorg),
ContributionAndProof(Box<SignedContributionAndProof<T>>),
}
impl<T: EthSpec> EventKind<T> {
@ -781,6 +782,7 @@ impl<T: EthSpec> EventKind<T> {
EventKind::VoluntaryExit(_) => "voluntary_exit",
EventKind::FinalizedCheckpoint(_) => "finalized_checkpoint",
EventKind::ChainReorg(_) => "chain_reorg",
EventKind::ContributionAndProof(_) => "contribution_and_proof",
}
}
@ -825,6 +827,11 @@ impl<T: EthSpec> EventKind<T> {
ServerError::InvalidServerSentEvent(format!("Voluntary Exit: {:?}", e))
})?,
)),
"contribution_and_proof" => Ok(EventKind::ContributionAndProof(Box::new(
serde_json::from_str(data).map_err(|e| {
ServerError::InvalidServerSentEvent(format!("Contribution and Proof: {:?}", e))
})?,
))),
_ => Err(ServerError::InvalidServerSentEvent(
"Could not parse event tag".to_string(),
)),
@ -846,6 +853,7 @@ pub enum EventTopic {
VoluntaryExit,
FinalizedCheckpoint,
ChainReorg,
ContributionAndProof,
}
impl FromStr for EventTopic {
@ -859,6 +867,7 @@ impl FromStr for EventTopic {
"voluntary_exit" => Ok(EventTopic::VoluntaryExit),
"finalized_checkpoint" => Ok(EventTopic::FinalizedCheckpoint),
"chain_reorg" => Ok(EventTopic::ChainReorg),
"contribution_and_proof" => Ok(EventTopic::ContributionAndProof),
_ => Err("event topic cannot be parsed.".to_string()),
}
}
@ -873,6 +882,7 @@ impl fmt::Display for EventTopic {
EventTopic::VoluntaryExit => write!(f, "voluntary_exit"),
EventTopic::FinalizedCheckpoint => write!(f, "finalized_checkpoint"),
EventTopic::ChainReorg => write!(f, "chain_reorg"),
EventTopic::ContributionAndProof => write!(f, "contribution_and_proof"),
}
}
}