handle blobs by range requests

This commit is contained in:
realbigsean 2022-11-30 10:02:29 -05:00
parent 422d145902
commit fc9d0a512d
No known key found for this signature in database
GPG Key ID: B372B64D866BF8CC

View File

@ -480,10 +480,10 @@ impl<T: BeaconChainTypes> Worker<T> {
/// Handle a `BlobsByRange` request from the peer. /// Handle a `BlobsByRange` request from the peer.
pub fn handle_blobs_by_range_request( pub fn handle_blobs_by_range_request(
self, self,
_executor: TaskExecutor, executor: TaskExecutor,
_send_on_drop: SendOnDrop, send_on_drop: SendOnDrop,
peer_id: PeerId, peer_id: PeerId,
_request_id: PeerRequestId, request_id: PeerRequestId,
mut req: BlobsByRangeRequest, mut req: BlobsByRangeRequest,
) { ) {
debug!(self.log, "Received BlobsByRange Request"; debug!(self.log, "Received BlobsByRange Request";
@ -497,10 +497,7 @@ impl<T: BeaconChainTypes> Worker<T> {
req.count = MAX_REQUEST_BLOBS_SIDECARS; req.count = MAX_REQUEST_BLOBS_SIDECARS;
} }
//FIXME(sean) create the blobs iter let forwards_block_root_iter = match self
/*
let forwards_blob_root_iter = match self
.chain .chain
.forwards_iter_block_roots(Slot::from(req.start_slot)) .forwards_iter_block_roots(Slot::from(req.start_slot))
{ {
@ -512,13 +509,12 @@ impl<T: BeaconChainTypes> Worker<T> {
}, },
)) => { )) => {
debug!(self.log, "Range request failed during backfill"; "requested_slot" => slot, "oldest_known_slot" => oldest_block_slot); debug!(self.log, "Range request failed during backfill"; "requested_slot" => slot, "oldest_known_slot" => oldest_block_slot);
// return self.send_error_response( return self.send_error_response(
// peer_id, peer_id,
// RPCResponseErrorCode::ResourceUnavailable, RPCResponseErrorCode::ResourceUnavailable,
// "Backfilling".into(), "Backfilling".into(),
// request_id, request_id,
// ); );
todo!("stuff")
} }
Err(e) => return error!(self.log, "Unable to obtain root iter"; "error" => ?e), Err(e) => return error!(self.log, "Unable to obtain root iter"; "error" => ?e),
}; };
@ -548,86 +544,76 @@ impl<T: BeaconChainTypes> Worker<T> {
// remove all skip slots // remove all skip slots
let block_roots = block_roots.into_iter().flatten().collect::<Vec<_>>(); let block_roots = block_roots.into_iter().flatten().collect::<Vec<_>>();
*/ let mut blobs_sent = 0;
// Fetching blocks is async because it may have to hit the execution layer for payloads. let mut send_response = true;
/*
executor.spawn(
async move {
let mut blocks_sent = 0;
let mut send_response = true;
for root in block_roots { for root in block_roots {
match self.chain.store.get_blobs(&root) { match self.chain.store.get_blobs(&root) {
Ok(Some(blob)) => { Ok(Some(blob)) => {
blocks_sent += 1; blobs_sent += 1;
self.send_network_message(NetworkMessage::SendResponse {
peer_id,
response: Response::BlobsByRange(Some(Arc::new(blob))),
id: request_id,
});
}
Ok(None) => {
error!(
self.log,
"Blob in the chain is not in the store";
"request_root" => ?root
);
break;
}
Err(e) => {
error!(
self.log,
"Error fetching blob for peer";
"block_root" => ?root,
"error" => ?e
);
break;
}
}
}
let current_slot = self
.chain
.slot()
.unwrap_or_else(|_| self.chain.slot_clock.genesis_slot());
if blobs_sent < (req.count as usize) {
debug!(
self.log,
"BlobsByRange Response processed";
"peer" => %peer_id,
"msg" => "Failed to return all requested blocks",
"start_slot" => req.start_slot,
"current_slot" => current_slot,
"requested" => req.count,
"returned" => blobs_sent
);
} else {
debug!(
self.log,
"BlobsByRange Response processed";
"peer" => %peer_id,
"start_slot" => req.start_slot,
"current_slot" => current_slot,
"requested" => req.count,
"returned" => blobs_sent
);
}
if send_response {
// send the stream terminator
self.send_network_message(NetworkMessage::SendResponse { self.send_network_message(NetworkMessage::SendResponse {
peer_id, peer_id,
response: Response::BlobsByRange(None), response: Response::BlobsByRange(Some(Arc::new(blob))),
id: request_id, id: request_id,
}); });
} }
Ok(None) => {
error!(
self.log,
"Blob in the chain is not in the store";
"request_root" => ?root
);
break;
}
Err(e) => {
error!(
self.log,
"Error fetching blob for peer";
"block_root" => ?root,
"error" => ?e
);
break;
}
}
}
drop(send_on_drop); let current_slot = self
}, .chain
"load_blocks_by_range_blocks", .slot()
); .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot());
*/
unimplemented!("") if blobs_sent < (req.count as usize) {
debug!(
self.log,
"BlobsByRange Response processed";
"peer" => %peer_id,
"msg" => "Failed to return all requested blocks",
"start_slot" => req.start_slot,
"current_slot" => current_slot,
"requested" => req.count,
"returned" => blobs_sent
);
} else {
debug!(
self.log,
"BlobsByRange Response processed";
"peer" => %peer_id,
"start_slot" => req.start_slot,
"current_slot" => current_slot,
"requested" => req.count,
"returned" => blobs_sent
);
}
if send_response {
// send the stream terminator
self.send_network_message(NetworkMessage::SendResponse {
peer_id,
response: Response::BlobsByRange(None),
id: request_id,
});
}
drop(send_on_drop);
} }
} }