lighthouse/beacon_node/beacon_chain/src/events.rs
Michael Sproul 01556f6f01 Optimise payload attributes calculation and add SSE (#4027)
## Issue Addressed

Closes #3896
Closes #3998
Closes #3700

## Proposed Changes

- Optimise the calculation of withdrawals for payload attributes by avoiding state clones, avoiding unnecessary state advances and reading from the snapshot cache if possible.
- Use the execution layer's payload attributes cache to avoid re-calculating payload attributes. I actually implemented a new LRU cache just for withdrawals but it had the exact same key and most of the same data as the existing payload attributes cache, so I deleted it.
- Add a new SSE event that fires when payloadAttributes are calculated. This is useful for block builders, a la https://github.com/ethereum/beacon-APIs/issues/244.
- Add a new CLI flag `--always-prepare-payload` which forces payload attributes to be sent with every fcU regardless of connected proposers. This is intended for use by builders/relays.

For maximum effect, the flags I've been using to run Lighthouse in "payload builder mode" are:

```
--always-prepare-payload \
--prepare-payload-lookahead 12000 \
--suggested-fee-recipient 0x0000000000000000000000000000000000000000
```

The fee recipient is required so Lighthouse has something to pack in the payload attributes (it can be ignored by the builder). The lookahead causes fcU to be sent at the start of every slot rather than at 8s. As usual, fcU will also be sent after each change of head block. I think this combination is sufficient for builders to build on all viable heads. Often there will be two fcU (and two payload attributes) sent for the same slot: one sent at the start of the slot with the head from `n - 1` as the parent, and one sent after the block arrives with `n` as the parent.
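
The timing above can be sketched as simple arithmetic. This is only an illustration, assuming 12-second slots and assuming the lookahead is measured back from the start of the next slot; `fcu_offset_into_slot` is a hypothetical helper, not a Lighthouse function.

```rust
use std::time::Duration;

/// Offset into the current slot at which the pre-emptive fcU would be sent,
/// given the slot duration and a `--prepare-payload-lookahead` value.
/// Hypothetical helper for illustration only.
fn fcu_offset_into_slot(slot_duration: Duration, lookahead: Duration) -> Duration {
    // A lookahead at least as long as the slot clamps to the start of the slot.
    slot_duration.saturating_sub(lookahead)
}

fn main() {
    // Assumption: 12-second slots.
    let slot = Duration::from_secs(12);

    // A 4000 ms lookahead corresponds to the "at 8s" timing mentioned above.
    let at_8s = fcu_offset_into_slot(slot, Duration::from_millis(4000));
    // A 12000 ms lookahead moves the fcU to the start of the slot.
    let at_start = fcu_offset_into_slot(slot, Duration::from_millis(12000));

    println!("4000 ms lookahead  -> fcU {:?} into the slot", at_8s);
    println!("12000 ms lookahead -> fcU {:?} into the slot", at_start);
}
```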

Example usage of the new event stream:

```bash
curl -N "http://localhost:5052/eth/v1/events?topics=payload_attributes"
```
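
A consumer of that stream receives standard server-sent event frames (`event:` and `data:` lines separated by a blank line). The following is a minimal, std-only sketch of splitting such text into frames. `SseFrame` and `parse_sse` are hypothetical names, the JSON body is a placeholder rather than the real payload attributes schema, and the parsing is deliberately simplified compared with the full SSE specification.

```rust
/// One parsed server-sent event: the `event:` name and the joined `data:` lines.
#[derive(Debug, PartialEq)]
struct SseFrame {
    event: String,
    data: String,
}

/// Split raw SSE text into frames. Frames are separated by blank lines.
fn parse_sse(raw: &str) -> Vec<SseFrame> {
    let mut frames = Vec::new();
    let mut event = String::new();
    let mut data: Vec<&str> = Vec::new();

    // A trailing blank line is chained on so the last frame is flushed too.
    for line in raw.lines().chain(std::iter::once("")) {
        if line.is_empty() {
            if !event.is_empty() || !data.is_empty() {
                frames.push(SseFrame {
                    event: std::mem::take(&mut event),
                    data: data.join("\n"),
                });
                data.clear();
            }
        } else if let Some(rest) = line.strip_prefix("event:") {
            event = rest.trim_start().to_string();
        } else if let Some(rest) = line.strip_prefix("data:") {
            data.push(rest.trim_start());
        }
        // Comment lines (starting with ':') and other fields are ignored.
    }
    frames
}

fn main() {
    // Placeholder body: the real event carries the payload attributes as JSON.
    let raw = "event: payload_attributes\ndata: {\"placeholder\":true}\n\n";
    for frame in parse_sse(raw) {
        println!("{} => {}", frame.event, frame.data);
    }
}
```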

## Additional Info

- [x] Tests added by updating the proposer re-org tests. This has the benefit of testing the proposer re-org code paths with withdrawals too, confirming that the new changes don't interact poorly.
- [ ] Benchmarking with `blockdreamer` on devnet-7 showed promising results but I'm yet to do a comparison to `unstable`.


Co-authored-by: Michael Sproul <micsproul@gmail.com>
2023-03-05 23:43:30 +00:00


pub use eth2::types::{EventKind, SseBlock, SseFinalizedCheckpoint, SseHead};
use slog::{trace, Logger};
use tokio::sync::broadcast;
use tokio::sync::broadcast::{error::SendError, Receiver, Sender};
use types::EthSpec;

const DEFAULT_CHANNEL_CAPACITY: usize = 16;

pub struct ServerSentEventHandler<T: EthSpec> {
    attestation_tx: Sender<EventKind<T>>,
    block_tx: Sender<EventKind<T>>,
    finalized_tx: Sender<EventKind<T>>,
    head_tx: Sender<EventKind<T>>,
    exit_tx: Sender<EventKind<T>>,
    chain_reorg_tx: Sender<EventKind<T>>,
    contribution_tx: Sender<EventKind<T>>,
    payload_attributes_tx: Sender<EventKind<T>>,
    late_head: Sender<EventKind<T>>,
    block_reward_tx: Sender<EventKind<T>>,
    log: Logger,
}

impl<T: EthSpec> ServerSentEventHandler<T> {
    pub fn new(log: Logger) -> Self {
        Self::new_with_capacity(log, DEFAULT_CHANNEL_CAPACITY)
    }

    pub fn new_with_capacity(log: Logger, capacity: usize) -> Self {
        let (attestation_tx, _) = broadcast::channel(capacity);
        let (block_tx, _) = broadcast::channel(capacity);
        let (finalized_tx, _) = broadcast::channel(capacity);
        let (head_tx, _) = broadcast::channel(capacity);
        let (exit_tx, _) = broadcast::channel(capacity);
        let (chain_reorg_tx, _) = broadcast::channel(capacity);
        let (contribution_tx, _) = broadcast::channel(capacity);
        let (payload_attributes_tx, _) = broadcast::channel(capacity);
        let (late_head, _) = broadcast::channel(capacity);
        let (block_reward_tx, _) = broadcast::channel(capacity);

        Self {
            attestation_tx,
            block_tx,
            finalized_tx,
            head_tx,
            exit_tx,
            chain_reorg_tx,
            contribution_tx,
            payload_attributes_tx,
            late_head,
            block_reward_tx,
            log,
        }
    }

    /// Send `kind` on the broadcast channel matching its variant.
    ///
    /// On success the number of receivers is logged at trace level. If the
    /// channel has no receivers, the send fails and that is logged instead.
    pub fn register(&self, kind: EventKind<T>) {
        // `name` is the event kind, `count` is the receiver count returned by `send`.
        let log_count = |name, count| {
            trace!(
                self.log,
                "Registering server-sent event";
                "kind" => name,
                "receiver_count" => count
            );
        };
        let result = match &kind {
            EventKind::Attestation(_) => self
                .attestation_tx
                .send(kind)
                .map(|count| log_count("attestation", count)),
            EventKind::Block(_) => self
                .block_tx
                .send(kind)
                .map(|count| log_count("block", count)),
            EventKind::FinalizedCheckpoint(_) => self
                .finalized_tx
                .send(kind)
                .map(|count| log_count("finalized checkpoint", count)),
            EventKind::Head(_) => self
                .head_tx
                .send(kind)
                .map(|count| log_count("head", count)),
            EventKind::VoluntaryExit(_) => self
                .exit_tx
                .send(kind)
                .map(|count| log_count("exit", count)),
            EventKind::ChainReorg(_) => self
                .chain_reorg_tx
                .send(kind)
                .map(|count| log_count("chain reorg", count)),
            EventKind::ContributionAndProof(_) => self
                .contribution_tx
                .send(kind)
                .map(|count| log_count("contribution and proof", count)),
            EventKind::PayloadAttributes(_) => self
                .payload_attributes_tx
                .send(kind)
                .map(|count| log_count("payload attributes", count)),
            EventKind::LateHead(_) => self
                .late_head
                .send(kind)
                .map(|count| log_count("late head", count)),
            EventKind::BlockReward(_) => self
                .block_reward_tx
                .send(kind)
                .map(|count| log_count("block reward", count)),
        };
        // `send` only errors when the channel has no receivers; the event is handed back.
        if let Err(SendError(event)) = result {
            trace!(self.log, "No receivers registered to listen for event"; "event" => ?event);
        }
    }

    pub fn subscribe_attestation(&self) -> Receiver<EventKind<T>> {
        self.attestation_tx.subscribe()
    }

    pub fn subscribe_block(&self) -> Receiver<EventKind<T>> {
        self.block_tx.subscribe()
    }

    pub fn subscribe_finalized(&self) -> Receiver<EventKind<T>> {
        self.finalized_tx.subscribe()
    }

    pub fn subscribe_head(&self) -> Receiver<EventKind<T>> {
        self.head_tx.subscribe()
    }

    pub fn subscribe_exit(&self) -> Receiver<EventKind<T>> {
        self.exit_tx.subscribe()
    }

    pub fn subscribe_reorgs(&self) -> Receiver<EventKind<T>> {
        self.chain_reorg_tx.subscribe()
    }

    pub fn subscribe_contributions(&self) -> Receiver<EventKind<T>> {
        self.contribution_tx.subscribe()
    }

    pub fn subscribe_payload_attributes(&self) -> Receiver<EventKind<T>> {
        self.payload_attributes_tx.subscribe()
    }

    pub fn subscribe_late_head(&self) -> Receiver<EventKind<T>> {
        self.late_head.subscribe()
    }

    pub fn subscribe_block_reward(&self) -> Receiver<EventKind<T>> {
        self.block_reward_tx.subscribe()
    }

    pub fn has_attestation_subscribers(&self) -> bool {
        self.attestation_tx.receiver_count() > 0
    }

    pub fn has_block_subscribers(&self) -> bool {
        self.block_tx.receiver_count() > 0
    }

    pub fn has_finalized_subscribers(&self) -> bool {
        self.finalized_tx.receiver_count() > 0
    }

    pub fn has_head_subscribers(&self) -> bool {
        self.head_tx.receiver_count() > 0
    }

    pub fn has_exit_subscribers(&self) -> bool {
        self.exit_tx.receiver_count() > 0
    }

    pub fn has_reorg_subscribers(&self) -> bool {
        self.chain_reorg_tx.receiver_count() > 0
    }

    pub fn has_contribution_subscribers(&self) -> bool {
        self.contribution_tx.receiver_count() > 0
    }

    pub fn has_payload_attributes_subscribers(&self) -> bool {
        self.payload_attributes_tx.receiver_count() > 0
    }

    pub fn has_late_head_subscribers(&self) -> bool {
        self.late_head.receiver_count() > 0
    }

    pub fn has_block_reward_subscribers(&self) -> bool {
        self.block_reward_tx.receiver_count() > 0
    }
}
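
The handler above relies on the broadcast channel's `send` returning the receiver count on success and handing the event back when nobody is subscribed. The sketch below mimics that shape with the standard library only, so it runs without `tokio`. `FanOut` is a hypothetical stand-in built on `std::sync::mpsc`, not the type used by Lighthouse, and unlike the real channel it only notices a dropped receiver on the next `send`.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-in for one broadcast channel: it sends a clone of each
/// event to every live subscriber.
struct FanOut<E: Clone> {
    subscribers: Vec<Sender<E>>,
}

impl<E: Clone> FanOut<E> {
    fn new() -> Self {
        Self { subscribers: Vec::new() }
    }

    /// Register a new receiver, similar in spirit to the `subscribe_*` methods.
    fn subscribe(&mut self) -> Receiver<E> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    /// Returns `Ok(receiver_count)` or hands the event back in `Err` when
    /// nobody is listening, the same two cases `register` distinguishes.
    fn send(&mut self, event: E) -> Result<usize, E> {
        // Drop subscribers whose receiving end has gone away.
        self.subscribers.retain(|tx| tx.send(event.clone()).is_ok());
        match self.subscribers.len() {
            0 => Err(event),
            n => Ok(n),
        }
    }

    /// Similar in spirit to the `has_*_subscribers` checks.
    fn has_subscribers(&self) -> bool {
        !self.subscribers.is_empty()
    }
}

fn main() {
    let mut head_events: FanOut<String> = FanOut::new();

    // With no subscribers the event comes back, like `SendError(event)`.
    if let Err(event) = head_events.send("head 1".to_string()) {
        println!("no receivers for {:?}", event);
    }

    let rx_a = head_events.subscribe();
    let rx_b = head_events.subscribe();

    // Callers can skip building an event entirely when nobody is listening.
    if head_events.has_subscribers() {
        if let Ok(count) = head_events.send("head 2".to_string()) {
            println!("delivered to {} receivers", count);
        }
    }
    println!("a: {:?}, b: {:?}", rx_a.recv(), rx_b.recv());
}
```

Keeping one channel per event kind, as the handler does, means a subscriber to one topic never has to filter out events from the others.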