Merge pull request #3820 from realbigsean/sync-fixes

Handle ResourceUnavailable errors and other rpc/sync fixes
This commit is contained in:
Divma 2022-12-19 12:38:33 -05:00 committed by GitHub
commit 51e588bdf9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 161 additions and 93 deletions

View File

@ -2948,6 +2948,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
ops.push(StoreOp::PutState(block.state_root(), &state)); ops.push(StoreOp::PutState(block.state_root(), &state));
if let Some(blobs) = blobs { if let Some(blobs) = blobs {
//FIXME(sean) using this for debugging for now
info!(self.log, "Writing blobs to store"; "block_root" => ?block_root);
ops.push(StoreOp::PutBlobs(block_root, blobs)); ops.push(StoreOp::PutBlobs(block_root, blobs));
}; };
let txn_lock = self.store.hot_db.begin_rw_transaction(); let txn_lock = self.store.hot_db.begin_rw_transaction();

View File

@ -87,6 +87,8 @@ pub enum BlobError {
/// We were unable to process this sync committee message due to an internal error. It's unclear if the /// We were unable to process this sync committee message due to an internal error. It's unclear if the
/// sync committee message is valid. /// sync committee message is valid.
BeaconChainError(BeaconChainError), BeaconChainError(BeaconChainError),
/// No blobs for the specified block where we would expect blobs.
MissingBlobs,
} }
impl From<BeaconChainError> for BlobError { impl From<BeaconChainError> for BlobError {

View File

@ -579,10 +579,8 @@ pub fn signature_verify_chain_segment<T: BeaconChainTypes>(
let mut signature_verified_blocks = Vec::with_capacity(chain_segment.len()); let mut signature_verified_blocks = Vec::with_capacity(chain_segment.len());
for (block_root, block) in &chain_segment { for (block_root, block) in &chain_segment {
let mut consensus_context = ConsensusContext::new(block.slot()) let mut consensus_context =
.set_current_block_root(*block_root) ConsensusContext::new(block.slot()).set_current_block_root(*block_root);
//FIXME(sean) Consider removing this is we pass the blob wrapper everywhere
.set_blobs_sidecar(block.blobs_sidecar());
signature_verifier.include_all_signatures(block.block(), &mut consensus_context)?; signature_verifier.include_all_signatures(block.block(), &mut consensus_context)?;
@ -936,8 +934,7 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
.set_current_block_root(block_root) .set_current_block_root(block_root)
.set_proposer_index(block.message().proposer_index()) .set_proposer_index(block.message().proposer_index())
.set_blobs_sidecar_validated(true) // Validated in `validate_blob_for_gossip` .set_blobs_sidecar_validated(true) // Validated in `validate_blob_for_gossip`
.set_blobs_verified_vs_txs(true) // Validated in `validate_blob_for_gossip` .set_blobs_verified_vs_txs(true);
.set_blobs_sidecar(block.blobs_sidecar()); // TODO: potentially remove
Ok(Self { Ok(Self {
block, block,
@ -1009,9 +1006,8 @@ impl<T: BeaconChainTypes> SignatureVerifiedBlock<T> {
let mut signature_verifier = get_signature_verifier(&state, &pubkey_cache, &chain.spec); let mut signature_verifier = get_signature_verifier(&state, &pubkey_cache, &chain.spec);
let mut consensus_context = ConsensusContext::new(block.slot()) let mut consensus_context =
.set_current_block_root(block_root) ConsensusContext::new(block.slot()).set_current_block_root(block_root);
.set_blobs_sidecar(block.blobs_sidecar());
signature_verifier.include_all_signatures(block.block(), &mut consensus_context)?; signature_verifier.include_all_signatures(block.block(), &mut consensus_context)?;
@ -1564,9 +1560,11 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
* Verify kzg proofs and kzg commitments against transactions if required * Verify kzg proofs and kzg commitments against transactions if required
*/ */
//FIXME(sean) should this be prior to applying attestions to fork choice above? done in parallel? //FIXME(sean) should this be prior to applying attestions to fork choice above? done in parallel?
if let Some(ref sidecar) = consensus_context.blobs_sidecar() {
if let Some(data_availability_boundary) = chain.data_availability_boundary() { if let Some(data_availability_boundary) = chain.data_availability_boundary() {
if block_slot.epoch(T::EthSpec::slots_per_epoch()) > data_availability_boundary { if block_slot.epoch(T::EthSpec::slots_per_epoch()) >= data_availability_boundary {
let sidecar = block
.blobs()
.ok_or(BlockError::BlobValidation(BlobError::MissingBlobs))?;
let kzg = chain.kzg.as_ref().ok_or(BlockError::BlobValidation( let kzg = chain.kzg.as_ref().ok_or(BlockError::BlobValidation(
BlobError::TrustedSetupNotInitialized, BlobError::TrustedSetupNotInitialized,
))?; ))?;
@ -1577,10 +1575,11 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
.map(|payload| payload.transactions()) .map(|payload| payload.transactions())
.map_err(|_| BlockError::BlobValidation(BlobError::TransactionsMissing))? .map_err(|_| BlockError::BlobValidation(BlobError::TransactionsMissing))?
.ok_or(BlockError::BlobValidation(BlobError::TransactionsMissing))?; .ok_or(BlockError::BlobValidation(BlobError::TransactionsMissing))?;
let kzg_commitments = let kzg_commitments = block
block.message().body().blob_kzg_commitments().map_err(|_| { .message()
BlockError::BlobValidation(BlobError::KzgCommitmentMissing) .body()
})?; .blob_kzg_commitments()
.map_err(|_| BlockError::BlobValidation(BlobError::KzgCommitmentMissing))?;
if !consensus_context.blobs_sidecar_validated() { if !consensus_context.blobs_sidecar_validated() {
if !kzg_utils::validate_blobs_sidecar( if !kzg_utils::validate_blobs_sidecar(
&kzg, &kzg,
@ -1608,7 +1607,6 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
} }
} }
} }
}
Ok(Self { Ok(Self {
block, block,

View File

@ -9,6 +9,7 @@ use slot_clock::SlotClock;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc::UnboundedSender; use tokio::sync::mpsc::UnboundedSender;
use tree_hash::TreeHash; use tree_hash::TreeHash;
use types::signed_block_and_blobs::BlockWrapper;
use types::{ use types::{
AbstractExecPayload, BlindedPayload, BlobsSidecar, EthSpec, ExecPayload, ExecutionBlockHash, AbstractExecPayload, BlindedPayload, BlobsSidecar, EthSpec, ExecPayload, ExecutionBlockHash,
FullPayload, Hash256, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, FullPayload, Hash256, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar,
@ -32,12 +33,19 @@ pub async fn publish_block<T: BeaconChainTypes>(
// Send the block, regardless of whether or not it is valid. The API // Send the block, regardless of whether or not it is valid. The API
// specification is very clear that this is the desired behaviour. // specification is very clear that this is the desired behaviour.
let message = if matches!(block.as_ref(), &SignedBeaconBlock::Eip4844(_)) { let wrapped_block = if matches!(block.as_ref(), &SignedBeaconBlock::Eip4844(_)) {
if let Some(sidecar) = chain.blob_cache.pop(&block_root) { if let Some(sidecar) = chain.blob_cache.pop(&block_root) {
PubsubMessage::BeaconBlockAndBlobsSidecars(SignedBeaconBlockAndBlobsSidecar { let block_and_blobs = SignedBeaconBlockAndBlobsSidecar {
beacon_block: block.clone(), beacon_block: block,
blobs_sidecar: Arc::new(sidecar), blobs_sidecar: Arc::new(sidecar),
}) };
crate::publish_pubsub_message(
network_tx,
PubsubMessage::BeaconBlockAndBlobsSidecars(block_and_blobs.clone()),
)?;
BlockWrapper::BlockAndBlob {
block_sidecar_pair: block_and_blobs,
}
} else { } else {
//FIXME(sean): This should probably return a specific no-blob-cached error code, beacon API coordination required //FIXME(sean): This should probably return a specific no-blob-cached error code, beacon API coordination required
return Err(warp_utils::reject::broadcast_without_import(format!( return Err(warp_utils::reject::broadcast_without_import(format!(
@ -45,18 +53,19 @@ pub async fn publish_block<T: BeaconChainTypes>(
))); )));
} }
} else { } else {
PubsubMessage::BeaconBlock(block.clone()) crate::publish_pubsub_message(network_tx, PubsubMessage::BeaconBlock(block.clone()))?;
BlockWrapper::Block { block }
}; };
crate::publish_pubsub_message(network_tx, message)?;
// Determine the delay after the start of the slot, register it with metrics. // Determine the delay after the start of the slot, register it with metrics.
let block = wrapped_block.block();
let delay = get_block_delay_ms(seen_timestamp, block.message(), &chain.slot_clock); let delay = get_block_delay_ms(seen_timestamp, block.message(), &chain.slot_clock);
metrics::observe_duration(&metrics::HTTP_API_BLOCK_BROADCAST_DELAY_TIMES, delay); metrics::observe_duration(&metrics::HTTP_API_BLOCK_BROADCAST_DELAY_TIMES, delay);
match chain match chain
.process_block( .process_block(
block_root, block_root,
block.clone(), wrapped_block.clone(),
CountUnrealized::True, CountUnrealized::True,
NotifyExecutionLayer::Yes, NotifyExecutionLayer::Yes,
) )

View File

@ -473,6 +473,11 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
RPCError::ErrorResponse(code, _) => match code { RPCError::ErrorResponse(code, _) => match code {
RPCResponseErrorCode::Unknown => PeerAction::HighToleranceError, RPCResponseErrorCode::Unknown => PeerAction::HighToleranceError,
RPCResponseErrorCode::ResourceUnavailable => { RPCResponseErrorCode::ResourceUnavailable => {
// Don't ban on this because we want to retry with a block by root request.
if matches!(protocol, Protocol::BlobsByRoot) {
return;
}
// NOTE: This error only makes sense for the `BlocksByRange` and `BlocksByRoot` // NOTE: This error only makes sense for the `BlocksByRange` and `BlocksByRoot`
// protocols. // protocols.
// //

View File

@ -531,9 +531,6 @@ fn handle_v2_request<T: EthSpec>(
Protocol::BlocksByRoot => Ok(Some(InboundRequest::BlocksByRoot(BlocksByRootRequest { Protocol::BlocksByRoot => Ok(Some(InboundRequest::BlocksByRoot(BlocksByRootRequest {
block_roots: VariableList::from_ssz_bytes(decoded_buffer)?, block_roots: VariableList::from_ssz_bytes(decoded_buffer)?,
}))), }))),
Protocol::BlobsByRange => Ok(Some(InboundRequest::BlobsByRange(
BlobsByRangeRequest::from_ssz_bytes(decoded_buffer)?,
))),
// MetaData requests return early from InboundUpgrade and do not reach the decoder. // MetaData requests return early from InboundUpgrade and do not reach the decoder.
// Handle this case just for completeness. // Handle this case just for completeness.
Protocol::MetaData => { Protocol::MetaData => {
@ -826,12 +823,25 @@ mod tests {
} }
} }
fn blbrange_request() -> BlobsByRangeRequest {
BlobsByRangeRequest {
start_slot: 0,
count: 10,
}
}
fn bbroot_request() -> BlocksByRootRequest { fn bbroot_request() -> BlocksByRootRequest {
BlocksByRootRequest { BlocksByRootRequest {
block_roots: VariableList::from(vec![Hash256::zero()]), block_roots: VariableList::from(vec![Hash256::zero()]),
} }
} }
fn blbroot_request() -> BlobsByRootRequest {
BlobsByRootRequest {
block_roots: VariableList::from(vec![Hash256::zero()]),
}
}
fn ping_message() -> Ping { fn ping_message() -> Ping {
Ping { data: 1 } Ping { data: 1 }
} }
@ -1454,6 +1464,8 @@ mod tests {
OutboundRequest::Goodbye(GoodbyeReason::Fault), OutboundRequest::Goodbye(GoodbyeReason::Fault),
OutboundRequest::BlocksByRange(bbrange_request()), OutboundRequest::BlocksByRange(bbrange_request()),
OutboundRequest::BlocksByRoot(bbroot_request()), OutboundRequest::BlocksByRoot(bbroot_request()),
OutboundRequest::BlobsByRange(blbrange_request()),
OutboundRequest::BlobsByRoot(blbroot_request()),
OutboundRequest::MetaData(PhantomData::<Spec>), OutboundRequest::MetaData(PhantomData::<Spec>),
]; ];
for req in requests.iter() { for req in requests.iter() {

View File

@ -75,6 +75,8 @@ pub enum NetworkEvent<AppReqId: ReqId, TSpec: EthSpec> {
id: AppReqId, id: AppReqId,
/// The peer to which this request was sent. /// The peer to which this request was sent.
peer_id: PeerId, peer_id: PeerId,
/// The error of the failed request.
error: RPCError,
}, },
RequestReceived { RequestReceived {
/// The peer that sent the request. /// The peer that sent the request.
@ -1177,9 +1179,9 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
&error, &error,
ConnectionDirection::Outgoing, ConnectionDirection::Outgoing,
); );
// inform failures of requests comming outside the behaviour // inform failures of requests coming outside the behaviour
if let RequestId::Application(id) = id { if let RequestId::Application(id) = id {
Some(NetworkEvent::RPCFailed { peer_id, id }) Some(NetworkEvent::RPCFailed { peer_id, id, error })
} else { } else {
None None
} }

View File

@ -254,6 +254,14 @@ impl<T: BeaconChainTypes> Worker<T> {
"peer" => %peer_id, "peer" => %peer_id,
"request_root" => ?root "request_root" => ?root
); );
self.send_error_response(
peer_id,
RPCResponseErrorCode::ResourceUnavailable,
"No blob for requested block".into(),
request_id,
);
send_response = false;
break;
} }
Ok((None, Some(_))) => { Ok((None, Some(_))) => {
debug!( debug!(

View File

@ -11,6 +11,7 @@ use crate::error;
use crate::service::{NetworkMessage, RequestId}; use crate::service::{NetworkMessage, RequestId};
use beacon_chain::{BeaconChain, BeaconChainTypes}; use beacon_chain::{BeaconChain, BeaconChainTypes};
use futures::prelude::*; use futures::prelude::*;
use lighthouse_network::rpc::RPCError;
use lighthouse_network::{ use lighthouse_network::{
MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Request, Response, MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Request, Response,
}; };
@ -58,6 +59,7 @@ pub enum RouterMessage<T: EthSpec> {
RPCFailed { RPCFailed {
peer_id: PeerId, peer_id: PeerId,
request_id: RequestId, request_id: RequestId,
error: RPCError,
}, },
/// A gossip message has been received. The fields are: message id, the peer that sent us this /// A gossip message has been received. The fields are: message id, the peer that sent us this
/// message, the message itself and a bool which indicates if the message should be processed /// message, the message itself and a bool which indicates if the message should be processed
@ -140,8 +142,9 @@ impl<T: BeaconChainTypes> Router<T> {
RouterMessage::RPCFailed { RouterMessage::RPCFailed {
peer_id, peer_id,
request_id, request_id,
error,
} => { } => {
self.processor.on_rpc_error(peer_id, request_id); self.processor.on_rpc_error(peer_id, request_id, error);
} }
RouterMessage::PubsubMessage(id, peer_id, gossip, should_process) => { RouterMessage::PubsubMessage(id, peer_id, gossip, should_process) => {
self.handle_gossip(id, peer_id, gossip, should_process); self.handle_gossip(id, peer_id, gossip, should_process);

View File

@ -103,12 +103,13 @@ impl<T: BeaconChainTypes> Processor<T> {
/// An error occurred during an RPC request. The state is maintained by the sync manager, so /// An error occurred during an RPC request. The state is maintained by the sync manager, so
/// this function notifies the sync manager of the error. /// this function notifies the sync manager of the error.
pub fn on_rpc_error(&mut self, peer_id: PeerId, request_id: RequestId) { pub fn on_rpc_error(&mut self, peer_id: PeerId, request_id: RequestId, error: RPCError) {
// Check if the failed RPC belongs to sync // Check if the failed RPC belongs to sync
if let RequestId::Sync(request_id) = request_id { if let RequestId::Sync(request_id) = request_id {
self.send_to_sync(SyncMessage::RpcError { self.send_to_sync(SyncMessage::RpcError {
peer_id, peer_id,
request_id, request_id,
error,
}); });
} }
} }

View File

@ -499,10 +499,11 @@ impl<T: BeaconChainTypes> NetworkService<T> {
response, response,
}); });
} }
NetworkEvent::RPCFailed { id, peer_id } => { NetworkEvent::RPCFailed { id, peer_id, error } => {
self.send_to_router(RouterMessage::RPCFailed { self.send_to_router(RouterMessage::RPCFailed {
peer_id, peer_id,
request_id: id, request_id: id,
error,
}); });
} }
NetworkEvent::StatusPeer(peer_id) => { NetworkEvent::StatusPeer(peer_id) => {

View File

@ -6,6 +6,7 @@ use beacon_chain::{BeaconChainTypes, BlockError};
use fnv::FnvHashMap; use fnv::FnvHashMap;
use futures::StreamExt; use futures::StreamExt;
use itertools::{Either, Itertools}; use itertools::{Either, Itertools};
use lighthouse_network::rpc::{RPCError, RPCResponseErrorCode};
use lighthouse_network::{PeerAction, PeerId}; use lighthouse_network::{PeerAction, PeerId};
use lru_cache::LRUTimeCache; use lru_cache::LRUTimeCache;
use slog::{debug, error, trace, warn, Logger}; use slog::{debug, error, trace, warn, Logger};
@ -40,6 +41,13 @@ pub type RootBlockTuple<T> = (Hash256, BlockWrapper<T>);
const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60; const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60;
const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 3; const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 3;
/// This is used to resolve the scenario where we request a parent from before the data availability
/// boundary and need to retry with a request for only the block.
pub enum ForceBlockRequest {
True,
False,
}
pub(crate) struct BlockLookups<T: BeaconChainTypes> { pub(crate) struct BlockLookups<T: BeaconChainTypes> {
/// Parent chain lookups being downloaded. /// Parent chain lookups being downloaded.
parent_lookups: SmallVec<[ParentLookup<T>; 3]>, parent_lookups: SmallVec<[ParentLookup<T>; 3]>,
@ -165,7 +173,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
} }
let parent_lookup = ParentLookup::new(block_root, block, peer_id); let parent_lookup = ParentLookup::new(block_root, block, peer_id);
self.request_parent(parent_lookup, cx); self.request_parent(parent_lookup, cx, ForceBlockRequest::False);
} }
/* Lookup responses */ /* Lookup responses */
@ -291,7 +299,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
cx.report_peer(peer_id, PeerAction::LowToleranceError, e); cx.report_peer(peer_id, PeerAction::LowToleranceError, e);
// We try again if possible. // We try again if possible.
self.request_parent(parent_lookup, cx); self.request_parent(parent_lookup, cx, ForceBlockRequest::False);
} }
VerifyError::PreviousFailure { parent_root } => { VerifyError::PreviousFailure { parent_root } => {
debug!( debug!(
@ -367,7 +375,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
{ {
let parent_lookup = self.parent_lookups.remove(pos); let parent_lookup = self.parent_lookups.remove(pos);
trace!(self.log, "Parent lookup's peer disconnected"; &parent_lookup); trace!(self.log, "Parent lookup's peer disconnected"; &parent_lookup);
self.request_parent(parent_lookup, cx); self.request_parent(parent_lookup, cx, ForceBlockRequest::False);
} }
} }
@ -377,6 +385,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
id: Id, id: Id,
peer_id: PeerId, peer_id: PeerId,
cx: &mut SyncNetworkContext<T>, cx: &mut SyncNetworkContext<T>,
error: RPCError,
) { ) {
if let Some(pos) = self if let Some(pos) = self
.parent_lookups .parent_lookups
@ -386,7 +395,19 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
let mut parent_lookup = self.parent_lookups.remove(pos); let mut parent_lookup = self.parent_lookups.remove(pos);
parent_lookup.download_failed(); parent_lookup.download_failed();
trace!(self.log, "Parent lookup request failed"; &parent_lookup); trace!(self.log, "Parent lookup request failed"; &parent_lookup);
self.request_parent(parent_lookup, cx);
// `ResourceUnavailable` indicates we requested a parent block from prior to the 4844 fork epoch.
let force_block_request = if let RPCError::ErrorResponse(
RPCResponseErrorCode::ResourceUnavailable,
_,
) = error
{
debug!(self.log, "RPC parent lookup for block and blobs failed. Retrying the request for just a block"; "peer_id" => %peer_id);
ForceBlockRequest::True
} else {
ForceBlockRequest::False
};
self.request_parent(parent_lookup, cx, force_block_request);
} else { } else {
return debug!(self.log, "RPC failure for a parent lookup request that was not found"; "peer_id" => %peer_id); return debug!(self.log, "RPC failure for a parent lookup request that was not found"; "peer_id" => %peer_id);
}; };
@ -542,7 +563,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// need to keep looking for parents // need to keep looking for parents
// add the block back to the queue and continue the search // add the block back to the queue and continue the search
parent_lookup.add_block(block); parent_lookup.add_block(block);
self.request_parent(parent_lookup, cx); self.request_parent(parent_lookup, cx, ForceBlockRequest::False);
} }
BlockProcessResult::Ok BlockProcessResult::Ok
| BlockProcessResult::Err(BlockError::BlockIsAlreadyKnown { .. }) => { | BlockProcessResult::Err(BlockError::BlockIsAlreadyKnown { .. }) => {
@ -604,7 +625,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// Try again if possible // Try again if possible
parent_lookup.processing_failed(); parent_lookup.processing_failed();
self.request_parent(parent_lookup, cx); self.request_parent(parent_lookup, cx, ForceBlockRequest::False);
} }
BlockProcessResult::Ignored => { BlockProcessResult::Ignored => {
// Beacon processor signalled to ignore the block processing result. // Beacon processor signalled to ignore the block processing result.
@ -697,8 +718,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
&mut self, &mut self,
mut parent_lookup: ParentLookup<T>, mut parent_lookup: ParentLookup<T>,
cx: &mut SyncNetworkContext<T>, cx: &mut SyncNetworkContext<T>,
force_block_request: ForceBlockRequest,
) { ) {
match parent_lookup.request_parent(cx) { match parent_lookup.request_parent(cx, force_block_request) {
Err(e) => { Err(e) => {
debug!(self.log, "Failed to request parent"; &parent_lookup, "error" => e.as_static()); debug!(self.log, "Failed to request parent"; &parent_lookup, "error" => e.as_static());
match e { match e {

View File

@ -6,6 +6,7 @@ use store::{Hash256, SignedBeaconBlock};
use strum::IntoStaticStr; use strum::IntoStaticStr;
use types::signed_block_and_blobs::BlockWrapper; use types::signed_block_and_blobs::BlockWrapper;
use crate::sync::block_lookups::ForceBlockRequest;
use crate::sync::{ use crate::sync::{
manager::{Id, SLOT_IMPORT_TOLERANCE}, manager::{Id, SLOT_IMPORT_TOLERANCE},
network_context::SyncNetworkContext, network_context::SyncNetworkContext,
@ -72,14 +73,18 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
} }
/// Attempts to request the next unknown parent. If the request fails, it should be removed. /// Attempts to request the next unknown parent. If the request fails, it should be removed.
pub fn request_parent(&mut self, cx: &mut SyncNetworkContext<T>) -> Result<(), RequestError> { pub fn request_parent(
&mut self,
cx: &mut SyncNetworkContext<T>,
force_block_request: ForceBlockRequest,
) -> Result<(), RequestError> {
// check to make sure this request hasn't failed // check to make sure this request hasn't failed
if self.downloaded_blocks.len() >= PARENT_DEPTH_TOLERANCE { if self.downloaded_blocks.len() >= PARENT_DEPTH_TOLERANCE {
return Err(RequestError::ChainTooLong); return Err(RequestError::ChainTooLong);
} }
let (peer_id, request) = self.current_parent_request.request_block()?; let (peer_id, request) = self.current_parent_request.request_block()?;
match cx.parent_lookup_request(peer_id, request) { match cx.parent_lookup_request(peer_id, request, force_block_request) {
Ok(request_id) => { Ok(request_id) => {
self.current_parent_request_id = Some(request_id); self.current_parent_request_id = Some(request_id);
Ok(()) Ok(())

View File

@ -45,6 +45,7 @@ use crate::sync::range_sync::ExpectedBatchTy;
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, EngineState}; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, EngineState};
use futures::StreamExt; use futures::StreamExt;
use lighthouse_network::rpc::methods::MAX_REQUEST_BLOCKS; use lighthouse_network::rpc::methods::MAX_REQUEST_BLOCKS;
use lighthouse_network::rpc::{RPCError, RPCResponseErrorCode};
use lighthouse_network::types::{NetworkGlobals, SyncState}; use lighthouse_network::types::{NetworkGlobals, SyncState};
use lighthouse_network::SyncInfo; use lighthouse_network::SyncInfo;
use lighthouse_network::{PeerAction, PeerId}; use lighthouse_network::{PeerAction, PeerId};
@ -131,6 +132,7 @@ pub enum SyncMessage<T: EthSpec> {
RpcError { RpcError {
peer_id: PeerId, peer_id: PeerId,
request_id: RequestId, request_id: RequestId,
error: RPCError,
}, },
/// A batch has been processed by the block processor thread. /// A batch has been processed by the block processor thread.
@ -282,7 +284,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
} }
/// Handles RPC errors related to requests that were emitted from the sync manager. /// Handles RPC errors related to requests that were emitted from the sync manager.
fn inject_error(&mut self, peer_id: PeerId, request_id: RequestId) { fn inject_error(&mut self, peer_id: PeerId, request_id: RequestId, error: RPCError) {
trace!(self.log, "Sync manager received a failed RPC"); trace!(self.log, "Sync manager received a failed RPC");
match request_id { match request_id {
RequestId::SingleBlock { id } => { RequestId::SingleBlock { id } => {
@ -291,7 +293,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
} }
RequestId::ParentLookup { id } => { RequestId::ParentLookup { id } => {
self.block_lookups self.block_lookups
.parent_lookup_failed(id, peer_id, &mut self.network); .parent_lookup_failed(id, peer_id, &mut self.network, error);
} }
RequestId::BackFillSync { id } => { RequestId::BackFillSync { id } => {
if let Some(batch_id) = self if let Some(batch_id) = self
@ -603,7 +605,8 @@ impl<T: BeaconChainTypes> SyncManager<T> {
SyncMessage::RpcError { SyncMessage::RpcError {
peer_id, peer_id,
request_id, request_id,
} => self.inject_error(peer_id, request_id), error,
} => self.inject_error(peer_id, request_id, error),
SyncMessage::BlockProcessed { SyncMessage::BlockProcessed {
process_type, process_type,
result, result,

View File

@ -6,6 +6,7 @@ use super::range_sync::{BatchId, ChainId, ExpectedBatchTy};
use crate::beacon_processor::WorkEvent; use crate::beacon_processor::WorkEvent;
use crate::service::{NetworkMessage, RequestId}; use crate::service::{NetworkMessage, RequestId};
use crate::status::ToStatusMessage; use crate::status::ToStatusMessage;
use crate::sync::block_lookups::ForceBlockRequest;
use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState}; use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState};
use fnv::FnvHashMap; use fnv::FnvHashMap;
use lighthouse_network::rpc::methods::BlobsByRangeRequest; use lighthouse_network::rpc::methods::BlobsByRangeRequest;
@ -504,11 +505,13 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
&mut self, &mut self,
peer_id: PeerId, peer_id: PeerId,
request: BlocksByRootRequest, request: BlocksByRootRequest,
force_block_request: ForceBlockRequest,
) -> Result<Id, &'static str> { ) -> Result<Id, &'static str> {
let request = if self let request = if self
.chain .chain
.is_data_availability_check_required() .is_data_availability_check_required()
.map_err(|_| "Unable to read slot clock")? .map_err(|_| "Unable to read slot clock")?
&& matches!(force_block_request, ForceBlockRequest::False)
{ {
trace!( trace!(
self.log, self.log,

View File

@ -185,13 +185,4 @@ impl<T: EthSpec> ConsensusContext<T> {
pub fn blobs_verified_vs_txs(&self) -> bool { pub fn blobs_verified_vs_txs(&self) -> bool {
self.blobs_verified_vs_txs self.blobs_verified_vs_txs
} }
pub fn set_blobs_sidecar(mut self, blobs_sidecar: Option<Arc<BlobsSidecar<T>>>) -> Self {
self.blobs_sidecar = blobs_sidecar;
self
}
pub fn blobs_sidecar(&self) -> Option<Arc<BlobsSidecar<T>>> {
self.blobs_sidecar.clone()
}
} }

View File

@ -66,14 +66,6 @@ impl<T: EthSpec> BlockWrapper<T> {
} }
} }
} }
pub fn blobs_sidecar(&self) -> Option<Arc<BlobsSidecar<T>>> {
match self {
BlockWrapper::Block { block: _ } => None,
BlockWrapper::BlockAndBlob { block_sidecar_pair } => {
Some(block_sidecar_pair.blobs_sidecar.clone())
}
}
}
pub fn blobs(&self) -> Option<&BlobsSidecar<T>> { pub fn blobs(&self) -> Option<&BlobsSidecar<T>> {
match self { match self {
@ -84,6 +76,15 @@ impl<T: EthSpec> BlockWrapper<T> {
} }
} }
pub fn blobs_cloned(&self) -> Option<Arc<BlobsSidecar<T>>> {
match self {
BlockWrapper::Block { block: _ } => None,
BlockWrapper::BlockAndBlob { block_sidecar_pair } => {
Some(block_sidecar_pair.blobs_sidecar.clone())
}
}
}
pub fn message(&self) -> crate::BeaconBlockRef<T> { pub fn message(&self) -> crate::BeaconBlockRef<T> {
match self { match self {
BlockWrapper::Block { block } => block.message(), BlockWrapper::Block { block } => block.message(),