review updates

This commit is contained in:
realbigsean 2023-01-24 15:30:29 +01:00
parent 2225e6ac89
commit 18d4faf611
2 changed files with 89 additions and 62 deletions

View File

@ -956,13 +956,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub async fn get_block_and_blobs_checking_early_attester_cache( pub async fn get_block_and_blobs_checking_early_attester_cache(
&self, &self,
block_root: &Hash256, block_root: &Hash256,
) -> Result< ) -> Result<Option<SignedBeaconBlockAndBlobsSidecar<T::EthSpec>>, Error> {
Option<(
Arc<SignedBeaconBlock<T::EthSpec>>,
Arc<BlobsSidecar<T::EthSpec>>,
)>,
Error,
> {
// If there is no data availability boundary, the Eip4844 fork is disabled. // If there is no data availability boundary, the Eip4844 fork is disabled.
if let Some(finalized_data_availability_boundary) = if let Some(finalized_data_availability_boundary) =
self.finalized_data_availability_boundary() self.finalized_data_availability_boundary()
@ -972,13 +966,19 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.early_attester_cache.get_block(*block_root), self.early_attester_cache.get_block(*block_root),
self.early_attester_cache.get_blobs(*block_root), self.early_attester_cache.get_blobs(*block_root),
) { ) {
Ok(Some((block, blobs))) Ok(Some(SignedBeaconBlockAndBlobsSidecar {
beacon_block: block,
blobs_sidecar: blobs,
}))
// Attempt to get the block and blobs from the database // Attempt to get the block and blobs from the database
} else if let Some(block) = self.get_block(block_root).await?.map(Arc::new) { } else if let Some(block) = self.get_block(block_root).await?.map(Arc::new) {
let blobs = self let blobs = self
.get_blobs(block_root, finalized_data_availability_boundary)? .get_blobs(block_root, finalized_data_availability_boundary)?
.map(Arc::new); .map(Arc::new);
Ok(blobs.map(|blobs| (block, blobs))) Ok(blobs.map(|blobs| SignedBeaconBlockAndBlobsSidecar {
beacon_block: block,
blobs_sidecar: blobs,
}))
} else { } else {
Ok(None) Ok(None)
} }

View File

