Add more detail to sync logs

This commit is contained in:
Paul Hauner 2019-03-24 13:59:27 +11:00
parent 840738ea12
commit 15f853416b
No known key found for this signature in database
GPG Key ID: D362883A9218FCC6
2 changed files with 85 additions and 42 deletions

View File

@ -8,8 +8,7 @@ use eth2_libp2p::{
PeerId, RPCEvent, PeerId, RPCEvent,
}; };
use futures::future; use futures::future;
use slog::debug; use slog::{debug, warn};
use slog::warn;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant; use std::time::Instant;
@ -144,14 +143,13 @@ impl MessageHandler {
.remove(&(peer_id.clone(), id)) .remove(&(peer_id.clone(), id))
.is_none() .is_none()
{ {
debug!(self.log, "Unrecognized response from peer: {:?}", peer_id); debug!(self.log, "Unrecognised response from peer: {:?}", peer_id);
return; return;
} }
let response_str = match response { match response {
RPCResponse::Hello(hello_message) => { RPCResponse::Hello(hello_message) => {
self.sync self.sync
.on_hello_response(peer_id, hello_message, &mut self.network_context); .on_hello_response(peer_id, hello_message, &mut self.network_context);
"Hello"
} }
RPCResponse::BeaconBlockRoots(response) => { RPCResponse::BeaconBlockRoots(response) => {
self.sync.on_beacon_block_roots_response( self.sync.on_beacon_block_roots_response(
@ -159,7 +157,6 @@ impl MessageHandler {
response, response,
&mut self.network_context, &mut self.network_context,
); );
"BeaconBlockRoots"
} }
RPCResponse::BeaconBlockHeaders(response) => { RPCResponse::BeaconBlockHeaders(response) => {
self.sync.on_beacon_block_headers_response( self.sync.on_beacon_block_headers_response(
@ -167,7 +164,6 @@ impl MessageHandler {
response, response,
&mut self.network_context, &mut self.network_context,
); );
"BeaconBlockHeaders"
} }
RPCResponse::BeaconBlockBodies(response) => { RPCResponse::BeaconBlockBodies(response) => {
self.sync.on_beacon_block_bodies_response( self.sync.on_beacon_block_bodies_response(
@ -175,13 +171,10 @@ impl MessageHandler {
response, response,
&mut self.network_context, &mut self.network_context,
); );
"BeaconBlockBodies"
} }
// TODO: Handle all responses // TODO: Handle all responses
_ => panic!("Unknown response: {:?}", response), _ => panic!("Unknown response: {:?}", response),
}; };
debug!(self.log, "RPCResponse({})", response_str);
} }
} }

View File

