lighthouse/beacon_node/http_api/src/publish_blocks.rs

252 lines
9.6 KiB
Rust
Raw Normal View History

use crate::metrics;
2023-01-21 09:52:36 +00:00
use beacon_chain::blob_verification::{AsBlock, BlockWrapper, IntoAvailableBlock};
use beacon_chain::validator_monitor::{get_block_delay_ms, timestamp_now};
2022-12-01 16:13:07 +00:00
use beacon_chain::NotifyExecutionLayer;
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, CountUnrealized};
2022-11-21 19:53:33 +00:00
use lighthouse_network::PubsubMessage;
use network::NetworkMessage;
use slog::{debug, error, info, warn, Logger};
use slot_clock::SlotClock;
use std::sync::Arc;
use tokio::sync::mpsc::UnboundedSender;
use tree_hash::TreeHash;
2023-03-15 04:57:30 +00:00
use types::{
AbstractExecPayload, BlindedPayload, EthSpec, ExecPayload, ExecutionBlockHash, FullPayload,
Hash256, SignedBeaconBlock, SignedBlockContents,
};
use warp::Rejection;
/// Handles a request from the HTTP API for full blocks.
pub async fn publish_block<T: BeaconChainTypes>(
block_root: Option<Hash256>,
block_contents: SignedBlockContents<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: &UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger,
) -> Result<(), Rejection> {
let seen_timestamp = timestamp_now();
2023-03-15 04:57:30 +00:00
let (block, maybe_blobs) = block_contents.deconstruct();
let block = Arc::new(block);
//FIXME(sean) have to move this to prior to publishing because it's included in the blobs sidecar message.
//this may skew metrics
let block_root = block_root.unwrap_or_else(|| block.canonical_root());
debug!(
log,
"Signed block published to HTTP API";
"slot" => block.slot()
);
// Send the block, regardless of whether or not it is valid. The API
// specification is very clear that this is the desired behaviour.
2023-03-15 04:57:30 +00:00
let wrapped_block: BlockWrapper<T::EthSpec> = match block.as_ref() {
SignedBeaconBlock::Base(_)
| SignedBeaconBlock::Altair(_)
| SignedBeaconBlock::Merge(_)
| SignedBeaconBlock::Capella(_) => {
crate::publish_pubsub_message(network_tx, PubsubMessage::BeaconBlock(block.clone()))?;
block.into()
2023-03-15 04:57:30 +00:00
}
SignedBeaconBlock::Eip4844(_) => {
crate::publish_pubsub_message(network_tx, PubsubMessage::BeaconBlock(block.clone()))?;
if let Some(blobs) = maybe_blobs {
for (blob_index, blob) in blobs.into_iter().enumerate() {
crate::publish_pubsub_message(
network_tx,
PubsubMessage::BlobSidecar(Box::new((blob_index as u64, blob))),
)?;
}
}
block.into()
}
};
// Determine the delay after the start of the slot, register it with metrics.
2023-01-21 09:48:25 +00:00
let block = wrapped_block.as_block();
let delay = get_block_delay_ms(seen_timestamp, block.message(), &chain.slot_clock);
metrics::observe_duration(&metrics::HTTP_API_BLOCK_BROADCAST_DELAY_TIMES, delay);
2023-01-22 04:54:25 +00:00
let available_block = match wrapped_block.into_available_block(block_root, &chain) {
Ok(available_block) => available_block,
Err(e) => {
let msg = format!("{:?}", e);
error!(
log,
"Invalid block provided to HTTP API";
"reason" => &msg
);
return Err(warp_utils::reject::broadcast_without_import(msg));
}
};
2023-01-21 09:48:25 +00:00
match chain
.process_block(
block_root,
2023-01-21 09:48:25 +00:00
available_block.clone(),
CountUnrealized::True,
NotifyExecutionLayer::Yes,
)
.await
{
Ok(root) => {
info!(
log,
"Valid block from HTTP API";
"block_delay" => ?delay,
"root" => format!("{}", root),
2023-01-21 09:48:25 +00:00
"proposer_index" => available_block.message().proposer_index(),
"slot" => available_block.slot(),
);
// Notify the validator monitor.
chain.validator_monitor.read().register_api_block(
seen_timestamp,
2023-01-21 09:48:25 +00:00
available_block.message(),
root,
&chain.slot_clock,
);
// Update the head since it's likely this block will become the new
// head.
chain.recompute_head_at_current_slot().await;
// Perform some logging to inform users if their blocks are being produced
// late.
//
// Check to see the thresholds are non-zero to avoid logging errors with small
// slot times (e.g., during testing)
let too_late_threshold = chain.slot_clock.unagg_attestation_production_delay();
let delayed_threshold = too_late_threshold / 2;
if delay >= too_late_threshold {
error!(
log,
"Block was broadcast too late";
"msg" => "system may be overloaded, block likely to be orphaned",
"delay_ms" => delay.as_millis(),
2023-01-21 09:48:25 +00:00
"slot" => available_block.slot(),
"root" => ?root,
)
} else if delay >= delayed_threshold {
error!(
log,
"Block broadcast was delayed";
"msg" => "system may be overloaded, block may be orphaned",
"delay_ms" => delay.as_millis(),
2023-01-21 09:48:25 +00:00
"slot" => available_block.slot(),
"root" => ?root,
)
}
Ok(())
}
Err(BlockError::BlockIsAlreadyKnown) => {
info!(
log,
"Block from HTTP API already known";
2023-01-21 09:48:25 +00:00
"block" => ?block_root,
"slot" => available_block.slot(),
);
Ok(())
}
Err(BlockError::RepeatProposal { proposer, slot }) => {
warn!(
log,
"Block ignored due to repeat proposal";
"msg" => "this can happen when a VC uses fallback BNs. \
whilst this is not necessarily an error, it can indicate issues with a BN \
or between the VC and BN.",
"slot" => slot,
"proposer" => proposer,
);
Ok(())
}
Err(e) => {
let msg = format!("{:?}", e);
error!(
log,
"Invalid block provided to HTTP API";
"reason" => &msg
);
Err(warp_utils::reject::broadcast_without_import(msg))
}
}
}
/// Handles a request from the HTTP API for blinded blocks. This converts blinded blocks into full
/// blocks before publishing.
pub async fn publish_blinded_block<T: BeaconChainTypes>(
block: SignedBeaconBlock<T::EthSpec, BlindedPayload<T::EthSpec>>,
chain: Arc<BeaconChain<T>>,
network_tx: &UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger,
) -> Result<(), Rejection> {
let block_root = block.canonical_root();
let full_block = reconstruct_block(chain.clone(), block_root, block, log.clone()).await?;
publish_block::<T>(
Some(block_root),
SignedBlockContents::Block(full_block),
chain,
network_tx,
log,
)
.await
}
/// Deconstruct the given blinded block, and construct a full block. This attempts to use the
/// execution layer's payload cache, and if that misses, attempts a blind block proposal to retrieve
/// the full payload.
async fn reconstruct_block<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>,
block_root: Hash256,
block: SignedBeaconBlock<T::EthSpec, BlindedPayload<T::EthSpec>>,
log: Logger,
) -> Result<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>, Rejection> {
let full_payload = if let Ok(payload_header) = block.message().body().execution_payload() {
let el = chain.execution_layer.as_ref().ok_or_else(|| {
warp_utils::reject::custom_server_error("Missing execution layer".to_string())
})?;
// If the execution block hash is zero, use an empty payload.
let full_payload = if payload_header.block_hash() == ExecutionBlockHash::zero() {
FullPayload::default_at_fork(
chain
.spec
.fork_name_at_epoch(block.slot().epoch(T::EthSpec::slots_per_epoch())),
)
2022-12-30 16:00:14 +00:00
.map_err(|e| {
warp_utils::reject::custom_server_error(format!(
"Default payload construction error: {e:?}"
))
})?
.into()
// If we already have an execution payload with this transactions root cached, use it.
} else if let Some(cached_payload) =
el.get_payload_by_root(&payload_header.tree_hash_root())
{
info!(log, "Reconstructing a full block using a local payload"; "block_hash" => ?cached_payload.block_hash());
cached_payload
// Otherwise, this means we are attempting a blind block proposal.
} else {
let full_payload = el
.propose_blinded_beacon_block(block_root, &block)
.await
.map_err(|e| {
warp_utils::reject::custom_server_error(format!(
"Blind block proposal failed: {:?}",
e
))
})?;
info!(log, "Successfully published a block to the builder network"; "block_hash" => ?full_payload.block_hash());
full_payload
};
Some(full_payload)
} else {
None
};
block.try_into_full_block(full_payload).ok_or_else(|| {
warp_utils::reject::custom_server_error("Unable to add payload to block".to_string())
})
}