Merge pull request #3912 from realbigsean/sync-error-handling

Sync error handling
This commit is contained in:
realbigsean 2023-01-25 11:46:59 +01:00 committed by GitHub
commit 32b0fb13d2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 358 additions and 95 deletions

View File

@ -956,23 +956,35 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub async fn get_block_and_blobs_checking_early_attester_cache(
&self,
block_root: &Hash256,
) -> Result<
(
Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
Option<Arc<BlobsSidecar<T::EthSpec>>>,
),
Error,
> {
if let (Some(block), Some(blobs)) = (
self.early_attester_cache.get_block(*block_root),
self.early_attester_cache.get_blobs(*block_root),
) {
return Ok((Some(block), Some(blobs)));
) -> Result<Option<SignedBeaconBlockAndBlobsSidecar<T::EthSpec>>, Error> {
// If there is no data availability boundary, the Eip4844 fork is disabled.
if let Some(finalized_data_availability_boundary) =
self.finalized_data_availability_boundary()
{
// Only use the attester cache if we can find both the block and blob
if let (Some(block), Some(blobs)) = (
self.early_attester_cache.get_block(*block_root),
self.early_attester_cache.get_blobs(*block_root),
) {
Ok(Some(SignedBeaconBlockAndBlobsSidecar {
beacon_block: block,
blobs_sidecar: blobs,
}))
// Attempt to get the block and blobs from the database
} else if let Some(block) = self.get_block(block_root).await?.map(Arc::new) {
let blobs = self
.get_blobs(block_root, finalized_data_availability_boundary)?
.map(Arc::new);
Ok(blobs.map(|blobs| SignedBeaconBlockAndBlobsSidecar {
beacon_block: block,
blobs_sidecar: blobs,
}))
} else {
Ok(None)
}
} else {
Ok(None)
}
Ok((
self.get_block(block_root).await?.map(Arc::new),
self.get_blobs(block_root).ok().flatten().map(Arc::new),
))
}
/// Returns the block at the given root, if any.
@ -1044,33 +1056,46 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// Returns the blobs at the given root, if any.
///
/// ## Errors
/// Returns `Ok(None)` if the blobs and associated block are not found.
///
/// May return a database error.
/// If we can find the corresponding block in our database, we know whether we *should* have
/// blobs. If we should have blobs and no blobs are found, this will error. If we shouldn't,
/// this will reconstruct an empty `BlobsSidecar`.
///
/// ## Errors
/// - any database read errors
/// - block and blobs are inconsistent in the database
/// - this method is called with a pre-eip4844 block root
/// - this method is called for a blob that is beyond the prune depth
pub fn get_blobs(
&self,
block_root: &Hash256,
data_availability_boundary: Epoch,
) -> Result<Option<BlobsSidecar<T::EthSpec>>, Error> {
match self.store.get_blobs(block_root)? {
Some(blobs) => Ok(Some(blobs)),
None => {
if let Ok(Some(block)) = self.get_blinded_block(block_root) {
let expected_kzg_commitments = block.message().body().blob_kzg_commitments()?;
if expected_kzg_commitments.len() > 0 {
Err(Error::DBInconsistent(format!(
"Expected kzg commitments but no blobs stored for block root {}",
block_root
)))
} else {
Ok(Some(BlobsSidecar::empty_from_parts(
*block_root,
block.slot(),
)))
}
} else {
Ok(None)
}
// Check for the corresponding block to understand whether we *should* have blobs.
self.get_blinded_block(block_root)?
.map(|block| {
// If there are no KZG commitments in the block, we know the sidecar should
// be empty.
let expected_kzg_commitments =
match block.message().body().blob_kzg_commitments() {
Ok(kzg_commitments) => kzg_commitments,
Err(_) => return Err(Error::NoKzgCommitmentsFieldOnBlock),
};
if expected_kzg_commitments.is_empty() {
Ok(BlobsSidecar::empty_from_parts(*block_root, block.slot()))
} else if data_availability_boundary <= block.epoch() {
// We should have blobs for all blocks younger than the boundary.
Err(Error::BlobsUnavailable)
} else {
// We shouldn't have blobs for blocks older than the boundary.
Err(Error::BlobsOlderThanDataAvailabilityBoundary(block.epoch()))
}
})
.transpose()
}
}
}
@ -2486,7 +2511,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
while let Some((_root, block)) = filtered_chain_segment.first() {
// Determine the epoch of the first block in the remaining segment.
let start_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch());
let start_epoch = block.epoch();
// The `last_index` indicates the position of the first block in an epoch greater
// than the current epoch: partitioning the blocks into a run of blocks in the same
@ -2494,9 +2519,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// the same `BeaconState`.
let last_index = filtered_chain_segment
.iter()
.position(|(_root, block)| {
block.slot().epoch(T::EthSpec::slots_per_epoch()) > start_epoch
})
.position(|(_root, block)| block.epoch() > start_epoch)
.unwrap_or(filtered_chain_segment.len());
let mut blocks = filtered_chain_segment.split_off(last_index);
@ -3162,7 +3185,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// Sync aggregate.
if let Ok(sync_aggregate) = block.body().sync_aggregate() {
// `SyncCommittee` for the sync_aggregate should correspond to the duty slot
let duty_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch());
let duty_epoch = block.epoch();
match self.sync_committee_at_epoch(duty_epoch) {
Ok(sync_committee) => {
@ -3429,7 +3452,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
parent_block_slot: Slot,
) {
// Do not write to eth1 finalization cache for blocks older than 5 epochs.
if block.slot().epoch(T::EthSpec::slots_per_epoch()) + 5 < current_epoch {
if block.epoch() + 5 < current_epoch {
return;
}
@ -5856,6 +5879,20 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.flatten()
}
/// The epoch that is a data availability boundary, or the latest finalized epoch.
/// `None` if the `Eip4844` fork is disabled.
pub fn finalized_data_availability_boundary(&self) -> Option<Epoch> {
self.data_availability_boundary().map(|boundary| {
std::cmp::max(
boundary,
self.canonical_head
.cached_head()
.finalized_checkpoint()
.epoch,
)
})
}
/// Returns `true` if we are at or past the `Eip4844` fork. This will always return `false` if
/// the `Eip4844` fork is disabled.
pub fn is_data_availability_check_required(&self) -> Result<bool, Error> {

View File

@ -6,12 +6,12 @@ use crate::beacon_chain::{BeaconChain, BeaconChainTypes, MAXIMUM_GOSSIP_CLOCK_DI
use crate::{kzg_utils, BeaconChainError};
use state_processing::per_block_processing::eip4844::eip4844::verify_kzg_commitments_against_transactions;
use types::signed_beacon_block::BlobReconstructionError;
use types::ExecPayload;
use types::{
BeaconBlockRef, BeaconStateError, BlobsSidecar, EthSpec, Hash256, KzgCommitment,
SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, SignedBeaconBlockHeader, Slot,
Transactions,
};
use types::{Epoch, ExecPayload};
#[derive(Debug)]
pub enum BlobError {
@ -384,6 +384,7 @@ impl<E: EthSpec> IntoBlockWrapper<E> for AvailableBlock<E> {
pub trait AsBlock<E: EthSpec> {
fn slot(&self) -> Slot;
fn epoch(&self) -> Epoch;
fn parent_root(&self) -> Hash256;
fn state_root(&self) -> Hash256;
fn signed_block_header(&self) -> SignedBeaconBlockHeader;
@ -399,6 +400,12 @@ impl<E: EthSpec> AsBlock<E> for BlockWrapper<E> {
BlockWrapper::BlockAndBlob(block, _) => block.slot(),
}
}
fn epoch(&self) -> Epoch {
match self {
BlockWrapper::Block(block) => block.epoch(),
BlockWrapper::BlockAndBlob(block, _) => block.epoch(),
}
}
fn parent_root(&self) -> Hash256 {
match self {
BlockWrapper::Block(block) => block.parent_root(),
@ -444,6 +451,12 @@ impl<E: EthSpec> AsBlock<E> for &BlockWrapper<E> {
BlockWrapper::BlockAndBlob(block, _) => block.slot(),
}
}
fn epoch(&self) -> Epoch {
match self {
BlockWrapper::Block(block) => block.epoch(),
BlockWrapper::BlockAndBlob(block, _) => block.epoch(),
}
}
fn parent_root(&self) -> Hash256 {
match self {
BlockWrapper::Block(block) => block.parent_root(),
@ -491,6 +504,14 @@ impl<E: EthSpec> AsBlock<E> for AvailableBlock<E> {
}
}
}
fn epoch(&self) -> Epoch {
match &self.0 {
AvailableBlockInner::Block(block) => block.epoch(),
AvailableBlockInner::BlockAndBlob(block_sidecar_pair) => {
block_sidecar_pair.beacon_block.epoch()
}
}
}
fn parent_root(&self) -> Hash256 {
match &self.0 {
AvailableBlockInner::Block(block) => block.parent_root(),

View File

@ -209,6 +209,9 @@ pub enum BeaconChainError {
BlsToExecutionChangeBadFork(ForkName),
InconsistentFork(InconsistentFork),
ProposerHeadForkChoiceError(fork_choice::Error<proto_array::Error>),
BlobsUnavailable,
NoKzgCommitmentsFieldOnBlock,
BlobsOlderThanDataAvailabilityBoundary(Epoch),
}
easy_from_to!(SlotProcessingError, BeaconChainError);

View File

@ -513,6 +513,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
Protocol::MetaData => PeerAction::LowToleranceError,
Protocol::Status => PeerAction::LowToleranceError,
},
RPCResponseErrorCode::BlobsNotFoundForBlock => PeerAction::LowToleranceError,
},
RPCError::SSZDecodeError(_) => PeerAction::Fatal,
RPCError::UnsupportedProtocol => {

View File

@ -330,6 +330,7 @@ pub struct LightClientBootstrapRequest {
#[strum(serialize_all = "snake_case")]
pub enum RPCResponseErrorCode {
RateLimited,
BlobsNotFoundForBlock,
InvalidRequest,
ServerError,
/// Error spec'd to indicate that a peer does not have blocks on a requested range.
@ -359,6 +360,7 @@ impl<T: EthSpec> RPCCodedResponse<T> {
2 => RPCResponseErrorCode::ServerError,
3 => RPCResponseErrorCode::ResourceUnavailable,
139 => RPCResponseErrorCode::RateLimited,
140 => RPCResponseErrorCode::BlobsNotFoundForBlock,
_ => RPCResponseErrorCode::Unknown,
};
RPCCodedResponse::Error(code, err)
@ -397,6 +399,7 @@ impl RPCResponseErrorCode {
RPCResponseErrorCode::ResourceUnavailable => 3,
RPCResponseErrorCode::Unknown => 255,
RPCResponseErrorCode::RateLimited => 139,
RPCResponseErrorCode::BlobsNotFoundForBlock => 140,
}
}
}
@ -425,6 +428,7 @@ impl std::fmt::Display for RPCResponseErrorCode {
RPCResponseErrorCode::ServerError => "Server error occurred",
RPCResponseErrorCode::Unknown => "Unknown error occurred",
RPCResponseErrorCode::RateLimited => "Rate limited",
RPCResponseErrorCode::BlobsNotFoundForBlock => "No blobs for the given root",
};
f.write_str(repr)
}
@ -507,9 +511,23 @@ impl std::fmt::Display for OldBlocksByRangeRequest {
}
}
impl std::fmt::Display for BlobsByRootRequest {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Request: BlobsByRoot: Number of Requested Roots: {}",
self.block_roots.len()
)
}
}
impl std::fmt::Display for BlobsByRangeRequest {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Start Slot: {}, Count: {}", self.start_slot, self.count)
write!(
f,
"Request: BlobsByRange: Start Slot: {}, Count: {}",
self.start_slot, self.count
)
}
}

View File

@ -15,7 +15,7 @@ use slot_clock::SlotClock;
use std::sync::Arc;
use task_executor::TaskExecutor;
use types::light_client_bootstrap::LightClientBootstrap;
use types::{Epoch, EthSpec, Hash256, SignedBeaconBlockAndBlobsSidecar, Slot};
use types::{Epoch, EthSpec, Hash256, Slot};
use super::Worker;
@ -122,7 +122,10 @@ impl<T: BeaconChainTypes> Worker<T> {
};
self.send_sync_message(SyncMessage::AddPeer(peer_id, info));
}
Err(e) => error!(self.log, "Could not process status message"; "error" => ?e),
Err(e) => error!(self.log, "Could not process status message";
"peer" => %peer_id,
"error" => ?e
),
}
}
@ -195,7 +198,7 @@ impl<T: BeaconChainTypes> Worker<T> {
"Received BlocksByRoot Request";
"peer" => %peer_id,
"requested" => request.block_roots.len(),
"returned" => %send_block_count
"returned" => send_block_count
);
// send stream termination
@ -227,18 +230,15 @@ impl<T: BeaconChainTypes> Worker<T> {
.get_block_and_blobs_checking_early_attester_cache(root)
.await
{
Ok((Some(block), Some(blobs))) => {
Ok(Some(block_and_blobs)) => {
self.send_response(
peer_id,
Response::BlobsByRoot(Some(SignedBeaconBlockAndBlobsSidecar {
beacon_block: block,
blobs_sidecar: blobs,
})),
Response::BlobsByRoot(Some(block_and_blobs)),
request_id,
);
send_block_count += 1;
}
Ok((None, None)) => {
Ok(None) => {
debug!(
self.log,
"Peer requested unknown block and blobs";
@ -246,29 +246,56 @@ impl<T: BeaconChainTypes> Worker<T> {
"request_root" => ?root
);
}
Ok((Some(_), None)) => {
debug!(
Err(BeaconChainError::BlobsUnavailable) => {
error!(
self.log,
"Peer requested block and blob, but no blob found";
"No blobs in the store for block root";
"request" => ?request,
"peer" => %peer_id,
"request_root" => ?root
"block_root" => ?root
);
self.send_error_response(
peer_id,
RPCResponseErrorCode::ResourceUnavailable,
"No blob for requested block".into(),
RPCResponseErrorCode::BlobsNotFoundForBlock,
"Blobs not found for block root".into(),
request_id,
);
send_response = false;
break;
}
Ok((None, Some(_))) => {
Err(BeaconChainError::NoKzgCommitmentsFieldOnBlock) => {
debug!(
self.log,
"Peer requested block and blob, but no block found";
"Peer requested blobs for a pre-eip4844 block";
"peer" => %peer_id,
"request_root" => ?root
"block_root" => ?root,
);
self.send_error_response(
peer_id,
RPCResponseErrorCode::ResourceUnavailable,
"Failed reading field kzg_commitments from block".into(),
request_id,
);
send_response = false;
break;
}
Err(BeaconChainError::BlobsOlderThanDataAvailabilityBoundary(block_epoch)) => {
debug!(
self.log,
"Peer requested block and blobs older than the data availability \
boundary for ByRoot request, no blob found";
"peer" => %peer_id,
"request_root" => ?root,
"block_epoch" => ?block_epoch,
);
self.send_error_response(
peer_id,
RPCResponseErrorCode::ResourceUnavailable,
"Blobs older than data availability boundary".into(),
request_id,
);
send_response = false;
break;
}
Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => {
debug!(
@ -303,12 +330,12 @@ impl<T: BeaconChainTypes> Worker<T> {
"Received BlobsByRoot Request";
"peer" => %peer_id,
"requested" => request.block_roots.len(),
"returned" => %send_block_count
"returned" => send_block_count
);
// send stream termination
if send_response {
self.send_response(peer_id, Response::BlocksByRoot(None), request_id);
self.send_response(peer_id, Response::BlobsByRoot(None), request_id);
}
drop(send_on_drop);
},
@ -331,7 +358,7 @@ impl<T: BeaconChainTypes> Worker<T> {
self.send_error_response(
peer_id,
RPCResponseErrorCode::ResourceUnavailable,
"Bootstrap not avaiable".into(),
"Bootstrap not available".into(),
request_id,
);
return;
@ -341,7 +368,7 @@ impl<T: BeaconChainTypes> Worker<T> {
self.send_error_response(
peer_id,
RPCResponseErrorCode::ResourceUnavailable,
"Bootstrap not avaiable".into(),
"Bootstrap not available".into(),
request_id,
);
return;
@ -354,7 +381,7 @@ impl<T: BeaconChainTypes> Worker<T> {
self.send_error_response(
peer_id,
RPCResponseErrorCode::ResourceUnavailable,
"Bootstrap not avaiable".into(),
"Bootstrap not available".into(),
request_id,
);
return;
@ -364,7 +391,7 @@ impl<T: BeaconChainTypes> Worker<T> {
self.send_error_response(
peer_id,
RPCResponseErrorCode::ResourceUnavailable,
"Bootstrap not avaiable".into(),
"Bootstrap not available".into(),
request_id,
);
return;
@ -376,7 +403,7 @@ impl<T: BeaconChainTypes> Worker<T> {
self.send_error_response(
peer_id,
RPCResponseErrorCode::ResourceUnavailable,
"Bootstrap not avaiable".into(),
"Bootstrap not available".into(),
request_id,
);
return;
@ -396,7 +423,7 @@ impl<T: BeaconChainTypes> Worker<T> {
send_on_drop: SendOnDrop,
peer_id: PeerId,
request_id: PeerRequestId,
mut req: BlocksByRangeRequest,
req: BlocksByRangeRequest,
) {
debug!(self.log, "Received BlocksByRange Request";
"peer_id" => %peer_id,
@ -406,7 +433,12 @@ impl<T: BeaconChainTypes> Worker<T> {
// Should not send more than max request blocks
if req.count > MAX_REQUEST_BLOCKS {
req.count = MAX_REQUEST_BLOCKS;
return self.send_error_response(
peer_id,
RPCResponseErrorCode::InvalidRequest,
"Request exceeded `MAX_REQUEST_BLOBS_SIDECARS`".into(),
request_id,
);
}
let forwards_block_root_iter = match self
@ -420,7 +452,10 @@ impl<T: BeaconChainTypes> Worker<T> {
oldest_block_slot,
},
)) => {
debug!(self.log, "Range request failed during backfill"; "requested_slot" => slot, "oldest_known_slot" => oldest_block_slot);
debug!(self.log, "Range request failed during backfill";
"requested_slot" => slot,
"oldest_known_slot" => oldest_block_slot
);
return self.send_error_response(
peer_id,
RPCResponseErrorCode::ResourceUnavailable,
@ -428,7 +463,19 @@ impl<T: BeaconChainTypes> Worker<T> {
request_id,
);
}
Err(e) => return error!(self.log, "Unable to obtain root iter"; "error" => ?e),
Err(e) => {
self.send_error_response(
peer_id,
RPCResponseErrorCode::ServerError,
"Database error".into(),
request_id,
);
return error!(self.log, "Unable to obtain root iter";
"request" => ?req,
"peer" => %peer_id,
"error" => ?e
);
}
};
// Pick out the required blocks, ignoring skip-slots.
@ -460,7 +507,13 @@ impl<T: BeaconChainTypes> Worker<T> {
let block_roots = match maybe_block_roots {
Ok(block_roots) => block_roots,
Err(e) => return error!(self.log, "Error during iteration over blocks"; "error" => ?e),
Err(e) => {
return error!(self.log, "Error during iteration over blocks";
"request" => ?req,
"peer" => %peer_id,
"error" => ?e
)
}
};
// remove all skip slots
@ -492,8 +545,17 @@ impl<T: BeaconChainTypes> Worker<T> {
error!(
self.log,
"Block in the chain is not in the store";
"request" => ?req,
"peer" => %peer_id,
"request_root" => ?root
);
self.send_error_response(
peer_id,
RPCResponseErrorCode::ServerError,
"Database inconsistency".into(),
request_id,
);
send_response = false;
break;
}
Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => {
@ -517,6 +579,8 @@ impl<T: BeaconChainTypes> Worker<T> {
error!(
self.log,
"Error fetching block for peer";
"request" => ?req,
"peer" => %peer_id,
"block_root" => ?root,
"error" => ?e
);
@ -584,7 +648,7 @@ impl<T: BeaconChainTypes> Worker<T> {
send_on_drop: SendOnDrop,
peer_id: PeerId,
request_id: PeerRequestId,
mut req: BlobsByRangeRequest,
req: BlobsByRangeRequest,
) {
debug!(self.log, "Received BlobsByRange Request";
"peer_id" => %peer_id,
@ -594,31 +658,104 @@ impl<T: BeaconChainTypes> Worker<T> {
// Should not send more than max request blocks
if req.count > MAX_REQUEST_BLOBS_SIDECARS {
req.count = MAX_REQUEST_BLOBS_SIDECARS;
return self.send_error_response(
peer_id,
RPCResponseErrorCode::InvalidRequest,
"Request exceeded `MAX_REQUEST_BLOBS_SIDECARS`".into(),
request_id,
);
}
let forwards_block_root_iter = match self
.chain
.forwards_iter_block_roots(Slot::from(req.start_slot))
{
Ok(iter) => iter,
Err(BeaconChainError::HistoricalBlockError(
HistoricalBlockError::BlockOutOfRange {
slot,
oldest_block_slot,
},
)) => {
debug!(self.log, "Range request failed during backfill"; "requested_slot" => slot, "oldest_known_slot" => oldest_block_slot);
let data_availability_boundary = match self.chain.data_availability_boundary() {
Some(boundary) => boundary,
None => {
debug!(self.log, "Eip4844 fork is disabled");
self.send_error_response(
peer_id,
RPCResponseErrorCode::ServerError,
"Eip4844 fork is disabled".into(),
request_id,
);
return;
}
};
let start_slot = Slot::from(req.start_slot);
let start_epoch = start_slot.epoch(T::EthSpec::slots_per_epoch());
// If the peer requests data from beyond the data availability boundary we altruistically
// cap to the right time range.
let serve_blobs_from_slot = if start_epoch < data_availability_boundary {
// Attempt to serve from the earliest block in our database, falling back to the data
// availability boundary
let oldest_blob_slot = self
.chain
.store
.get_blob_info()
.map(|blob_info| blob_info.oldest_blob_slot)
.unwrap_or(data_availability_boundary.start_slot(T::EthSpec::slots_per_epoch()));
debug!(
self.log,
"Range request start slot is older than data availability boundary";
"requested_slot" => req.start_slot,
"oldest_known_slot" => oldest_blob_slot,
"data_availability_boundary" => data_availability_boundary
);
// Check if the request is entirely out of the data availability period. The
// `oldest_blob_slot` is the oldest slot in the database, so includes a margin of error
// controlled by our prune margin.
let end_request_slot = start_slot + req.count;
if oldest_blob_slot < end_request_slot {
return self.send_error_response(
peer_id,
RPCResponseErrorCode::ResourceUnavailable,
"Backfilling".into(),
RPCResponseErrorCode::InvalidRequest,
"Request outside of data availability period".into(),
request_id,
);
}
Err(e) => return error!(self.log, "Unable to obtain root iter"; "error" => ?e),
std::cmp::max(oldest_blob_slot, start_slot)
} else {
start_slot
};
// If the peer requests data from beyond the data availability boundary we altruistically cap to the right time range
let forwards_block_root_iter =
match self.chain.forwards_iter_block_roots(serve_blobs_from_slot) {
Ok(iter) => iter,
Err(BeaconChainError::HistoricalBlockError(
HistoricalBlockError::BlockOutOfRange {
slot,
oldest_block_slot,
},
)) => {
debug!(self.log, "Range request failed during backfill";
"requested_slot" => slot,
"oldest_known_slot" => oldest_block_slot
);
return self.send_error_response(
peer_id,
RPCResponseErrorCode::ResourceUnavailable,
"Backfilling".into(),
request_id,
);
}
Err(e) => {
self.send_error_response(
peer_id,
RPCResponseErrorCode::ServerError,
"Database error".into(),
request_id,
);
return error!(self.log, "Unable to obtain root iter";
"request" => ?req,
"peer" => %peer_id,
"error" => ?e
);
}
};
// Pick out the required blocks, ignoring skip-slots.
let mut last_block_root = req
.start_slot
@ -648,17 +785,23 @@ impl<T: BeaconChainTypes> Worker<T> {
let block_roots = match maybe_block_roots {
Ok(block_roots) => block_roots,
Err(e) => return error!(self.log, "Error during iteration over blocks"; "error" => ?e),
Err(e) => {
return error!(self.log, "Error during iteration over blocks";
"request" => ?req,
"peer" => %peer_id,
"error" => ?e
)
}
};
// remove all skip slots
let block_roots = block_roots.into_iter().flatten().collect::<Vec<_>>();
let mut blobs_sent = 0;
let send_response = true;
let mut send_response = true;
for root in block_roots {
match self.chain.get_blobs(&root) {
match self.chain.get_blobs(&root, data_availability_boundary) {
Ok(Some(blobs)) => {
blobs_sent += 1;
self.send_network_message(NetworkMessage::SendResponse {
@ -671,17 +814,52 @@ impl<T: BeaconChainTypes> Worker<T> {
error!(
self.log,
"No blobs or block in the store for block root";
"request" => ?req,
"peer" => %peer_id,
"block_root" => ?root
);
self.send_error_response(
peer_id,
RPCResponseErrorCode::ServerError,
"Database inconsistency".into(),
request_id,
);
send_response = false;
break;
}
Err(BeaconChainError::BlobsUnavailable) => {
error!(
self.log,
"No blobs in the store for block root";
"request" => ?req,
"peer" => %peer_id,
"block_root" => ?root
);
self.send_error_response(
peer_id,
RPCResponseErrorCode::ResourceUnavailable,
"Blobs unavailable".into(),
request_id,
);
send_response = false;
break;
}
Err(e) => {
error!(
self.log,
"Error fetching blob for peer";
"Error fetching blinded block for block root";
"request" => ?req,
"peer" => %peer_id,
"block_root" => ?root,
"error" => ?e
);
self.send_error_response(
peer_id,
RPCResponseErrorCode::ServerError,
"No blobs and failed fetching corresponding block".into(),
request_id,
);
send_response = false;
break;
}
}

View File

@ -199,7 +199,7 @@ impl<E: EthSpec, Payload: AbstractExecPayload<E>> SignedBeaconBlock<E, Payload>
}
let domain = spec.get_domain(
self.slot().epoch(E::slots_per_epoch()),
self.epoch(),
Domain::BeaconProposer,
fork,
genesis_validators_root,
@ -231,6 +231,11 @@ impl<E: EthSpec, Payload: AbstractExecPayload<E>> SignedBeaconBlock<E, Payload>
self.message().slot()
}
/// Convenience accessor for the block's epoch.
pub fn epoch(&self) -> Epoch {
self.message().slot().epoch(E::slots_per_epoch())
}
/// Convenience accessor for the block's parent root.
pub fn parent_root(&self) -> Hash256 {
self.message().parent_root()