From 49c4630045039aebe695c523b591e8c816c7267a Mon Sep 17 00:00:00 2001
From: Age Manning
Date: Mon, 16 Nov 2020 07:28:30 +0000
Subject: [PATCH] Performance improvement for db reads (#1909)

This PR adds a number of improvements:

- Downgrade a warning log when we ignore blocks for gossipsub processing
- Revert a correction to improve the logging of peer score changes
- Shift syncing DB reads off the core executor, allowing parallel processing of large sync messages
- Correct the timeout logic of RPC chunk sends, giving more time before timing out outbound RPC messages.

(A minimal sketch of the blocking-read pattern and the per-chunk timeout reset follows the diff.)
---
 .../eth2_libp2p/src/peer_manager/mod.rs      |   2 +-
 beacon_node/eth2_libp2p/src/rpc/handler.rs   |  10 +
 .../network/src/beacon_processor/worker.rs   |   2 +-
 beacon_node/network/src/router/processor.rs  | 310 ++++++++++--------
 4 files changed, 178 insertions(+), 146 deletions(-)

diff --git a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs
index 11dc37315..fc95b6bd9 100644
--- a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs
+++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs
@@ -191,7 +191,7 @@ impl PeerManager {
                 &mut self.events,
                 &self.log,
             );
-            if previous_state != info.score_state() {
+            if previous_state == info.score_state() {
                 debug!(self.log, "Peer score adjusted"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string());
             }
         }
diff --git a/beacon_node/eth2_libp2p/src/rpc/handler.rs b/beacon_node/eth2_libp2p/src/rpc/handler.rs
index 81641ff72..7713aaf52 100644
--- a/beacon_node/eth2_libp2p/src/rpc/handler.rs
+++ b/beacon_node/eth2_libp2p/src/rpc/handler.rs
@@ -629,6 +629,7 @@ where
                     // if we can't close right now, put the substream back and try again later
                     Poll::Pending => info.state = InboundState::Idle(substream),
                     Poll::Ready(res) => {
+                        // The substream has closed; remove it.
                         substreams_to_remove.push(*id);
                         if let Some(ref delay_key) = info.delay_key {
                             self.inbound_substreams_delay.remove(delay_key);
@@ -671,6 +672,15 @@ where
                             if let Some(ref delay_key) = info.delay_key {
                                 self.inbound_substreams_delay.remove(delay_key);
                             }
+                        } else {
+                            // If we are not removing this substream, we reset the timer.
+                            // Each chunk is allowed RESPONSE_TIMEOUT to be sent.
+                            if let Some(ref delay_key) = info.delay_key {
+                                self.inbound_substreams_delay.reset(
+                                    delay_key,
+                                    Duration::from_secs(RESPONSE_TIMEOUT),
+                                );
+                            }
                         }

                         // The stream may be currently idle. Attempt to process more
diff --git a/beacon_node/network/src/beacon_processor/worker.rs b/beacon_node/network/src/beacon_processor/worker.rs
index 44f96372f..9d5497514 100644
--- a/beacon_node/network/src/beacon_processor/worker.rs
+++ b/beacon_node/network/src/beacon_processor/worker.rs
@@ -215,7 +215,7 @@ impl Worker {
             | Err(e @ BlockError::RepeatProposal { .. })
             | Err(e @ BlockError::NotFinalizedDescendant { .. })
             | Err(e @ BlockError::BeaconChainError(_)) => {
-                warn!(self.log, "Could not verify block for gossip, ignoring the block";
+                debug!(self.log, "Could not verify block for gossip, ignoring the block";
                     "error" => e.to_string());
                 self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
                 return;
diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs
index a0578a4b3..4bc97d041 100644
--- a/beacon_node/network/src/router/processor.rs
+++ b/beacon_node/network/src/router/processor.rs
@@ -34,6 +34,8 @@ pub struct Processor {
     network: HandlerNetworkContext<T::EthSpec>,
     /// A multi-threaded, non-blocking processor for applying messages to the beacon chain.
     beacon_processor_send: mpsc::Sender<BeaconWorkEvent<T::EthSpec>>,
+    /// The current task executor.
+    executor: task_executor::TaskExecutor,
     /// The `RPCHandler` logger.
     log: slog::Logger,
 }
@@ -66,7 +68,7 @@ impl Processor {
             network_tx: network_send.clone(),
             sync_tx: sync_send.clone(),
             network_globals,
-            executor,
+            executor: executor.clone(),
             max_workers: cmp::max(1, num_cpus::get()),
             current_workers: 0,
             log: log.clone(),
@@ -78,6 +80,7 @@ impl Processor {
             sync_send,
             network: HandlerNetworkContext::new(network_send, log.clone()),
             beacon_processor_send,
+            executor,
             log: log.new(o!("service" => "router")),
         }
     }
@@ -219,40 +222,49 @@ impl Processor {

     /// Handle a `BlocksByRoot` request from the peer.
     pub fn on_blocks_by_root_request(
-        &mut self,
+        &self,
         peer_id: PeerId,
         request_id: PeerRequestId,
         request: BlocksByRootRequest,
     ) {
-        let mut send_block_count = 0;
-        for root in request.block_roots.iter() {
-            if let Ok(Some(block)) = self.chain.store.get_block(root) {
-                self.network.send_response(
-                    peer_id.clone(),
-                    Response::BlocksByRoot(Some(Box::new(block))),
-                    request_id,
-                );
-                send_block_count += 1;
-            } else {
-                debug!(
-                    self.log,
-                    "Peer requested unknown block";
-                    "peer" => peer_id.to_string(),
-                    "request_root" => format!("{:}", root),
-                );
-            }
-        }
-        debug!(
-            self.log,
-            "Received BlocksByRoot Request";
-            "peer" => peer_id.to_string(),
-            "requested" => request.block_roots.len(),
-            "returned" => send_block_count,
-        );
+        let chain = self.chain.clone();
+        let mut network = self.network.clone();
+        let log = self.log.clone();

-        // send stream termination
-        self.network
-            .send_response(peer_id, Response::BlocksByRoot(None), request_id);
+        // Shift the db reads to a blocking thread.
+        self.executor.spawn_blocking(
+            move || {
+                let mut send_block_count = 0;
+                for root in request.block_roots.iter() {
+                    if let Ok(Some(block)) = chain.store.get_block(root) {
+                        network.send_response(
+                            peer_id.clone(),
+                            Response::BlocksByRoot(Some(Box::new(block))),
+                            request_id,
+                        );
+                        send_block_count += 1;
+                    } else {
+                        debug!(
+                            log,
+                            "Peer requested unknown block";
+                            "peer" => peer_id.to_string(),
+                            "request_root" => format!("{:}", root),
+                        );
+                    }
+                }
+                debug!(
+                    log,
+                    "Received BlocksByRoot Request";
+                    "peer" => peer_id.to_string(),
+                    "requested" => request.block_roots.len(),
+                    "returned" => send_block_count,
+                );
+
+                // send stream termination
+                network.send_response(peer_id, Response::BlocksByRoot(None), request_id);
+            },
+            "blocks_by_root_request",
+        );
     }

     /// Handle a `BlocksByRange` request from the peer.
@@ -262,133 +274,142 @@ impl Processor {
         request_id: PeerRequestId,
         mut req: BlocksByRangeRequest,
     ) {
-        debug!(
-            self.log,
-            "Received BlocksByRange Request";
-            "peer_id" => %peer_id,
-            "count" => req.count,
-            "start_slot" => req.start_slot,
-            "step" => req.step,
-        );
+        let chain = self.chain.clone();
+        let mut network = self.network.clone();
+        let log = self.log.clone();

-        // Should not send more than max request blocks
-        if req.count > MAX_REQUEST_BLOCKS {
-            req.count = MAX_REQUEST_BLOCKS;
-        }
-        if req.step == 0 {
-            warn!(self.log,
-                "Peer sent invalid range request";
-                "error" => "Step sent was 0");
-            self.network.goodbye_peer(peer_id, GoodbyeReason::Fault);
-            return;
-        }
+        // Shift the db reads to a blocking thread.
+        self.executor.spawn_blocking(move || {

-        let forwards_block_root_iter = match self
-            .chain
-            .forwards_iter_block_roots(Slot::from(req.start_slot))
-        {
-            Ok(iter) => iter,
-            Err(e) => {
-                return error!(
-                    self.log,
-                    "Unable to obtain root iter";
-                    "error" => format!("{:?}", e)
-                )
+            debug!(
+                log,
+                "Received BlocksByRange Request";
+                "peer_id" => %peer_id,
+                "count" => req.count,
+                "start_slot" => req.start_slot,
+                "step" => req.step,
+            );
+
+            // Should not send more than max request blocks
+            if req.count > MAX_REQUEST_BLOCKS {
+                req.count = MAX_REQUEST_BLOCKS;
             }
-        };
-
-        // Pick out the required blocks, ignoring skip-slots and stepping by the step parameter.
-        //
-        // NOTE: We don't mind if req.count * req.step overflows as it just ends the iterator early and
-        // the peer will get less blocks.
-        // The step parameter is quadratically weighted in the filter, so large values should be
-        // prevented before reaching this point.
-        let mut last_block_root = None;
-        let maybe_block_roots = process_results(forwards_block_root_iter, |iter| {
-            iter.take_while(|(_, slot)| {
-                slot.as_u64() < req.start_slot.saturating_add(req.count * req.step)
-            })
-            // map skip slots to None
-            .map(|(root, _)| {
-                let result = if Some(root) == last_block_root {
-                    None
-                } else {
-                    Some(root)
-                };
-                last_block_root = Some(root);
-                result
-            })
-            .step_by(req.step as usize)
-            .collect::<Vec<Option<Hash256>>>()
-        });
-
-        let block_roots = match maybe_block_roots {
-            Ok(block_roots) => block_roots,
-            Err(e) => {
-                error!(self.log, "Error during iteration over blocks"; "error" => format!("{:?}", e));
+            if req.step == 0 {
+                warn!(log,
+                    "Peer sent invalid range request";
+                    "error" => "Step sent was 0");
+                network.goodbye_peer(peer_id, GoodbyeReason::Fault);
                 return;
             }
-        };

-        // remove all skip slots
-        let block_roots = block_roots
-            .into_iter()
-            .filter_map(|root| root)
-            .collect::<Vec<_>>();
+            let forwards_block_root_iter = match
+                chain
+                .forwards_iter_block_roots(Slot::from(req.start_slot))
+            {
+                Ok(iter) => iter,
+                Err(e) => {
+                    return error!(
+                        log,
+                        "Unable to obtain root iter";
+                        "error" => format!("{:?}", e)
+                    )
+                }
+            };

-        let mut blocks_sent = 0;
-        for root in block_roots {
-            if let Ok(Some(block)) = self.chain.store.get_block(&root) {
-                // Due to skip slots, blocks could be out of the range, we ensure they are in the
-                // range before sending
-                if block.slot() >= req.start_slot
-                    && block.slot() < req.start_slot + req.count * req.step
-                {
-                    blocks_sent += 1;
-                    self.network.send_response(
-                        peer_id.clone(),
-                        Response::BlocksByRange(Some(Box::new(block))),
-                        request_id,
+            // Pick out the required blocks, ignoring skip-slots and stepping by the step parameter.
+            //
+            // NOTE: We don't mind if req.count * req.step overflows as it just ends the iterator early and
+            // the peer will get less blocks.
+            // The step parameter is quadratically weighted in the filter, so large values should be
+            // prevented before reaching this point.
+            let mut last_block_root = None;
+            let maybe_block_roots = process_results(forwards_block_root_iter, |iter| {
+                iter.take_while(|(_, slot)| {
+                    slot.as_u64() < req.start_slot.saturating_add(req.count * req.step)
+                })
+                // map skip slots to None
+                .map(|(root, _)| {
+                    let result = if Some(root) == last_block_root {
+                        None
+                    } else {
+                        Some(root)
+                    };
+                    last_block_root = Some(root);
+                    result
+                })
+                .step_by(req.step as usize)
+                .collect::<Vec<Option<Hash256>>>()
+            });
+
+            let block_roots = match maybe_block_roots {
+                Ok(block_roots) => block_roots,
+                Err(e) => {
+                    error!(log, "Error during iteration over blocks"; "error" => format!("{:?}", e));
+                    return;
+                }
+            };
+
+            // remove all skip slots
+            let block_roots = block_roots
+                .into_iter()
+                .filter_map(|root| root)
+                .collect::<Vec<_>>();
+
+            let mut blocks_sent = 0;
+            for root in block_roots {
+                if let Ok(Some(block)) = chain.store.get_block(&root) {
+                    // Due to skip slots, blocks could be out of the range, we ensure they are in the
+                    // range before sending
+                    if block.slot() >= req.start_slot
+                        && block.slot() < req.start_slot + req.count * req.step
+                    {
+                        blocks_sent += 1;
+                        network.send_response(
+                            peer_id.clone(),
+                            Response::BlocksByRange(Some(Box::new(block))),
+                            request_id,
+                        );
+                    }
+                } else {
+                    error!(
+                        log,
+                        "Block in the chain is not in the store";
+                        "request_root" => format!("{:}", root),
                     );
                 }
-            } else {
-                error!(
-                    self.log,
-                    "Block in the chain is not in the store";
-                    "request_root" => format!("{:}", root),
-                );
             }
-        }
-        let current_slot = self
-            .chain
-            .slot()
-            .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot());
+            let current_slot =
+                chain
+                .slot()
+                .unwrap_or_else(|_| chain.slot_clock.genesis_slot());

-        if blocks_sent < (req.count as usize) {
-            debug!(
-                self.log,
-                "BlocksByRange Response Sent";
-                "peer" => peer_id.to_string(),
-                "msg" => "Failed to return all requested blocks",
-                "start_slot" => req.start_slot,
-                "current_slot" => current_slot,
-                "requested" => req.count,
-                "returned" => blocks_sent);
-        } else {
-            debug!(
-                self.log,
-                "Sending BlocksByRange Response";
-                "peer" => peer_id.to_string(),
-                "start_slot" => req.start_slot,
-                "current_slot" => current_slot,
-                "requested" => req.count,
-                "returned" => blocks_sent);
-        }
+            if blocks_sent < (req.count as usize) {
+                debug!(
+                    log,
+                    "BlocksByRange Response Sent";
+                    "peer" => peer_id.to_string(),
+                    "msg" => "Failed to return all requested blocks",
+                    "start_slot" => req.start_slot,
+                    "current_slot" => current_slot,
+                    "requested" => req.count,
+                    "returned" => blocks_sent);
+            } else {
+                debug!(
+                    log,
+                    "Sending BlocksByRange Response";
+                    "peer" => peer_id.to_string(),
+                    "start_slot" => req.start_slot,
+                    "current_slot" => current_slot,
+                    "requested" => req.count,
+                    "returned" => blocks_sent);
+            }

-        // send the stream terminator
-        self.network
-            .send_response(peer_id, Response::BlocksByRange(None), request_id);
+            // send the stream terminator
+            network
+                .send_response(peer_id, Response::BlocksByRange(None), request_id);
+
+        }, "blocks_by_range_request");
     }

     /// Handle a `BlocksByRange` response from the peer.
@@ -605,6 +626,7 @@ pub(crate) fn status_message(
 /// Wraps a Network Channel to employ various RPC related network functionality for the
 /// processor.
+#[derive(Clone)]
 pub struct HandlerNetworkContext<T: EthSpec> {
     /// The network channel to relay messages to the Network service.
     network_send: mpsc::UnboundedSender<NetworkMessage<T>>,
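
The core pattern in the processor changes above is moving blocking store reads off the async
executor and onto a blocking thread pool, so a large BlocksByRoot or BlocksByRange request cannot
stall other tasks. Below is a minimal, self-contained sketch of that pattern using plain
tokio::task::spawn_blocking. It is an illustration only: `load_block` and the timing are
hypothetical stand-ins for `chain.store.get_block`, and Lighthouse's own
`task_executor::TaskExecutor::spawn_blocking` additionally takes a task name, as seen in the diff.

use std::time::Duration;

// Hypothetical stand-in for a blocking store read such as `chain.store.get_block`.
fn load_block(root: u64) -> Option<String> {
    std::thread::sleep(Duration::from_millis(50)); // simulate disk I/O
    Some(format!("block-{}", root))
}

#[tokio::main]
async fn main() {
    let roots: Vec<u64> = (0..4).collect();

    // Move the whole batch of reads onto the blocking thread pool, mirroring
    // `self.executor.spawn_blocking(move || { ... }, "blocks_by_root_request")` in the patch.
    let handle = tokio::task::spawn_blocking(move || {
        roots.into_iter().filter_map(load_block).collect::<Vec<_>>()
    });

    // The async executor stays free to poll other futures while the reads run.
    let blocks = handle.await.expect("blocking task panicked");
    println!("returned {} blocks", blocks.len());
}

The RPC handler change is related but separate: rather than one deadline covering a whole
multi-chunk response, the delay entry for an inbound substream is reset after each chunk is sent,
so RESPONSE_TIMEOUT bounds the gap between chunks. A hedged sketch of that reset, assuming
tokio-util's DelayQueue (the handler keeps an equivalent `inbound_substreams_delay` queue; the
10-second value here is only an assumption for illustration):

use std::time::Duration;
use tokio_util::time::DelayQueue;

const RESPONSE_TIMEOUT: u64 = 10; // assumed per-chunk timeout in seconds

#[tokio::main]
async fn main() {
    let mut inbound_substreams_delay: DelayQueue<u64> = DelayQueue::new();

    // Register a substream (id 1) with an initial deadline.
    let delay_key = inbound_substreams_delay.insert(1, Duration::from_secs(RESPONSE_TIMEOUT));

    // After a chunk is flushed, push the deadline out again so the timeout bounds the
    // gap between chunks rather than the whole response.
    inbound_substreams_delay.reset(&delay_key, Duration::from_secs(RESPONSE_TIMEOUT));

    println!("{} substream deadline(s) tracked", inbound_substreams_delay.len());
}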