finish refactor

This commit is contained in:
realbigsean 2023-01-21 04:48:25 -05:00
parent eb9feed784
commit cbd09dc281
17 changed files with 92 additions and 61 deletions

View File

@ -9,12 +9,12 @@ use crate::BlockError::BlobValidation;
use crate::{kzg_utils, BeaconChainError, BlockError};
use state_processing::per_block_processing::eip4844::eip4844::verify_kzg_commitments_against_transactions;
use types::signed_beacon_block::BlobReconstructionError;
use types::ExecPayload;
use types::{
BeaconBlockRef, BeaconStateError, BlobsSidecar, Epoch, EthSpec, Hash256, KzgCommitment,
SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, SignedBeaconBlockHeader, Slot,
Transactions,
};
use types::ExecPayload;
#[derive(Debug)]
pub enum BlobError {
@ -89,7 +89,7 @@ pub fn validate_blob_for_gossip<T: BeaconChainTypes>(
block_root: Hash256,
chain: &BeaconChain<T>,
) -> Result<AvailableBlock<T::EthSpec>, BlobError> {
if let BlockWrapper::BlockAndBlob(block, blobs_sidecar) = block_wrapper {
if let BlockWrapper::BlockAndBlob(ref block, ref blobs_sidecar) = block_wrapper {
let blob_slot = blobs_sidecar.beacon_block_slot;
// Do not gossip or process blobs from future or past slots.
let latest_permissible_slot = chain
@ -178,6 +178,16 @@ impl<E: EthSpec> From<SignedBeaconBlock<E>> for BlockWrapper<E> {
}
}
impl<E: EthSpec> From<SignedBeaconBlockAndBlobsSidecar<E>> for BlockWrapper<E> {
fn from(block: SignedBeaconBlockAndBlobsSidecar<E>) -> Self {
let SignedBeaconBlockAndBlobsSidecar {
beacon_block,
blobs_sidecar,
} = block;
BlockWrapper::BlockAndBlob(beacon_block, blobs_sidecar)
}
}
impl<E: EthSpec> From<Arc<SignedBeaconBlock<E>>> for BlockWrapper<E> {
fn from(block: Arc<SignedBeaconBlock<E>>) -> Self {
BlockWrapper::Block(block)
@ -327,15 +337,6 @@ impl<E: EthSpec> AvailableBlock<E> {
}
}
pub fn block_cloned(&self) -> Arc<SignedBeaconBlock<E>> {
match &self.0 {
AvailableBlockInner::Block(block) => block.clone(),
AvailableBlockInner::BlockAndBlob(block_sidecar_pair) => {
block_sidecar_pair.beacon_block.clone()
}
}
}
pub fn blobs(&self) -> Option<Arc<BlobsSidecar<E>>> {
match &self.0 {
AvailableBlockInner::Block(_) => None,
@ -385,8 +386,9 @@ pub trait AsBlock<E: EthSpec> {
fn parent_root(&self) -> Hash256;
fn state_root(&self) -> Hash256;
fn signed_block_header(&self) -> SignedBeaconBlockHeader;
fn as_block(&self) -> &SignedBeaconBlock<E>;
fn message(&self) -> BeaconBlockRef<E>;
fn as_block(&self) -> &SignedBeaconBlock<E>;
fn block_cloned(&self) -> Arc<SignedBeaconBlock<E>>;
}
impl<E: EthSpec> AsBlock<E> for BlockWrapper<E> {
@ -417,9 +419,7 @@ impl<E: EthSpec> AsBlock<E> for BlockWrapper<E> {
fn message(&self) -> BeaconBlockRef<E> {
match &self {
BlockWrapper::Block(block) => block.message(),
BlockWrapper::BlockAndBlob(block, _) => {
block.message()
}
BlockWrapper::BlockAndBlob(block, _) => block.message(),
}
}
fn as_block(&self) -> &SignedBeaconBlock<E> {
@ -428,6 +428,12 @@ impl<E: EthSpec> AsBlock<E> for BlockWrapper<E> {
BlockWrapper::BlockAndBlob(block, _) => &block,
}
}
fn block_cloned(&self) -> Arc<SignedBeaconBlock<E>> {
match &self {
BlockWrapper::Block(block) => block.clone(),
BlockWrapper::BlockAndBlob(block, _) => block.clone(),
}
}
}
impl<E: EthSpec> AsBlock<E> for &BlockWrapper<E> {
@ -458,9 +464,7 @@ impl<E: EthSpec> AsBlock<E> for &BlockWrapper<E> {
fn message(&self) -> BeaconBlockRef<E> {
match &self {
BlockWrapper::Block(block) => block.message(),
BlockWrapper::BlockAndBlob(block, _) => {
block.message()
}
BlockWrapper::BlockAndBlob(block, _) => block.message(),
}
}
fn as_block(&self) -> &SignedBeaconBlock<E> {
@ -469,6 +473,12 @@ impl<E: EthSpec> AsBlock<E> for &BlockWrapper<E> {
BlockWrapper::BlockAndBlob(block, _) => &block,
}
}
fn block_cloned(&self) -> Arc<SignedBeaconBlock<E>> {
match &self {
BlockWrapper::Block(block) => block.clone(),
BlockWrapper::BlockAndBlob(block, _) => block.clone(),
}
}
}
impl<E: EthSpec> AsBlock<E> for AvailableBlock<E> {
@ -520,4 +530,12 @@ impl<E: EthSpec> AsBlock<E> for AvailableBlock<E> {
}
}
}
fn block_cloned(&self) -> Arc<SignedBeaconBlock<E>> {
match &self.0 {
AvailableBlockInner::Block(block) => block.clone(),
AvailableBlockInner::BlockAndBlob(block_sidecar_pair) => {
block_sidecar_pair.beacon_block.clone()
}
}
}
}

View File

@ -73,6 +73,7 @@ use fork_choice::{AttestationFromBlock, PayloadVerificationStatus};
use parking_lot::RwLockReadGuard;
use proto_array::{Block as ProtoBlock, Block};
use safe_arith::ArithError;
use slasher::test_utils::{block, E};
use slog::{debug, error, warn, Logger};
use slot_clock::SlotClock;
use ssz::Encode;
@ -90,7 +91,6 @@ use std::fs;
use std::io::Write;
use std::sync::Arc;
use std::time::Duration;
use slasher::test_utils::{block, E};
use store::{Error as DBError, HotStateSummary, KeyValueStore, StoreOp};
use task_executor::JoinHandle;
use tree_hash::TreeHash;
@ -314,7 +314,7 @@ pub enum BlockError<T: EthSpec> {
BlobValidation(BlobError),
}
impl <T: EthSpec>From<BlobError> for BlockError<T> {
impl<T: EthSpec> From<BlobError> for BlockError<T> {
fn from(e: BlobError) -> Self {
Self::BlobValidation(e)
}
@ -601,10 +601,10 @@ pub fn signature_verify_chain_segment<T: BeaconChainTypes>(
let mut consensus_context =
ConsensusContext::new(block.slot()).set_current_block_root(*block_root);
//FIXME(sean) batch kzg verification
let available_block = block.into_available_block(*block_root, chain)?;
signature_verifier.include_all_signatures(block.as_block(), &mut consensus_context)?;
signature_verifier.include_all_signatures(available_block.as_block(), &mut consensus_context)?;
//FIXME(sean) batch kzg verification
let available_block = block.clone().into_available_block(*block_root, chain)?;
// Save the block and its consensus context. The context will have had its proposer index
// and attesting indices filled in, which can be used to accelerate later block processing.

View File

@ -1,4 +1,5 @@
use crate::metrics;
use beacon_chain::blob_verification::{AsBlock, AvailableBlock, BlockWrapper, IntoAvailableBlock};
use beacon_chain::validator_monitor::{get_block_delay_ms, timestamp_now};
use beacon_chain::NotifyExecutionLayer;
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, CountUnrealized};
@ -9,7 +10,6 @@ use slot_clock::SlotClock;
use std::sync::Arc;
use tokio::sync::mpsc::UnboundedSender;
use tree_hash::TreeHash;
use types::signed_block_and_blobs::AvailableBlock;
use types::{
AbstractExecPayload, BlindedPayload, EthSpec, ExecPayload, ExecutionBlockHash, FullPayload,
Hash256, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar,
@ -32,7 +32,7 @@ pub async fn publish_block<T: BeaconChainTypes>(
// Send the block, regardless of whether or not it is valid. The API
// specification is very clear that this is the desired behaviour.
let wrapped_block: AvailableBlock<T::EthSpec> =
let wrapped_block: BlockWrapper<T::EthSpec> =
if matches!(block.as_ref(), &SignedBeaconBlock::Eip4844(_)) {
if let Some(sidecar) = chain.blob_cache.pop(&block_root) {
let block_and_blobs = SignedBeaconBlockAndBlobsSidecar {
@ -56,14 +56,19 @@ pub async fn publish_block<T: BeaconChainTypes>(
};
// Determine the delay after the start of the slot, register it with metrics.
let block = wrapped_block.block();
let block = wrapped_block.as_block();
let delay = get_block_delay_ms(seen_timestamp, block.message(), &chain.slot_clock);
metrics::observe_duration(&metrics::HTTP_API_BLOCK_BROADCAST_DELAY_TIMES, delay);
//FIXME(sean) handle errors
let available_block = wrapped_block
.into_available_block(block_root, &chain)
.unwrap();
match chain
.process_block(
block_root,
wrapped_block.clone(),
available_block.clone(),
CountUnrealized::True,
NotifyExecutionLayer::Yes,
)
@ -75,14 +80,14 @@ pub async fn publish_block<T: BeaconChainTypes>(
"Valid block from HTTP API";
"block_delay" => ?delay,
"root" => format!("{}", root),
"proposer_index" => block.message().proposer_index(),
"slot" => block.slot(),
"proposer_index" => available_block.message().proposer_index(),
"slot" => available_block.slot(),
);
// Notify the validator monitor.
chain.validator_monitor.read().register_api_block(
seen_timestamp,
block.message(),
available_block.message(),
root,
&chain.slot_clock,
);
@ -104,7 +109,7 @@ pub async fn publish_block<T: BeaconChainTypes>(
"Block was broadcast too late";
"msg" => "system may be overloaded, block likely to be orphaned",
"delay_ms" => delay.as_millis(),
"slot" => block.slot(),
"slot" => available_block.slot(),
"root" => ?root,
)
} else if delay >= delayed_threshold {
@ -113,7 +118,7 @@ pub async fn publish_block<T: BeaconChainTypes>(
"Block broadcast was delayed";
"msg" => "system may be overloaded, block may be orphaned",
"delay_ms" => delay.as_millis(),
"slot" => block.slot(),
"slot" => available_block.slot(),
"root" => ?root,
)
}
@ -124,8 +129,8 @@ pub async fn publish_block<T: BeaconChainTypes>(
info!(
log,
"Block from HTTP API already known";
"block" => ?block.canonical_root(),
"slot" => block.slot(),
"block" => ?block_root,
"slot" => available_block.slot(),
);
Ok(())
}

View File

@ -40,6 +40,7 @@
use crate::sync::manager::BlockProcessType;
use crate::{metrics, service::NetworkMessage, sync::SyncMessage};
use beacon_chain::blob_verification::BlockWrapper;
use beacon_chain::parking_lot::Mutex;
use beacon_chain::{BeaconChain, BeaconChainTypes, GossipVerifiedBlock, NotifyExecutionLayer};
use derivative::Derivative;
@ -62,7 +63,6 @@ use std::time::Duration;
use std::{cmp, collections::HashSet};
use task_executor::TaskExecutor;
use tokio::sync::mpsc;
use types::signed_block_and_blobs::BlockWrapper;
use types::{
Attestation, AttesterSlashing, Hash256, LightClientFinalityUpdate, LightClientOptimisticUpdate,
ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar,

View File

@ -13,6 +13,7 @@
use super::MAX_SCHEDULED_WORK_QUEUE_LEN;
use crate::metrics;
use crate::sync::manager::BlockProcessType;
use beacon_chain::blob_verification::{AsBlock, BlockWrapper};
use beacon_chain::{BeaconChainTypes, GossipVerifiedBlock, MAXIMUM_GOSSIP_CLOCK_DISPARITY};
use fnv::FnvHashMap;
use futures::task::Poll;
@ -29,7 +30,6 @@ use task_executor::TaskExecutor;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::time::error::Error as TimeError;
use tokio_util::time::delay_queue::{DelayQueue, Key as DelayKey};
use types::signed_block_and_blobs::BlockWrapper;
use types::{Attestation, EthSpec, Hash256, SignedAggregateAndProof, SubnetId};
const TASK_NAME: &str = "beacon_processor_reprocess_queue";

View File

@ -1,5 +1,6 @@
use crate::{metrics, service::NetworkMessage, sync::SyncMessage};
use beacon_chain::blob_verification::{AsBlock, BlockWrapper};
use beacon_chain::store::Error;
use beacon_chain::{
attestation_verification::{self, Error as AttnError, VerifiedAttestation},
@ -18,7 +19,6 @@ use ssz::Encode;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use store::hot_cold_store::HotColdDBError;
use tokio::sync::mpsc;
use types::signed_block_and_blobs::BlockWrapper;
use types::{
Attestation, AttesterSlashing, EthSpec, Hash256, IndexedAttestation, LightClientFinalityUpdate,
LightClientOptimisticUpdate, ProposerSlashing, SignedAggregateAndProof,
@ -726,7 +726,7 @@ impl<T: BeaconChainTypes> Worker<T> {
let block_root = if let Ok(verified_block) = &verification_result {
verified_block.block_root
} else {
block.block().canonical_root()
block.as_block().canonical_root()
};
// Write the time the block was observed into delay cache.

View File

@ -7,6 +7,7 @@ use crate::beacon_processor::DuplicateCache;
use crate::metrics;
use crate::sync::manager::{BlockProcessType, SyncMessage};
use crate::sync::{BatchProcessResult, ChainId};
use beacon_chain::blob_verification::{AsBlock, BlockWrapper, IntoAvailableBlock};
use beacon_chain::CountUnrealized;
use beacon_chain::{
BeaconChainError, BeaconChainTypes, BlockError, ChainSegmentResult, HistoricalBlockError,
@ -16,7 +17,6 @@ use lighthouse_network::PeerAction;
use slog::{debug, error, info, warn};
use std::sync::Arc;
use tokio::sync::mpsc;
use types::signed_block_and_blobs::BlockWrapper;
use types::{Epoch, Hash256, SignedBeaconBlock};
/// Id associated to a batch processing request, either a sync batch or a parent lookup.
@ -85,15 +85,23 @@ impl<T: BeaconChainTypes> Worker<T> {
}
};
let slot = block.slot();
let result = self
.chain
.process_block(
block_root,
block,
CountUnrealized::True,
NotifyExecutionLayer::Yes,
)
.await;
let available_block = block
.into_available_block(block_root, &self.chain)
.map_err(BlockError::BlobValidation);
let result = match available_block {
Ok(block) => {
self.chain
.process_block(
block_root,
block,
CountUnrealized::True,
NotifyExecutionLayer::Yes,
)
.await
}
Err(e) => Err(e),
};
metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL);

View File

@ -14,6 +14,7 @@ use crate::sync::network_context::SyncNetworkContext;
use crate::sync::range_sync::{
BatchConfig, BatchId, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState,
};
use beacon_chain::blob_verification::BlockWrapper;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use lighthouse_network::types::{BackFillState, NetworkGlobals};
use lighthouse_network::{PeerAction, PeerId};
@ -24,7 +25,6 @@ use std::collections::{
HashMap, HashSet,
};
use std::sync::Arc;
use types::signed_block_and_blobs::BlockWrapper;
use types::{Epoch, EthSpec};
/// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of

View File

@ -2,6 +2,7 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::time::Duration;
use beacon_chain::blob_verification::{AsBlock, BlockWrapper};
use beacon_chain::{BeaconChainTypes, BlockError};
use fnv::FnvHashMap;
use lighthouse_network::rpc::{RPCError, RPCResponseErrorCode};
@ -10,7 +11,6 @@ use lru_cache::LRUTimeCache;
use slog::{debug, error, trace, warn, Logger};
use smallvec::SmallVec;
use store::Hash256;
use types::signed_block_and_blobs::BlockWrapper;
use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent};
use crate::metrics;

View File

@ -1,9 +1,9 @@
use super::RootBlockTuple;
use beacon_chain::blob_verification::{AsBlock, BlockWrapper};
use beacon_chain::BeaconChainTypes;
use lighthouse_network::PeerId;
use store::Hash256;
use strum::IntoStaticStr;
use types::signed_block_and_blobs::BlockWrapper;
use crate::sync::block_lookups::ForceBlockRequest;
use crate::sync::{

View File

@ -1,4 +1,5 @@
use super::RootBlockTuple;
use beacon_chain::blob_verification::{AsBlock, BlockWrapper};
use beacon_chain::get_block_root;
use lighthouse_network::{rpc::BlocksByRootRequest, PeerId};
use rand::seq::IteratorRandom;
@ -6,7 +7,6 @@ use ssz_types::VariableList;
use std::collections::HashSet;
use store::{EthSpec, Hash256};
use strum::IntoStaticStr;
use types::signed_block_and_blobs::BlockWrapper;
/// Object representing a single block lookup request.
#[derive(PartialEq, Eq)]
@ -115,7 +115,7 @@ impl<const MAX_ATTEMPTS: u8> SingleBlockRequest<MAX_ATTEMPTS> {
Some(block) => {
// Compute the block root using this specific function so that we can get timing
// metrics.
let block_root = get_block_root(block.block());
let block_root = get_block_root(block.as_block());
if block_root != self.hash {
// return an error and drop the block
// NOTE: we take this is as a download failure to prevent counting the
@ -205,7 +205,7 @@ impl<const MAX_ATTEMPTS: u8> slog::Value for SingleBlockRequest<MAX_ATTEMPTS> {
mod tests {
use super::*;
use types::test_utils::{SeedableRng, TestRandom, XorShiftRng};
use types::MinimalEthSpec as E;
use types::{MinimalEthSpec as E, SignedBeaconBlock};
fn rand_block() -> SignedBeaconBlock<E> {
let mut rng = XorShiftRng::from_seed([42; 16]);

View File

@ -1,7 +1,7 @@
use beacon_chain::blob_verification::BlockWrapper;
use std::{collections::VecDeque, sync::Arc};
use types::signed_block_and_blobs::BlockWrapper;
use types::{signed_block_and_blobs::AvailableBlock, BlobsSidecar, EthSpec, SignedBeaconBlock};
use types::{BlobsSidecar, EthSpec, SignedBeaconBlock};
#[derive(Debug, Default)]
pub struct BlocksAndBlobsRequestInfo<T: EthSpec> {

View File

@ -42,6 +42,7 @@ use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEven
use crate::service::NetworkMessage;
use crate::status::ToStatusMessage;
use crate::sync::range_sync::ByRangeRequestType;
use beacon_chain::blob_verification::{AsBlock, BlockWrapper};
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, EngineState};
use futures::StreamExt;
use lighthouse_network::rpc::methods::MAX_REQUEST_BLOCKS;
@ -55,7 +56,6 @@ use std::ops::Sub;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use types::signed_block_and_blobs::BlockWrapper;
use types::{
BlobsSidecar, EthSpec, Hash256, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, Slot,
};

View File

@ -8,6 +8,7 @@ use crate::beacon_processor::WorkEvent;
use crate::service::{NetworkMessage, RequestId};
use crate::status::ToStatusMessage;
use crate::sync::block_lookups::ForceBlockRequest;
use beacon_chain::blob_verification::BlockWrapper;
use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState};
use fnv::FnvHashMap;
use lighthouse_network::rpc::methods::BlobsByRangeRequest;
@ -17,7 +18,6 @@ use slog::{debug, trace, warn};
use std::collections::hash_map::Entry;
use std::sync::Arc;
use tokio::sync::mpsc;
use types::signed_block_and_blobs::BlockWrapper;
use types::{BlobsSidecar, EthSpec, SignedBeaconBlock};
/// Wraps a Network channel to employ various RPC related network functionality for the Sync manager. This includes management of a global RPC request Id.

View File

@ -1,11 +1,11 @@
use crate::sync::manager::Id;
use beacon_chain::blob_verification::{AsBlock, BlockWrapper};
use lighthouse_network::rpc::methods::BlocksByRangeRequest;
use lighthouse_network::PeerId;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};
use std::ops::Sub;
use strum::Display;
use types::signed_block_and_blobs::BlockWrapper;
use types::{Epoch, EthSpec, Slot};
/// The number of times to retry a batch before it is considered failed.

View File

@ -3,6 +3,7 @@ use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEven
use crate::sync::{
manager::Id, network_context::SyncNetworkContext, BatchOperationOutcome, BatchProcessResult,
};
use beacon_chain::blob_verification::BlockWrapper;
use beacon_chain::{BeaconChainTypes, CountUnrealized};
use fnv::FnvHashMap;
use lighthouse_network::{PeerAction, PeerId};
@ -10,7 +11,6 @@ use rand::seq::SliceRandom;
use slog::{crit, debug, o, warn};
use std::collections::{btree_map::Entry, BTreeMap, HashSet};
use std::hash::{Hash, Hasher};
use types::signed_block_and_blobs::BlockWrapper;
use types::{Epoch, EthSpec, Hash256, Slot};
/// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of

View File

@ -47,6 +47,7 @@ use crate::status::ToStatusMessage;
use crate::sync::manager::Id;
use crate::sync::network_context::SyncNetworkContext;
use crate::sync::BatchProcessResult;
use beacon_chain::blob_verification::BlockWrapper;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use lighthouse_network::rpc::GoodbyeReason;
use lighthouse_network::PeerId;
@ -55,7 +56,6 @@ use lru_cache::LRUTimeCache;
use slog::{crit, debug, trace, warn};
use std::collections::HashMap;
use std::sync::Arc;
use types::signed_block_and_blobs::BlockWrapper;
use types::{Epoch, EthSpec, Hash256, Slot};
/// For how long we store failed finalized chains to prevent retries.