implement handle blobs by range req

This commit is contained in:
Marius van der Wijden 2022-09-17 20:05:51 +02:00
parent f9209e2d08
commit f43532d3de

View File

@ -12,7 +12,7 @@ use slog::{debug, error};
use slot_clock::SlotClock; use slot_clock::SlotClock;
use std::sync::Arc; use std::sync::Arc;
use task_executor::TaskExecutor; use task_executor::TaskExecutor;
use types::{Epoch, EthSpec, Hash256, Slot}; use types::{Epoch, EthSpec, Hash256, Slot, VariableList};
use super::Worker; use super::Worker;
@ -381,8 +381,142 @@ impl<T: BeaconChainTypes> Worker<T> {
send_on_drop: SendOnDrop, send_on_drop: SendOnDrop,
peer_id: PeerId, peer_id: PeerId,
request_id: PeerRequestId, request_id: PeerRequestId,
request: BlobsByRangeRequest, mut req: BlobsByRangeRequest,
) { ) {
// TODO impl debug!(self.log, "Received BlobsByRange Request";
"peer_id" => %peer_id,
"count" => req.count,
"start_slot" => req.start_slot,
);
// Should not send more than max request blocks
if req.count > MAX_REQUEST_BLOCKS {
req.count = MAX_REQUEST_BLOCKS;
}
let forwards_block_root_iter = match self
.chain
.forwards_iter_block_roots(Slot::from(req.start_slot))
{
Ok(iter) => iter,
Err(BeaconChainError::HistoricalBlockError(
HistoricalBlockError::BlockOutOfRange {
slot,
oldest_block_slot,
},
)) => {
debug!(self.log, "Range request failed during backfill"; "requested_slot" => slot, "oldest_known_slot" => oldest_block_slot);
return self.send_error_response(
peer_id,
RPCResponseErrorCode::ResourceUnavailable,
"Backfilling".into(),
request_id,
);
}
Err(e) => return error!(self.log, "Unable to obtain root iter"; "error" => ?e),
};
// Pick out the required blocks, ignoring skip-slots.
let mut last_block_root = None;
let maybe_block_roots = process_results(forwards_block_root_iter, |iter| {
iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count))
// map skip slots to None
.map(|(root, _)| {
let result = if Some(root) == last_block_root {
None
} else {
Some(root)
};
last_block_root = Some(root);
result
})
.collect::<Vec<Option<Hash256>>>()
});
let block_roots = match maybe_block_roots {
Ok(block_roots) => block_roots,
Err(e) => return error!(self.log, "Error during iteration over blocks"; "error" => ?e),
};
// remove all skip slots
let block_roots = block_roots.into_iter().flatten().collect::<Vec<_>>();
// Fetching blocks is async because it may have to hit the execution layer for payloads.
executor.spawn(
async move {
let mut blocks_sent = 0;
let mut send_response = true;
for root in block_roots {
match self.chain.store.get_blobs(&root) {
Ok(Some(blob)) => {
blocks_sent += 1;
self.send_network_message(NetworkMessage::SendResponse {
peer_id,
response: Response::BlobsByRange(Some(Arc::new(VariableList::new(vec![blob.message]).unwrap()))),
id: request_id,
});
}
Ok(None) => {
error!(
self.log,
"Block in the chain is not in the store";
"request_root" => ?root
);
break;
}
Err(e) => {
error!(
self.log,
"Error fetching block for peer";
"block_root" => ?root,
"error" => ?e
);
break;
}
}
}
let current_slot = self
.chain
.slot()
.unwrap_or_else(|_| self.chain.slot_clock.genesis_slot());
if blocks_sent < (req.count as usize) {
debug!(
self.log,
"BlocksByRange Response processed";
"peer" => %peer_id,
"msg" => "Failed to return all requested blocks",
"start_slot" => req.start_slot,
"current_slot" => current_slot,
"requested" => req.count,
"returned" => blocks_sent
);
} else {
debug!(
self.log,
"BlocksByRange Response processed";
"peer" => %peer_id,
"start_slot" => req.start_slot,
"current_slot" => current_slot,
"requested" => req.count,
"returned" => blocks_sent
);
}
if send_response {
// send the stream terminator
self.send_network_message(NetworkMessage::SendResponse {
peer_id,
response: Response::BlocksByRange(None),
id: request_id,
});
}
drop(send_on_drop);
},
"load_blocks_by_range_blocks",
);
} }
} }