@ -120,6 +120,8 @@ impl SimpleSync {
} }
pub fn on_connect(&self, peer_id: PeerId, network: &mut NetworkContext) { pub fn on_connect(&self, peer_id: PeerId, network: &mut NetworkContext) {
info!(self.log, "PeerConnect"; "peer" => format!("{:?}", peer_id));
network.send_rpc_request(peer_id, RPCRequest::Hello(self.chain.hello_message())); network.send_rpc_request(peer_id, RPCRequest::Hello(self.chain.hello_message()));
} }
@ -129,6 +131,8 @@ impl SimpleSync {
hello: HelloMessage, hello: HelloMessage,
network: &mut NetworkContext, network: &mut NetworkContext,
) { ) {
debug!(self.log, "HelloRequest"; "peer" => format!("{:?}", peer_id));
// Say hello back. // Say hello back.
network.send_rpc_response( network.send_rpc_response(
peer_id.clone(), peer_id.clone(),
@ -144,6 +148,8 @@ impl SimpleSync {
hello: HelloMessage, hello: HelloMessage,
network: &mut NetworkContext, network: &mut NetworkContext,
) { ) {
debug!(self.log, "HelloResponse"; "peer" => format!("{:?}", peer_id));
// Process the hello message, without sending back another hello. // Process the hello message, without sending back another hello.
self.process_hello(peer_id, hello, network); self.process_hello(peer_id, hello, network);
} }
@ -210,12 +216,19 @@ impl SimpleSync {
pub fn on_beacon_block_roots_request( pub fn on_beacon_block_roots_request(
&mut self, &mut self,
peer_id: PeerId, peer_id: PeerId,
request: BeaconBlockRootsRequest, req: BeaconBlockRootsRequest,
network: &mut NetworkContext, network: &mut NetworkContext,
) { ) {
debug!(
self.log,
"BlockRootsRequest";
"peer" => format!("{:?}", peer_id),
"count" => req.count,
);
let roots = match self let roots = match self
.chain .chain
.get_block_roots(request.start_slot, request.count as usize, 0) .get_block_roots(req.start_slot, req.count as usize, 0)
{ {
Ok(roots) => roots, Ok(roots) => roots,
Err(e) => { Err(e) => {
@ -223,7 +236,7 @@ impl SimpleSync {
warn!( warn!(
self.log, self.log,
"RPCRequest"; "peer" => format!("{:?}", peer_id), "RPCRequest"; "peer" => format!("{:?}", peer_id),
"request" => "BeaconBlockRoots", "req" => "BeaconBlockRoots",
"error" => format!("{:?}", e) "error" => format!("{:?}", e)
); );
return; return;
@ -234,7 +247,7 @@ impl SimpleSync {
.iter() .iter()
.enumerate() .enumerate()
.map(|(i, &block_root)| BlockRootSlot { .map(|(i, &block_root)| BlockRootSlot {
slot: request.start_slot + Slot::from(i), slot: req.start_slot + Slot::from(i),
block_root, block_root,
}) })
.collect(); .collect();
@ -248,10 +261,17 @@ impl SimpleSync {
pub fn on_beacon_block_roots_response( pub fn on_beacon_block_roots_response(
&mut self, &mut self,
peer_id: PeerId, peer_id: PeerId,
response: BeaconBlockRootsResponse, res: BeaconBlockRootsResponse,
network: &mut NetworkContext, network: &mut NetworkContext,
) { ) {
if response.roots.is_empty() { debug!(
self.log,
"BlockRootsResponse";
"peer" => format!("{:?}", peer_id),
"count" => res.roots.len(),
);
if res.roots.is_empty() {
warn!( warn!(
self.log, self.log,
"Peer returned empty block roots response. PeerId: {:?}", peer_id "Peer returned empty block roots response. PeerId: {:?}", peer_id
@ -259,21 +279,21 @@ impl SimpleSync {
return; return;
} }
let new_root_index = self.import_queue.first_new_root(&response.roots); let new_root_index = self.import_queue.first_new_root(&res.roots);
// If a new block root is found, request it and all the headers following it. // If a new block root is found, request it and all the headers following it.
// //
// We make an assumption here that if we don't know a block then we don't know of all // We make an assumption here that if we don't know a block then we don't know of all
// it's parents. This might not be the case if syncing becomes more sophisticated. // it's parents. This might not be the case if syncing becomes more sophisticated.
if let Some(i) = new_root_index { if let Some(i) = new_root_index {
let new = &response.roots[i]; let new = &res.roots[i];
self.request_block_headers( self.request_block_headers(
peer_id, peer_id,
BeaconBlockHeadersRequest { BeaconBlockHeadersRequest {
start_root: new.block_root, start_root: new.block_root,
start_slot: new.slot, start_slot: new.slot,
max_headers: (response.roots.len() - i) as u64, max_headers: (res.roots.len() - i) as u64,
skip_slots: 0, skip_slots: 0,
}, },
network, network,
@ -284,13 +304,20 @@ impl SimpleSync {
pub fn on_beacon_block_headers_request( pub fn on_beacon_block_headers_request(
&mut self, &mut self,
peer_id: PeerId, peer_id: PeerId,
request: BeaconBlockHeadersRequest, req: BeaconBlockHeadersRequest,
network: &mut NetworkContext, network: &mut NetworkContext,
) { ) {
debug!(
self.log,
"BlockHeadersRequest";
"peer" => format!("{:?}", peer_id),
"count" => req.max_headers,
);
let headers = match self.chain.get_block_headers( let headers = match self.chain.get_block_headers(
request.start_slot, req.start_slot,
request.max_headers as usize, req.max_headers as usize,
request.skip_slots as usize, req.skip_slots as usize,
) { ) {
Ok(headers) => headers, Ok(headers) => headers,
Err(e) => { Err(e) => {
@ -298,7 +325,7 @@ impl SimpleSync {
warn!( warn!(
self.log, self.log,
"RPCRequest"; "peer" => format!("{:?}", peer_id), "RPCRequest"; "peer" => format!("{:?}", peer_id),
"request" => "BeaconBlockHeaders", "req" => "BeaconBlockHeaders",
"error" => format!("{:?}", e) "error" => format!("{:?}", e)
); );
return; return;
@ -314,10 +341,17 @@ impl SimpleSync {
pub fn on_beacon_block_headers_response( pub fn on_beacon_block_headers_response(
&mut self, &mut self,
peer_id: PeerId, peer_id: PeerId,
response: BeaconBlockHeadersResponse, res: BeaconBlockHeadersResponse,
network: &mut NetworkContext, network: &mut NetworkContext,
) { ) {
if response.headers.is_empty() { debug!(
self.log,
"BlockHeadersResponse";
"peer" => format!("{:?}", peer_id),
"count" => res.headers.len(),
);
if res.headers.is_empty() {
warn!( warn!(
self.log, self.log,
"Peer returned empty block headers response. PeerId: {:?}", peer_id "Peer returned empty block headers response. PeerId: {:?}", peer_id
@ -325,9 +359,11 @@ impl SimpleSync {
return; return;
} }
// Enqueue the headers, obtaining a list of the roots of the headers which were newly added
// to the queue.
let block_roots = self let block_roots = self
.import_queue .import_queue
.enqueue_headers(response.headers, peer_id.clone()); .enqueue_headers(res.headers, peer_id.clone());
self.request_block_bodies(peer_id, BeaconBlockBodiesRequest { block_roots }, network); self.request_block_bodies(peer_id, BeaconBlockBodiesRequest { block_roots }, network);
} }
@ -335,17 +371,24 @@ impl SimpleSync {
pub fn on_beacon_block_bodies_request( pub fn on_beacon_block_bodies_request(
&mut self, &mut self,
peer_id: PeerId, peer_id: PeerId,
request: BeaconBlockBodiesRequest, req: BeaconBlockBodiesRequest,
network: &mut NetworkContext, network: &mut NetworkContext,
) { ) {
let block_bodies = match self.chain.get_block_bodies(&request.block_roots) { debug!(
self.log,
"BlockBodiesRequest";
"peer" => format!("{:?}", peer_id),
"count" => req.block_roots.len(),
);
let block_bodies = match self.chain.get_block_bodies(&req.block_roots) {
Ok(bodies) => bodies, Ok(bodies) => bodies,
Err(e) => { Err(e) => {
// TODO: return RPC error. // TODO: return RPC error.
warn!( warn!(
self.log, self.log,
"RPCRequest"; "peer" => format!("{:?}", peer_id), "RPCRequest"; "peer" => format!("{:?}", peer_id),
"request" => "BeaconBlockBodies", "req" => "BeaconBlockBodies",
"error" => format!("{:?}", e) "error" => format!("{:?}", e)
); );
return; return;
@ -361,11 +404,18 @@ impl SimpleSync {
pub fn on_beacon_block_bodies_response( pub fn on_beacon_block_bodies_response(
&mut self, &mut self,
peer_id: PeerId, peer_id: PeerId,
response: BeaconBlockBodiesResponse, res: BeaconBlockBodiesResponse,
network: &mut NetworkContext, network: &mut NetworkContext,
) { ) {
debug!(
self.log,
"BlockBodiesResponse";
"peer" => format!("{:?}", peer_id),
"count" => res.block_bodies.len(),
);
self.import_queue self.import_queue
.enqueue_bodies(response.block_bodies, peer_id.clone()); .enqueue_bodies(res.block_bodies, peer_id.clone());
// Clear out old entries // Clear out old entries
self.import_queue.remove_stale(); self.import_queue.remove_stale();
@ -427,11 +477,11 @@ impl SimpleSync {
fn request_block_roots( fn request_block_roots(
&mut self, &mut self,
peer_id: PeerId, peer_id: PeerId,
request: BeaconBlockRootsRequest, req: BeaconBlockRootsRequest,
network: &mut NetworkContext, network: &mut NetworkContext,
) { ) {
// Potentially set state to sync. // Potentially set state to sync.
if self.state == SyncState::Idle && request.count > SLOT_IMPORT_TOLERANCE { if self.state == SyncState::Idle && req.count > SLOT_IMPORT_TOLERANCE {
debug!(self.log, "Entering downloading sync state."); debug!(self.log, "Entering downloading sync state.");
self.state = SyncState::Downloading; self.state = SyncState::Downloading;
} }
@ -439,44 +489,44 @@ impl SimpleSync {
debug!( debug!(
self.log, self.log,
"RPCRequest(BeaconBlockRoots)"; "RPCRequest(BeaconBlockRoots)";
"count" => request.count, "count" => req.count,
"peer" => format!("{:?}", peer_id) "peer" => format!("{:?}", peer_id)
); );
// TODO: handle count > max count. // TODO: handle count > max count.
network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlockRoots(request)); network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlockRoots(req));
} }
fn request_block_headers( fn request_block_headers(
&mut self, &mut self,
peer_id: PeerId, peer_id: PeerId,
request: BeaconBlockHeadersRequest, req: BeaconBlockHeadersRequest,
network: &mut NetworkContext, network: &mut NetworkContext,
) { ) {
debug!( debug!(
self.log, self.log,
"RPCRequest(BeaconBlockHeaders)"; "RPCRequest(BeaconBlockHeaders)";
"max_headers" => request.max_headers, "max_headers" => req.max_headers,
"peer" => format!("{:?}", peer_id) "peer" => format!("{:?}", peer_id)
); );
network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlockHeaders(request)); network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlockHeaders(req));
} }
fn request_block_bodies( fn request_block_bodies(
&mut self, &mut self,
peer_id: PeerId, peer_id: PeerId,
request: BeaconBlockBodiesRequest, req: BeaconBlockBodiesRequest,
network: &mut NetworkContext, network: &mut NetworkContext,
) { ) {
debug!( debug!(
self.log, self.log,
"RPCRequest(BeaconBlockBodies)"; "RPCRequest(BeaconBlockBodies)";
"count" => request.block_roots.len(), "count" => req.block_roots.len(),
"peer" => format!("{:?}", peer_id) "peer" => format!("{:?}", peer_id)
); );
network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlockBodies(request)); network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlockBodies(req));
} }
/// Generates our current state in the form of a HELLO RPC message. /// Generates our current state in the form of a HELLO RPC message.