@ -14,8 +14,9 @@ use slog::{debug, error};
use slot_clock::SlotClock; use slot_clock::SlotClock;
use std::sync::Arc; use std::sync::Arc;
use task_executor::TaskExecutor; use task_executor::TaskExecutor;
use tokio::count;
use types::light_client_bootstrap::LightClientBootstrap; use types::light_client_bootstrap::LightClientBootstrap;
use types::{Epoch, EthSpec, Hash256, SignedBeaconBlockAndBlobsSidecar, Slot}; use types::{Epoch, EthSpec, Hash256, Slot};
use super::Worker; use super::Worker;
@ -198,7 +199,7 @@ impl<T: BeaconChainTypes> Worker<T> {
"Received BlocksByRoot Request"; "Received BlocksByRoot Request";
"peer" => %peer_id, "peer" => %peer_id,
"requested" => request.block_roots.len(), "requested" => request.block_roots.len(),
"returned" => %send_block_count "returned" => send_block_count
); );
// send stream termination // send stream termination
@ -230,13 +231,10 @@ impl<T: BeaconChainTypes> Worker<T> {
.get_block_and_blobs_checking_early_attester_cache(root) .get_block_and_blobs_checking_early_attester_cache(root)
.await .await
{ {
Ok(Some((block, blobs))) => { Ok(Some(block_and_blobs)) => {
self.send_response( self.send_response(
peer_id, peer_id,
Response::BlobsByRoot(Some(SignedBeaconBlockAndBlobsSidecar { Response::BlobsByRoot(Some(block_and_blobs)),
beacon_block: block,
blobs_sidecar: blobs,
})),
request_id, request_id,
); );
send_block_count += 1; send_block_count += 1;
@ -253,7 +251,7 @@ impl<T: BeaconChainTypes> Worker<T> {
error!( error!(
self.log, self.log,
"No blobs in the store for block root"; "No blobs in the store for block root";
"request" => %request, "request" => ?request,
"peer" => %peer_id, "peer" => %peer_id,
"block_root" => ?root "block_root" => ?root
); );
@ -333,7 +331,7 @@ impl<T: BeaconChainTypes> Worker<T> {
"Received BlobsByRoot Request"; "Received BlobsByRoot Request";
"peer" => %peer_id, "peer" => %peer_id,
"requested" => request.block_roots.len(), "requested" => request.block_roots.len(),
"returned" => %send_block_count "returned" => send_block_count
); );
// send stream termination // send stream termination
@ -430,13 +428,18 @@ impl<T: BeaconChainTypes> Worker<T> {
) { ) {
debug!(self.log, "Received BlocksByRange Request"; debug!(self.log, "Received BlocksByRange Request";
"peer_id" => %peer_id, "peer_id" => %peer_id,
"count" => %req.count, "count" => req.count,
"start_slot" => %req.start_slot, "start_slot" => req.start_slot,
); );
// Should not send more than max request blocks // Should not send more than max request blocks
if req.count > MAX_REQUEST_BLOCKS { if req.count > MAX_REQUEST_BLOCKS {
req.count = MAX_REQUEST_BLOCKS; return self.send_error_response(
peer_id,
RPCResponseErrorCode::InvalidRequest,
"Request exceeded `MAX_REQUEST_BLOBS_SIDECARS`".into(),
request_id,
);
} }
let forwards_block_root_iter = match self let forwards_block_root_iter = match self
@ -451,8 +454,8 @@ impl<T: BeaconChainTypes> Worker<T> {
}, },
)) => { )) => {
debug!(self.log, "Range request failed during backfill"; debug!(self.log, "Range request failed during backfill";
"requested_slot" => %slot, "requested_slot" => slot,
"oldest_known_slot" => %oldest_block_slot "oldest_known_slot" => oldest_block_slot
); );
return self.send_error_response( return self.send_error_response(
peer_id, peer_id,
@ -463,7 +466,7 @@ impl<T: BeaconChainTypes> Worker<T> {
} }
Err(e) => { Err(e) => {
return error!(self.log, "Unable to obtain root iter"; return error!(self.log, "Unable to obtain root iter";
"request" => %req, "request" => ?req,
"peer" => %peer_id, "peer" => %peer_id,
"error" => ?e "error" => ?e
) )
@ -501,7 +504,7 @@ impl<T: BeaconChainTypes> Worker<T> {
Ok(block_roots) => block_roots, Ok(block_roots) => block_roots,
Err(e) => { Err(e) => {
return error!(self.log, "Error during iteration over blocks"; return error!(self.log, "Error during iteration over blocks";
"request" => %req, "request" => ?req,
"peer" => %peer_id, "peer" => %peer_id,
"error" => ?e "error" => ?e
) )
@ -537,7 +540,7 @@ impl<T: BeaconChainTypes> Worker<T> {
error!( error!(
self.log, self.log,
"Block in the chain is not in the store"; "Block in the chain is not in the store";
"request" => %req, "request" => ?req,
"peer" => %peer_id, "peer" => %peer_id,
"request_root" => ?root "request_root" => ?root
); );
@ -564,7 +567,7 @@ impl<T: BeaconChainTypes> Worker<T> {
error!( error!(
self.log, self.log,
"Error fetching block for peer"; "Error fetching block for peer";
"request" => %req, "request" => ?req,
"peer" => %peer_id, "peer" => %peer_id,
"block_root" => ?root, "block_root" => ?root,
"error" => ?e "error" => ?e
@ -594,20 +597,20 @@ impl<T: BeaconChainTypes> Worker<T> {
"BlocksByRange outgoing response processed"; "BlocksByRange outgoing response processed";
"peer" => %peer_id, "peer" => %peer_id,
"msg" => "Failed to return all requested blocks", "msg" => "Failed to return all requested blocks",
"start_slot" => %req.start_slot, "start_slot" => req.start_slot,
"current_slot" => %current_slot, "current_slot" => current_slot,
"requested" => %req.count, "requested" => req.count,
"returned" => %blocks_sent "returned" => blocks_sent
); );
} else { } else {
debug!( debug!(
self.log, self.log,
"BlocksByRange outgoing response processed"; "BlocksByRange outgoing response processed";
"peer" => %peer_id, "peer" => %peer_id,
"start_slot" => %req.start_slot, "start_slot" => req.start_slot,
"current_slot" => %current_slot, "current_slot" => current_slot,
"requested" => %req.count, "requested" => req.count,
"returned" => %blocks_sent "returned" => blocks_sent
); );
} }
@ -637,12 +640,20 @@ impl<T: BeaconChainTypes> Worker<T> {
) { ) {
debug!(self.log, "Received BlobsByRange Request"; debug!(self.log, "Received BlobsByRange Request";
"peer_id" => %peer_id, "peer_id" => %peer_id,
"count" => %req.count, "count" => req.count,
"start_slot" => %req.start_slot, "start_slot" => req.start_slot,
); );
let start_slot = Slot::from(req.start_slot); // Should not send more than max request blocks
let start_epoch = start_slot.epoch(T::EthSpec::slots_per_epoch()); if req.count > MAX_REQUEST_BLOBS_SIDECARS {
return self.send_error_response(
peer_id,
RPCResponseErrorCode::InvalidRequest,
"Request exceeded `MAX_REQUEST_BLOBS_SIDECARS`".into(),
request_id,
);
}
let data_availability_boundary = match self.chain.data_availability_boundary() { let data_availability_boundary = match self.chain.data_availability_boundary() {
Some(boundary) => boundary, Some(boundary) => boundary,
None => { None => {
@ -657,31 +668,47 @@ impl<T: BeaconChainTypes> Worker<T> {
} }
}; };
let start_slot = Slot::from(req.start_slot);
let start_epoch = start_slot.epoch(T::EthSpec::slots_per_epoch());
// If the peer requests data from beyond the data availability boundary we altruistically
// cap to the right time range.
let serve_blobs_from_slot = if start_epoch < data_availability_boundary { let serve_blobs_from_slot = if start_epoch < data_availability_boundary {
// Attempt to serve from the earliest block in our database, falling back to the data
// availability boundary
let oldest_blob_slot = self let oldest_blob_slot = self
.chain .chain
.store .store
.get_blob_info() .get_blob_info()
.map(|blob_info| blob_info.oldest_blob_slot); .map(|blob_info| blob_info.oldest_blob_slot)
.unwrap_or(data_availability_boundary.start_slot(T::EthSpec::slots_per_epoch()));
debug!( debug!(
self.log, self.log,
"Range request start slot is older than data availability boundary"; "Range request start slot is older than data availability boundary";
"requested_slot" => %req.start_slot, "requested_slot" => req.start_slot,
"oldest_known_slot" => oldest_blob_slot, "oldest_known_slot" => oldest_blob_slot,
"data_availability_boundary" => data_availability_boundary "data_availability_boundary" => data_availability_boundary
); );
data_availability_boundary.start_slot(T::EthSpec::slots_per_epoch()) // Check if the request is entirely out of the data availability period. The
// `oldest_blob_slot` is the oldest slot in the database, so includes a margin of error
// controlled by our prune margin.
let end_request_slot = start_slot + count;
if oldest_blob_slot < end_request_slot {
return self.send_error_response(
peer_id,
RPCResponseErrorCode::InvalidRequest,
"Request outside of data availability period".into(),
request_id,
);
}
std::cmp::max(oldest_blob_slot, start_slot)
} else { } else {
start_slot start_slot
}; };
// Should not send more than max request blocks // If the peer requests data from beyond the data availability boundary we altruistically cap to the right time range
if req.count > MAX_REQUEST_BLOBS_SIDECARS {
req.count = MAX_REQUEST_BLOBS_SIDECARS;
}
let forwards_block_root_iter = let forwards_block_root_iter =
match self.chain.forwards_iter_block_roots(serve_blobs_from_slot) { match self.chain.forwards_iter_block_roots(serve_blobs_from_slot) {
Ok(iter) => iter, Ok(iter) => iter,
@ -692,8 +719,8 @@ impl<T: BeaconChainTypes> Worker<T> {
}, },
)) => { )) => {
debug!(self.log, "Range request failed during backfill"; debug!(self.log, "Range request failed during backfill";
"requested_slot" => %slot, "requested_slot" => slot,
"oldest_known_slot" => %oldest_block_slot "oldest_known_slot" => oldest_block_slot
); );
return self.send_error_response( return self.send_error_response(
peer_id, peer_id,
@ -704,7 +731,7 @@ impl<T: BeaconChainTypes> Worker<T> {
} }
Err(e) => { Err(e) => {
return error!(self.log, "Unable to obtain root iter"; return error!(self.log, "Unable to obtain root iter";
"request" => %req, "request" => ?req,
"peer" => %peer_id, "peer" => %peer_id,
"error" => ?e "error" => ?e
) )
@ -742,7 +769,7 @@ impl<T: BeaconChainTypes> Worker<T> {
Ok(block_roots) => block_roots, Ok(block_roots) => block_roots,
Err(e) => { Err(e) => {
return error!(self.log, "Error during iteration over blocks"; return error!(self.log, "Error during iteration over blocks";
"request" => %req, "request" => ?req,
"peer" => %peer_id, "peer" => %peer_id,
"error" => ?e "error" => ?e
) )
@ -769,7 +796,7 @@ impl<T: BeaconChainTypes> Worker<T> {
error!( error!(
self.log, self.log,
"No blobs or block in the store for block root"; "No blobs or block in the store for block root";
"request" => %req, "request" => ?req,
"peer" => %peer_id, "peer" => %peer_id,
"block_root" => ?root "block_root" => ?root
); );
@ -779,7 +806,7 @@ impl<T: BeaconChainTypes> Worker<T> {
error!( error!(
self.log, self.log,
"No blobs in the store for block root"; "No blobs in the store for block root";
"request" => %req, "request" => ?req,
"peer" => %peer_id, "peer" => %peer_id,
"block_root" => ?root "block_root" => ?root
); );
@ -796,7 +823,7 @@ impl<T: BeaconChainTypes> Worker<T> {
error!( error!(
self.log, self.log,
"Error fetching blinded block for block root"; "Error fetching blinded block for block root";
"request" => %req, "request" => ?req,
"peer" => %peer_id, "peer" => %peer_id,
"block_root" => ?root, "block_root" => ?root,
"error" => ?e "error" => ?e
@ -824,20 +851,20 @@ impl<T: BeaconChainTypes> Worker<T> {
"BlobsByRange Response processed"; "BlobsByRange Response processed";
"peer" => %peer_id, "peer" => %peer_id,
"msg" => "Failed to return all requested blobs", "msg" => "Failed to return all requested blobs",
"start_slot" => %req.start_slot, "start_slot" => req.start_slot,
"current_slot" => %current_slot, "current_slot" => current_slot,
"requested" => %req.count, "requested" => req.count,
"returned" => %blobs_sent "returned" => blobs_sent
); );
} else { } else {
debug!( debug!(
self.log, self.log,
"BlobsByRange Response processed"; "BlobsByRange Response processed";
"peer" => %peer_id, "peer" => %peer_id,
"start_slot" => %req.start_slot, "start_slot" => req.start_slot,
"current_slot" => %current_slot, "current_slot" => current_slot,
"requested" => %req.count, "requested" => req.count,
"returned" => %blobs_sent "returned" => blobs_sent
); );
} }