From fc9d0a512ddfa8cf10ce5840e3839c62d41608cc Mon Sep 17 00:00:00 2001 From: realbigsean Date: Wed, 30 Nov 2022 10:02:29 -0500 Subject: [PATCH] handle blobs by range requests --- .../beacon_processor/worker/rpc_methods.rs | 162 ++++++++---------- 1 file changed, 74 insertions(+), 88 deletions(-) diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 86feddec5..e95bd4e85 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -480,10 +480,10 @@ impl Worker { /// Handle a `BlobsByRange` request from the peer. pub fn handle_blobs_by_range_request( self, - _executor: TaskExecutor, - _send_on_drop: SendOnDrop, + executor: TaskExecutor, + send_on_drop: SendOnDrop, peer_id: PeerId, - _request_id: PeerRequestId, + request_id: PeerRequestId, mut req: BlobsByRangeRequest, ) { debug!(self.log, "Received BlobsByRange Request"; @@ -497,10 +497,7 @@ impl Worker { req.count = MAX_REQUEST_BLOBS_SIDECARS; } - //FIXME(sean) create the blobs iter - - /* - let forwards_blob_root_iter = match self + let forwards_block_root_iter = match self .chain .forwards_iter_block_roots(Slot::from(req.start_slot)) { @@ -512,13 +509,12 @@ impl Worker { }, )) => { debug!(self.log, "Range request failed during backfill"; "requested_slot" => slot, "oldest_known_slot" => oldest_block_slot); - // return self.send_error_response( - // peer_id, - // RPCResponseErrorCode::ResourceUnavailable, - // "Backfilling".into(), - // request_id, - // ); - todo!("stuff") + return self.send_error_response( + peer_id, + RPCResponseErrorCode::ResourceUnavailable, + "Backfilling".into(), + request_id, + ); } Err(e) => return error!(self.log, "Unable to obtain root iter"; "error" => ?e), }; @@ -548,86 +544,76 @@ impl Worker { // remove all skip slots let block_roots = block_roots.into_iter().flatten().collect::>(); - */ - // Fetching blocks is async because it may have to hit the execution layer for payloads. - /* - executor.spawn( - async move { - let mut blocks_sent = 0; - let mut send_response = true; + let mut blobs_sent = 0; + let mut send_response = true; - for root in block_roots { - match self.chain.store.get_blobs(&root) { - Ok(Some(blob)) => { - blocks_sent += 1; - self.send_network_message(NetworkMessage::SendResponse { - peer_id, - response: Response::BlobsByRange(Some(Arc::new(blob))), - id: request_id, - }); - } - Ok(None) => { - error!( - self.log, - "Blob in the chain is not in the store"; - "request_root" => ?root - ); - break; - } - Err(e) => { - error!( - self.log, - "Error fetching blob for peer"; - "block_root" => ?root, - "error" => ?e - ); - break; - } - } - } - - let current_slot = self - .chain - .slot() - .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot()); - - if blobs_sent < (req.count as usize) { - debug!( - self.log, - "BlobsByRange Response processed"; - "peer" => %peer_id, - "msg" => "Failed to return all requested blocks", - "start_slot" => req.start_slot, - "current_slot" => current_slot, - "requested" => req.count, - "returned" => blobs_sent - ); - } else { - debug!( - self.log, - "BlobsByRange Response processed"; - "peer" => %peer_id, - "start_slot" => req.start_slot, - "current_slot" => current_slot, - "requested" => req.count, - "returned" => blobs_sent - ); - } - - if send_response { - // send the stream terminator + for root in block_roots { + match self.chain.store.get_blobs(&root) { + Ok(Some(blob)) => { + blobs_sent += 1; self.send_network_message(NetworkMessage::SendResponse { peer_id, - response: Response::BlobsByRange(None), + response: Response::BlobsByRange(Some(Arc::new(blob))), id: request_id, }); } + Ok(None) => { + error!( + self.log, + "Blob in the chain is not in the store"; + "request_root" => ?root + ); + break; + } + Err(e) => { + error!( + self.log, + "Error fetching blob for peer"; + "block_root" => ?root, + "error" => ?e + ); + break; + } + } + } - drop(send_on_drop); - }, - "load_blocks_by_range_blocks", - ); - */ - unimplemented!("") + let current_slot = self + .chain + .slot() + .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot()); + + if blobs_sent < (req.count as usize) { + debug!( + self.log, + "BlobsByRange Response processed"; + "peer" => %peer_id, + "msg" => "Failed to return all requested blocks", + "start_slot" => req.start_slot, + "current_slot" => current_slot, + "requested" => req.count, + "returned" => blobs_sent + ); + } else { + debug!( + self.log, + "BlobsByRange Response processed"; + "peer" => %peer_id, + "start_slot" => req.start_slot, + "current_slot" => current_slot, + "requested" => req.count, + "returned" => blobs_sent + ); + } + + if send_response { + // send the stream terminator + self.send_network_message(NetworkMessage::SendResponse { + peer_id, + response: Response::BlobsByRange(None), + id: request_id, + }); + } + + drop(send_on_drop); } }