Performance improvement for db reads (#1909)

This PR adds a number of improvements:
- Downgrade a warning log when we ignore blocks for gossipsub processing
- Revert a correction to improve the logging of peer score changes
- Shift syncing DB reads off the core executor, allowing parallel processing of large sync messages (the general pattern is sketched below)
- Correct the timeout logic of RPC chunk sends, giving more time before outbound RPC messages time out
Age Manning, 2020-11-16 07:28:30 +00:00
parent 646c049df2, commit 49c4630045
4 changed files with 178 additions and 146 deletions
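The headline change is moving blocking database reads off the async executor so that large sync requests no longer stall message processing. Below is a minimal, self-contained sketch of that pattern using plain `tokio` rather than Lighthouse's `task_executor::TaskExecutor` wrapper; the `read_block_from_db` helper and its types are illustrative stand-ins, not code from this PR.

```rust
use std::time::Duration;

// Stand-in for a synchronous (blocking) database read.
fn read_block_from_db(root: u64) -> Option<String> {
    std::thread::sleep(Duration::from_millis(10));
    Some(format!("block-{}", root))
}

#[tokio::main]
async fn main() {
    let roots: Vec<u64> = vec![1, 2, 3];

    // Move the blocking reads onto tokio's dedicated blocking thread pool,
    // mirroring `self.executor.spawn_blocking(move || { ... }, "name")` in
    // the diff. The async executor stays free to serve other messages.
    let handle = tokio::task::spawn_blocking(move || {
        roots
            .into_iter()
            .filter_map(read_block_from_db)
            .collect::<Vec<_>>()
    });

    let blocks = handle.await.expect("blocking task panicked");
    println!("returned {} blocks", blocks.len());
}
```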


@@ -191,7 +191,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
             &mut self.events,
             &self.log,
         );
-        if previous_state != info.score_state() {
+        if previous_state == info.score_state() {
             debug!(self.log, "Peer score adjusted"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string());
         }
     }
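Reading the reverted condition above: with `==`, the "Peer score adjusted" debug line fires only when a peer's score moves without changing its score state, presumably because state transitions are already reported by their own handling. A toy sketch of that logging condition follows; the types are stand-ins and the rationale is inferred from the diff.

```rust
// Illustrative stand-in for Lighthouse's peer score state.
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, PartialEq)]
enum ScoreState {
    Healthy,
    Disconnected,
    Banned,
}

fn on_score_adjustment(previous_state: ScoreState, current_state: ScoreState, score: f64) {
    if previous_state == current_state {
        // The score moved but the state did not: emit the low-noise debug line.
        println!("DEBUG Peer score adjusted: score={}", score);
    }
    // State transitions are assumed to be reported elsewhere, so they are
    // deliberately not logged here.
}

fn main() {
    on_score_adjustment(ScoreState::Healthy, ScoreState::Healthy, -1.5);
    on_score_adjustment(ScoreState::Healthy, ScoreState::Banned, -100.0);
}
```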


@@ -629,6 +629,7 @@ where
                 // if we can't close right now, put the substream back and try again later
                 Poll::Pending => info.state = InboundState::Idle(substream),
                 Poll::Ready(res) => {
+                    // The substream closed, we remove it
                     substreams_to_remove.push(*id);
                     if let Some(ref delay_key) = info.delay_key {
                         self.inbound_substreams_delay.remove(delay_key);
@@ -671,6 +672,15 @@ where
                 if let Some(ref delay_key) = info.delay_key {
                     self.inbound_substreams_delay.remove(delay_key);
                 }
+            } else {
+                // If we are not removing this substream, we reset the timer.
+                // Each chunk is allowed RESPONSE_TIMEOUT to be sent.
+                if let Some(ref delay_key) = info.delay_key {
+                    self.inbound_substreams_delay.reset(
+                        delay_key,
+                        Duration::from_secs(RESPONSE_TIMEOUT),
+                    );
+                }
             }
             // The stream may be currently idle. Attempt to process more
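This hunk implements the last bullet of the description: rather than one deadline covering an entire multi-chunk response, the substream's timer is reset after each chunk, so every chunk gets a fresh `RESPONSE_TIMEOUT`. A sketch of the same idea with `tokio_util`'s `DelayQueue` (assuming tokio_util with the `time` feature; the timeout value and substream id here are made up for illustration):

```rust
use std::time::Duration;
use tokio_util::time::DelayQueue;

// Assumed value for illustration; Lighthouse defines its own constant.
const RESPONSE_TIMEOUT: u64 = 10;

#[tokio::main]
async fn main() {
    // Keyed deadlines for active inbound substreams (keyed by a made-up id).
    let mut inbound_substreams_delay: DelayQueue<u64> = DelayQueue::new();

    // A substream starts with an initial deadline.
    let delay_key = inbound_substreams_delay.insert(42, Duration::from_secs(RESPONSE_TIMEOUT));

    // ... a response chunk is successfully written to the substream ...

    // Reset the deadline so the next chunk gets a fresh RESPONSE_TIMEOUT,
    // mirroring `self.inbound_substreams_delay.reset(delay_key, ...)` above.
    inbound_substreams_delay.reset(&delay_key, Duration::from_secs(RESPONSE_TIMEOUT));
}
```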


@@ -215,7 +215,7 @@ impl<T: BeaconChainTypes> Worker<T> {
            | Err(e @ BlockError::RepeatProposal { .. })
            | Err(e @ BlockError::NotFinalizedDescendant { .. })
            | Err(e @ BlockError::BeaconChainError(_)) => {
-                warn!(self.log, "Could not verify block for gossip, ignoring the block";
+                debug!(self.log, "Could not verify block for gossip, ignoring the block";
                    "error" => e.to_string());
                self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
                return;
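The only change here is the log level: blocks ignored for benign, peer-triggered reasons are reported at `debug!` rather than `warn!`, keeping warnings actionable. For reference, the same structured `slog` call in a tiny self-contained program; the logger construction and error text are illustrative, not taken from Lighthouse.

```rust
use slog::{debug, o, Logger};

fn main() {
    // A discard logger keeps the example dependency-free; Lighthouse wires
    // real terminal/file drains instead.
    let log = Logger::root(slog::Discard, o!("service" => "network"));

    let e = "block is not a finalized descendant"; // hypothetical error text
    debug!(log, "Could not verify block for gossip, ignoring the block";
        "error" => e.to_string());
}
```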


@@ -34,6 +34,8 @@ pub struct Processor<T: BeaconChainTypes> {
     network: HandlerNetworkContext<T::EthSpec>,
     /// A multi-threaded, non-blocking processor for applying messages to the beacon chain.
     beacon_processor_send: mpsc::Sender<BeaconWorkEvent<T::EthSpec>>,
+    /// The current task executor.
+    executor: task_executor::TaskExecutor,
     /// The `RPCHandler` logger.
     log: slog::Logger,
 }
@@ -66,7 +68,7 @@ impl<T: BeaconChainTypes> Processor<T> {
             network_tx: network_send.clone(),
             sync_tx: sync_send.clone(),
             network_globals,
-            executor,
+            executor: executor.clone(),
             max_workers: cmp::max(1, num_cpus::get()),
             current_workers: 0,
             log: log.clone(),
@@ -78,6 +80,7 @@ impl<T: BeaconChainTypes> Processor<T> {
             sync_send,
             network: HandlerNetworkContext::new(network_send, log.clone()),
             beacon_processor_send,
+            executor,
             log: log.new(o!("service" => "router")),
         }
     }
@@ -219,40 +222,49 @@ impl<T: BeaconChainTypes> Processor<T> {
     /// Handle a `BlocksByRoot` request from the peer.
     pub fn on_blocks_by_root_request(
-        &mut self,
+        &self,
         peer_id: PeerId,
         request_id: PeerRequestId,
         request: BlocksByRootRequest,
     ) {
-        let mut send_block_count = 0;
-        for root in request.block_roots.iter() {
-            if let Ok(Some(block)) = self.chain.store.get_block(root) {
-                self.network.send_response(
-                    peer_id.clone(),
-                    Response::BlocksByRoot(Some(Box::new(block))),
-                    request_id,
-                );
-                send_block_count += 1;
-            } else {
-                debug!(
-                    self.log,
-                    "Peer requested unknown block";
-                    "peer" => peer_id.to_string(),
-                    "request_root" => format!("{:}", root),
-                );
-            }
-        }
-        debug!(
-            self.log,
-            "Received BlocksByRoot Request";
-            "peer" => peer_id.to_string(),
-            "requested" => request.block_roots.len(),
-            "returned" => send_block_count,
-        );
-        // send stream termination
-        self.network
-            .send_response(peer_id, Response::BlocksByRoot(None), request_id);
+        let chain = self.chain.clone();
+        let mut network = self.network.clone();
+        let log = self.log.clone();
+        // Shift the db reads to a blocking thread.
+        self.executor.spawn_blocking(
+            move || {
+                let mut send_block_count = 0;
+                for root in request.block_roots.iter() {
+                    if let Ok(Some(block)) = chain.store.get_block(root) {
+                        network.send_response(
+                            peer_id.clone(),
+                            Response::BlocksByRoot(Some(Box::new(block))),
+                            request_id,
+                        );
+                        send_block_count += 1;
+                    } else {
+                        debug!(
+                            log,
+                            "Peer requested unknown block";
+                            "peer" => peer_id.to_string(),
+                            "request_root" => format!("{:}", root),
+                        );
+                    }
+                }
+                debug!(
+                    log,
+                    "Received BlocksByRoot Request";
+                    "peer" => peer_id.to_string(),
+                    "requested" => request.block_roots.len(),
+                    "returned" => send_block_count,
+                );
+                // send stream termination
+                network.send_response(peer_id, Response::BlocksByRoot(None), request_id);
+            },
+            "blocks_by_root_request",
+        );
     }
 
     /// Handle a `BlocksByRange` request from the peer.
@@ -262,133 +274,142 @@ impl<T: BeaconChainTypes> Processor<T> {
         request_id: PeerRequestId,
         mut req: BlocksByRangeRequest,
     ) {
-        debug!(
-            self.log,
-            "Received BlocksByRange Request";
-            "peer_id" => %peer_id,
-            "count" => req.count,
-            "start_slot" => req.start_slot,
-            "step" => req.step,
-        );
-        // Should not send more than max request blocks
-        if req.count > MAX_REQUEST_BLOCKS {
-            req.count = MAX_REQUEST_BLOCKS;
-        }
-        if req.step == 0 {
-            warn!(self.log,
-                "Peer sent invalid range request";
-                "error" => "Step sent was 0");
-            self.network.goodbye_peer(peer_id, GoodbyeReason::Fault);
-            return;
-        }
-        let forwards_block_root_iter = match self
-            .chain
-            .forwards_iter_block_roots(Slot::from(req.start_slot))
-        {
-            Ok(iter) => iter,
-            Err(e) => {
-                return error!(
-                    self.log,
-                    "Unable to obtain root iter";
-                    "error" => format!("{:?}", e)
-                )
-            }
-        };
-        // Pick out the required blocks, ignoring skip-slots and stepping by the step parameter.
-        //
-        // NOTE: We don't mind if req.count * req.step overflows as it just ends the iterator early and
-        // the peer will get less blocks.
-        // The step parameter is quadratically weighted in the filter, so large values should be
-        // prevented before reaching this point.
-        let mut last_block_root = None;
-        let maybe_block_roots = process_results(forwards_block_root_iter, |iter| {
-            iter.take_while(|(_, slot)| {
-                slot.as_u64() < req.start_slot.saturating_add(req.count * req.step)
-            })
-            // map skip slots to None
-            .map(|(root, _)| {
-                let result = if Some(root) == last_block_root {
-                    None
-                } else {
-                    Some(root)
-                };
-                last_block_root = Some(root);
-                result
-            })
-            .step_by(req.step as usize)
-            .collect::<Vec<Option<Hash256>>>()
-        });
-        let block_roots = match maybe_block_roots {
-            Ok(block_roots) => block_roots,
-            Err(e) => {
-                error!(self.log, "Error during iteration over blocks"; "error" => format!("{:?}", e));
-                return;
-            }
-        };
-        // remove all skip slots
-        let block_roots = block_roots
-            .into_iter()
-            .filter_map(|root| root)
-            .collect::<Vec<_>>();
-        let mut blocks_sent = 0;
-        for root in block_roots {
-            if let Ok(Some(block)) = self.chain.store.get_block(&root) {
-                // Due to skip slots, blocks could be out of the range, we ensure they are in the
-                // range before sending
-                if block.slot() >= req.start_slot
-                    && block.slot() < req.start_slot + req.count * req.step
-                {
-                    blocks_sent += 1;
-                    self.network.send_response(
-                        peer_id.clone(),
-                        Response::BlocksByRange(Some(Box::new(block))),
-                        request_id,
-                    );
-                }
-            } else {
-                error!(
-                    self.log,
-                    "Block in the chain is not in the store";
-                    "request_root" => format!("{:}", root),
-                );
-            }
-        }
-        let current_slot = self
-            .chain
-            .slot()
-            .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot());
-        if blocks_sent < (req.count as usize) {
-            debug!(
-                self.log,
-                "BlocksByRange Response Sent";
-                "peer" => peer_id.to_string(),
-                "msg" => "Failed to return all requested blocks",
-                "start_slot" => req.start_slot,
-                "current_slot" => current_slot,
-                "requested" => req.count,
-                "returned" => blocks_sent);
-        } else {
-            debug!(
-                self.log,
-                "Sending BlocksByRange Response";
-                "peer" => peer_id.to_string(),
-                "start_slot" => req.start_slot,
-                "current_slot" => current_slot,
-                "requested" => req.count,
-                "returned" => blocks_sent);
-        }
-        // send the stream terminator
-        self.network
-            .send_response(peer_id, Response::BlocksByRange(None), request_id);
+        let chain = self.chain.clone();
+        let mut network = self.network.clone();
+        let log = self.log.clone();
+        // Shift the db reads to a blocking thread.
+        self.executor.spawn_blocking(move || {
+            debug!(
+                log,
+                "Received BlocksByRange Request";
+                "peer_id" => %peer_id,
+                "count" => req.count,
+                "start_slot" => req.start_slot,
+                "step" => req.step,
+            );
+            // Should not send more than max request blocks
+            if req.count > MAX_REQUEST_BLOCKS {
+                req.count = MAX_REQUEST_BLOCKS;
+            }
+            if req.step == 0 {
+                warn!(log,
+                    "Peer sent invalid range request";
+                    "error" => "Step sent was 0");
+                network.goodbye_peer(peer_id, GoodbyeReason::Fault);
+                return;
+            }
+            let forwards_block_root_iter = match chain
+                .forwards_iter_block_roots(Slot::from(req.start_slot))
+            {
+                Ok(iter) => iter,
+                Err(e) => {
+                    return error!(
+                        log,
+                        "Unable to obtain root iter";
+                        "error" => format!("{:?}", e)
+                    )
+                }
+            };
+            // Pick out the required blocks, ignoring skip-slots and stepping by the step parameter.
+            //
+            // NOTE: We don't mind if req.count * req.step overflows as it just ends the iterator early and
+            // the peer will get less blocks.
+            // The step parameter is quadratically weighted in the filter, so large values should be
+            // prevented before reaching this point.
+            let mut last_block_root = None;
+            let maybe_block_roots = process_results(forwards_block_root_iter, |iter| {
+                iter.take_while(|(_, slot)| {
+                    slot.as_u64() < req.start_slot.saturating_add(req.count * req.step)
+                })
+                // map skip slots to None
+                .map(|(root, _)| {
+                    let result = if Some(root) == last_block_root {
+                        None
+                    } else {
+                        Some(root)
+                    };
+                    last_block_root = Some(root);
+                    result
+                })
+                .step_by(req.step as usize)
+                .collect::<Vec<Option<Hash256>>>()
+            });
+            let block_roots = match maybe_block_roots {
+                Ok(block_roots) => block_roots,
+                Err(e) => {
+                    error!(log, "Error during iteration over blocks"; "error" => format!("{:?}", e));
+                    return;
+                }
+            };
+            // remove all skip slots
+            let block_roots = block_roots
+                .into_iter()
+                .filter_map(|root| root)
+                .collect::<Vec<_>>();
+            let mut blocks_sent = 0;
+            for root in block_roots {
+                if let Ok(Some(block)) = chain.store.get_block(&root) {
+                    // Due to skip slots, blocks could be out of the range, we ensure they are in the
+                    // range before sending
+                    if block.slot() >= req.start_slot
+                        && block.slot() < req.start_slot + req.count * req.step
+                    {
+                        blocks_sent += 1;
+                        network.send_response(
+                            peer_id.clone(),
+                            Response::BlocksByRange(Some(Box::new(block))),
+                            request_id,
+                        );
+                    }
+                } else {
+                    error!(
+                        log,
+                        "Block in the chain is not in the store";
+                        "request_root" => format!("{:}", root),
+                    );
+                }
+            }
+            let current_slot = chain
+                .slot()
+                .unwrap_or_else(|_| chain.slot_clock.genesis_slot());
+            if blocks_sent < (req.count as usize) {
+                debug!(
+                    log,
+                    "BlocksByRange Response Sent";
+                    "peer" => peer_id.to_string(),
+                    "msg" => "Failed to return all requested blocks",
+                    "start_slot" => req.start_slot,
+                    "current_slot" => current_slot,
+                    "requested" => req.count,
+                    "returned" => blocks_sent);
+            } else {
+                debug!(
+                    log,
+                    "Sending BlocksByRange Response";
+                    "peer" => peer_id.to_string(),
+                    "start_slot" => req.start_slot,
+                    "current_slot" => current_slot,
+                    "requested" => req.count,
+                    "returned" => blocks_sent);
+            }
+            // send the stream terminator
+            network
+                .send_response(peer_id, Response::BlocksByRange(None), request_id);
+        }, "blocks_by_range_request");
     }
 
     /// Handle a `BlocksByRange` response from the peer.
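The block-root selection logic above moved essentially verbatim into the blocking task, so it is easy to lose in the diff noise. Here is a standalone sketch of what it computes, with plain integers standing in for `Hash256` and `Slot`, and a slice standing in for `forwards_iter_block_roots`:

```rust
// Walk roots forward from start_slot, stop at start_slot + count * step,
// map repeated roots (skip slots) to None, then step by `step`.
fn select_roots(
    roots_by_slot: &[(u64, u64)], // (root, slot), sorted by slot
    start_slot: u64,
    count: u64,
    step: u64,
) -> Vec<u64> {
    assert!(step != 0, "a zero step is rejected before this point");
    let mut last_block_root = None;
    roots_by_slot
        .iter()
        // Stop once we pass the end of the requested range; an overflow
        // would only end the iterator early, hence `saturating_add`.
        .take_while(|(_, slot)| *slot < start_slot.saturating_add(count * step))
        // A repeated root means the slot was skipped: map it to None.
        .map(|(root, _)| {
            let result = if Some(*root) == last_block_root {
                None
            } else {
                Some(*root)
            };
            last_block_root = Some(*root);
            result
        })
        .step_by(step as usize)
        // Drop the skip-slot `None`s, as the final `filter_map` does.
        .flatten()
        .collect()
}

fn main() {
    // Slot 2 is a skip slot: its row repeats the root of slot 1.
    let roots = [(10, 0), (20, 1), (20, 2), (30, 3), (40, 4)];
    assert_eq!(select_roots(&roots, 0, 5, 1), vec![10, 20, 30, 40]);
    // With step = 2 the stepped position lands on the skip slot, which
    // drops out, so only the first root is returned.
    assert_eq!(select_roots(&roots, 0, 2, 2), vec![10]);
}
```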
@@ -605,6 +626,7 @@ pub(crate) fn status_message<T: BeaconChainTypes>(
 /// Wraps a Network Channel to employ various RPC related network functionality for the
 /// processor.
+#[derive(Clone)]
 pub struct HandlerNetworkContext<T: EthSpec> {
     /// The network channel to relay messages to the Network service.
     network_send: mpsc::UnboundedSender<NetworkMessage<T>>,
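The new `#[derive(Clone)]` is what lets the handlers above clone the network context into the spawned blocking tasks: an `mpsc::UnboundedSender` is itself cheap to clone, so the whole context is. A simplified sketch of the idea; the context's field set and message type are reduced for illustration.

```rust
use tokio::sync::mpsc;

// Simplified stand-in for `HandlerNetworkContext`: cloning it just clones
// the channel handle, so each spawned task gets its own sender.
#[derive(Clone)]
struct NetworkContext {
    network_send: mpsc::UnboundedSender<String>,
}

impl NetworkContext {
    fn send_response(&mut self, msg: String) {
        // An unbounded send never awaits; an error only means the receiving
        // side of the channel was dropped.
        if self.network_send.send(msg).is_err() {
            eprintln!("network service has shut down");
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel();
    let ctx = NetworkContext { network_send: tx };

    // A clone is moved into a blocking task, as in the diff.
    let mut task_ctx = ctx.clone();
    tokio::task::spawn_blocking(move || {
        task_ctx.send_response("BlocksByRoot response".into());
    })
    .await
    .unwrap();

    assert_eq!(rx.recv().await.as_deref(), Some("BlocksByRoot response"));
}
```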