diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 706db6396..1afff4a95 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1023,7 +1023,7 @@ impl BeaconChain { ) } - pub async fn get_blobs_checking_early_attester_cache( + pub fn get_blobs_checking_early_attester_cache( &self, block_root: &Hash256, ) -> Result>, Error> { diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 3046c8b39..0cf26f12f 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -112,6 +112,11 @@ impl DataAvailabilityChecker { } } + /// Get a blob from the RPC cache. + pub fn get_blob(&self, blob_id: &BlobIdentifier) -> Option>> { + self.rpc_blob_cache.read().get(blob_id).cloned() + } + /// This first validate the KZG commitments included in the blob sidecar. /// Check if we've cached other blobs for this block. If it completes a set and we also /// have a block cached, return the Availability variant triggering block import. diff --git a/beacon_node/beacon_chain/src/early_attester_cache.rs b/beacon_node/beacon_chain/src/early_attester_cache.rs index 082c2242c..0aecbde16 100644 --- a/beacon_node/beacon_chain/src/early_attester_cache.rs +++ b/beacon_node/beacon_chain/src/early_attester_cache.rs @@ -22,7 +22,6 @@ pub struct CacheItem { * Values used to make the block available. */ block: Arc>, - //TODO(sean) remove this and just use the da checker?' blobs: Option>, proto_block: ProtoBlock, } diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 2935250fa..9baa638f4 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -213,9 +213,6 @@ pub enum BeaconChainError { BlsToExecutionConflictsWithPool, InconsistentFork(InconsistentFork), ProposerHeadForkChoiceError(fork_choice::Error), - BlobsUnavailable, - NoKzgCommitmentsFieldOnBlock, - BlobsOlderThanDataAvailabilityBoundary(Epoch), } easy_from_to!(SlotProcessingError, BeaconChainError); diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index 256aa5e60..c26fe7572 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -1983,13 +1983,7 @@ impl BeaconProcessor { request_id, request, } => task_spawner.spawn_blocking_with_manual_send_idle(move |send_idle_on_drop| { - worker.handle_blobs_by_root_request( - sub_executor, - send_idle_on_drop, - peer_id, - request_id, - request, - ) + worker.handle_blobs_by_root_request(send_idle_on_drop, peer_id, request_id, request) }), /* diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index ffdaa02dc..c8667b352 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -218,150 +218,81 @@ impl Worker { /// Handle a `BlobsByRoot` request from the peer. pub fn handle_blobs_by_root_request( self, - executor: TaskExecutor, send_on_drop: SendOnDrop, peer_id: PeerId, request_id: PeerRequestId, request: BlobsByRootRequest, ) { - // TODO: this code is grossly adjusted to free the blobs. Needs love <3 - // Fetching blocks is async because it may have to hit the execution layer for payloads. - executor.spawn( - async move { - let requested_blobs = request.blob_ids.len(); - let mut send_blob_count = 0; - let mut send_response = true; + let requested_blobs = request.blob_ids.len(); + let mut send_blob_count = 0; + let send_response = true; - let mut blob_list_results = HashMap::new(); - for BlobIdentifier{ block_root: root, index } in request.blob_ids.into_iter() { - let blob_list_result = match blob_list_results.entry(root) { - Entry::Vacant(entry) => { - entry.insert(self - .chain - .get_blobs_checking_early_attester_cache(&root) - .await) - } - Entry::Occupied(entry) => { - entry.into_mut() - } - }; + let mut blob_list_results = HashMap::new(); + for id in request.blob_ids.into_iter() { + // First attempt to get the blobs from the RPC cache. + if let Some(blob) = self.chain.data_availability_checker.get_blob(&id) { + self.send_response(peer_id, Response::BlobsByRoot(Some(blob)), request_id); + send_blob_count += 1; + } else { + let BlobIdentifier { + block_root: root, + index, + } = id; - match blob_list_result.as_ref() { - Ok(Some(blobs_sidecar_list)) => { - for blob_sidecar in blobs_sidecar_list.iter() { - if blob_sidecar.index == index { - self.send_response( - peer_id, - Response::BlobsByRoot(Some(blob_sidecar.clone())), - request_id, - ); - send_blob_count += 1; - break; - } + let blob_list_result = match blob_list_results.entry(root) { + Entry::Vacant(entry) => { + entry.insert(self.chain.get_blobs_checking_early_attester_cache(&root)) + } + Entry::Occupied(entry) => entry.into_mut(), + }; + + match blob_list_result.as_ref() { + Ok(Some(blobs_sidecar_list)) => { + 'inner: for blob_sidecar in blobs_sidecar_list.iter() { + if blob_sidecar.index == index { + self.send_response( + peer_id, + Response::BlobsByRoot(Some(blob_sidecar.clone())), + request_id, + ); + send_blob_count += 1; + break 'inner; } } - Ok(None) => { - debug!( - self.log, - "Peer requested unknown block and blobs"; - "peer" => %peer_id, - "request_root" => ?root - ); - } - Err(BeaconChainError::BlobsUnavailable) => { - error!( - self.log, - "No blobs in the store for block root"; - "peer" => %peer_id, - "block_root" => ?root - ); - self.send_error_response( - peer_id, - RPCResponseErrorCode::BlobsNotFoundForBlock, - "Blobs not found for block root".into(), - request_id, - ); - send_response = false; - break; - } - Err(BeaconChainError::NoKzgCommitmentsFieldOnBlock) => { - debug!( - self.log, - "Peer requested blobs for a pre-deneb block"; - "peer" => %peer_id, - "block_root" => ?root, - ); - self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "Failed reading field kzg_commitments from block".into(), - request_id, - ); - send_response = false; - break; - } - Err(BeaconChainError::BlobsOlderThanDataAvailabilityBoundary(block_epoch)) => { - debug!( - self.log, - "Peer requested block and blobs older than the data availability \ - boundary for ByRoot request, no blob found"; - "peer" => %peer_id, - "request_root" => ?root, - "block_epoch" => ?block_epoch, - ); - self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "Blobs older than data availability boundary".into(), - request_id, - ); - send_response = false; - break; - } - Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => { - debug!( - self.log, - "Failed to fetch execution payload for block and blobs by root request"; - "block_root" => ?root, - "reason" => "execution layer not synced", - ); - // send the stream terminator - self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "Execution layer not synced".into(), - request_id, - ); - send_response = false; - break; - } - Err(e) => { - debug!( - self.log, - "Error fetching block for peer"; - "peer" => %peer_id, - "request_root" => ?root, - "error" => ?e, - ); - } + } + Ok(None) => { + debug!( + self.log, + "Peer requested unknown blobs"; + "peer" => %peer_id, + "request_root" => ?root + ); + } + Err(e) => { + debug!( + self.log, + "Error fetching blob for peer"; + "peer" => %peer_id, + "request_root" => ?root, + "error" => ?e, + ); } } - debug!( - self.log, - "Received BlobsByRoot Request"; - "peer" => %peer_id, - "requested" => requested_blobs, - "returned" => send_blob_count - ); + } + } + debug!( + self.log, + "Received BlobsByRoot Request"; + "peer" => %peer_id, + "requested" => requested_blobs, + "returned" => send_blob_count + ); - // send stream termination - if send_response { - self.send_response(peer_id, Response::BlobsByRoot(None), request_id); - } - drop(send_on_drop); - }, - "load_blobs_by_root_blocks", - ) + // send stream termination + if send_response { + self.send_response(peer_id, Response::BlobsByRoot(None), request_id); + } + drop(send_on_drop); } /// Handle a `BlocksByRoot` request from the peer. @@ -851,23 +782,6 @@ impl Worker { "block_root" => ?root ); } - Err(BeaconChainError::BlobsUnavailable) => { - error!( - self.log, - "No blobs in the store for block root"; - "request" => ?req, - "peer" => %peer_id, - "block_root" => ?root - ); - self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "Blobs unavailable".into(), - request_id, - ); - send_response = false; - break; - } Err(e) => { error!( self.log,