From f43532d3deba899e057d77550b44a4633c474cc1 Mon Sep 17 00:00:00 2001
From: Marius van der Wijden
Date: Sat, 17 Sep 2022 20:05:51 +0200
Subject: [PATCH] implement handle blobs by range req

---
 .../beacon_processor/worker/rpc_methods.rs | 140 +++++++++++++++++-
 1 file changed, 137 insertions(+), 3 deletions(-)

diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs
index 2b8cbbc22..97cab1782 100644
--- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs
+++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs
@@ -12,7 +12,7 @@ use slog::{debug, error};
 use slot_clock::SlotClock;
 use std::sync::Arc;
 use task_executor::TaskExecutor;
-use types::{Epoch, EthSpec, Hash256, Slot};
+use types::{Epoch, EthSpec, Hash256, Slot, VariableList};
 
 use super::Worker;
 
@@ -381,8 +381,142 @@ impl<T: BeaconChainTypes> Worker<T> {
         send_on_drop: SendOnDrop,
         peer_id: PeerId,
         request_id: PeerRequestId,
-        request: BlobsByRangeRequest,
+        mut req: BlobsByRangeRequest,
     ) {
-        // TODO impl
+        debug!(self.log, "Received BlobsByRange Request";
+            "peer_id" => %peer_id,
+            "count" => req.count,
+            "start_slot" => req.start_slot,
+        );
+
+        // Should not send more than max request blocks
+        if req.count > MAX_REQUEST_BLOCKS {
+            req.count = MAX_REQUEST_BLOCKS;
+        }
+
+        let forwards_block_root_iter = match self
+            .chain
+            .forwards_iter_block_roots(Slot::from(req.start_slot))
+        {
+            Ok(iter) => iter,
+            Err(BeaconChainError::HistoricalBlockError(
+                HistoricalBlockError::BlockOutOfRange {
+                    slot,
+                    oldest_block_slot,
+                },
+            )) => {
+                debug!(self.log, "Range request failed during backfill"; "requested_slot" => slot, "oldest_known_slot" => oldest_block_slot);
+                return self.send_error_response(
+                    peer_id,
+                    RPCResponseErrorCode::ResourceUnavailable,
+                    "Backfilling".into(),
+                    request_id,
+                );
+            }
+            Err(e) => return error!(self.log, "Unable to obtain root iter"; "error" => ?e),
+        };
+
+        // Pick out the required blocks, ignoring skip-slots.
+        let mut last_block_root = None;
+        let maybe_block_roots = process_results(forwards_block_root_iter, |iter| {
+            iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count))
+                // map skip slots to None
+                .map(|(root, _)| {
+                    let result = if Some(root) == last_block_root {
+                        None
+                    } else {
+                        Some(root)
+                    };
+                    last_block_root = Some(root);
+                    result
+                })
+                .collect::<Vec<Option<Hash256>>>()
+        });
+
+        let block_roots = match maybe_block_roots {
+            Ok(block_roots) => block_roots,
+            Err(e) => return error!(self.log, "Error during iteration over blocks"; "error" => ?e),
+        };
+
+        // remove all skip slots
+        let block_roots = block_roots.into_iter().flatten().collect::<Vec<_>>();
+
+        // Fetching blocks is async because it may have to hit the execution layer for payloads.
+        executor.spawn(
+            async move {
+                let mut blocks_sent = 0;
+                let mut send_response = true;
+
+                for root in block_roots {
+                    match self.chain.store.get_blobs(&root) {
+                        Ok(Some(blob)) => {
+                            blocks_sent += 1;
+                            self.send_network_message(NetworkMessage::SendResponse {
+                                peer_id,
+                                response: Response::BlobsByRange(Some(Arc::new(
+                                    VariableList::new(vec![blob.message]).unwrap(),
+                                ))),
+                                id: request_id,
+                            });
+                        }
+                        Ok(None) => {
+                            error!(
+                                self.log,
+                                "Block in the chain is not in the store";
+                                "request_root" => ?root
+                            );
+                            break;
+                        }
+                        Err(e) => {
+                            error!(
+                                self.log,
+                                "Error fetching block for peer";
+                                "block_root" => ?root,
+                                "error" => ?e
+                            );
+                            break;
+                        }
+                    }
+                }
+
+                let current_slot = self
+                    .chain
+                    .slot()
+                    .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot());
+
+                if blocks_sent < (req.count as usize) {
+                    debug!(
+                        self.log,
+                        "BlobsByRange Response processed";
+                        "peer" => %peer_id,
+                        "msg" => "Failed to return all requested blobs",
+                        "start_slot" => req.start_slot,
+                        "current_slot" => current_slot,
+                        "requested" => req.count,
+                        "returned" => blocks_sent
+                    );
+                } else {
+                    debug!(
+                        self.log,
+                        "BlobsByRange Response processed";
+                        "peer" => %peer_id,
+                        "start_slot" => req.start_slot,
+                        "current_slot" => current_slot,
+                        "requested" => req.count,
+                        "returned" => blocks_sent
+                    );
+                }
+
+                if send_response {
+                    // send the stream terminator
+                    self.send_network_message(NetworkMessage::SendResponse {
+                        peer_id,
+                        response: Response::BlobsByRange(None),
+                        id: request_id,
+                    });
+                }
+
+                drop(send_on_drop);
+            },
+            "load_blocks_by_range_blocks",
+        );
     }
 }