diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index 24fe04950..77a2ab8db 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -8,8 +8,7 @@ use eth2_libp2p::{ PeerId, RPCEvent, }; use futures::future; -use slog::debug; -use slog::warn; +use slog::{debug, warn}; use std::collections::HashMap; use std::sync::Arc; use std::time::Instant; @@ -144,14 +143,13 @@ impl MessageHandler { .remove(&(peer_id.clone(), id)) .is_none() { - debug!(self.log, "Unrecognized response from peer: {:?}", peer_id); + debug!(self.log, "Unrecognised response from peer: {:?}", peer_id); return; } - let response_str = match response { + match response { RPCResponse::Hello(hello_message) => { self.sync .on_hello_response(peer_id, hello_message, &mut self.network_context); - "Hello" } RPCResponse::BeaconBlockRoots(response) => { self.sync.on_beacon_block_roots_response( @@ -159,7 +157,6 @@ impl MessageHandler { response, &mut self.network_context, ); - "BeaconBlockRoots" } RPCResponse::BeaconBlockHeaders(response) => { self.sync.on_beacon_block_headers_response( @@ -167,7 +164,6 @@ impl MessageHandler { response, &mut self.network_context, ); - "BeaconBlockHeaders" } RPCResponse::BeaconBlockBodies(response) => { self.sync.on_beacon_block_bodies_response( @@ -175,13 +171,10 @@ impl MessageHandler { response, &mut self.network_context, ); - "BeaconBlockBodies" } // TODO: Handle all responses _ => panic!("Unknown response: {:?}", response), }; - - debug!(self.log, "RPCResponse({})", response_str); } } diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index ff6092be1..7609f5750 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -120,6 +120,8 @@ impl SimpleSync { } pub fn on_connect(&self, peer_id: PeerId, network: &mut NetworkContext) { + info!(self.log, "PeerConnect"; "peer" => format!("{:?}", peer_id)); + network.send_rpc_request(peer_id, RPCRequest::Hello(self.chain.hello_message())); } @@ -129,6 +131,8 @@ impl SimpleSync { hello: HelloMessage, network: &mut NetworkContext, ) { + debug!(self.log, "HelloRequest"; "peer" => format!("{:?}", peer_id)); + // Say hello back. network.send_rpc_response( peer_id.clone(), @@ -144,6 +148,8 @@ impl SimpleSync { hello: HelloMessage, network: &mut NetworkContext, ) { + debug!(self.log, "HelloResponse"; "peer" => format!("{:?}", peer_id)); + // Process the hello message, without sending back another hello. self.process_hello(peer_id, hello, network); } @@ -210,12 +216,19 @@ impl SimpleSync { pub fn on_beacon_block_roots_request( &mut self, peer_id: PeerId, - request: BeaconBlockRootsRequest, + req: BeaconBlockRootsRequest, network: &mut NetworkContext, ) { + debug!( + self.log, + "BlockRootsRequest"; + "peer" => format!("{:?}", peer_id), + "count" => req.count, + ); + let roots = match self .chain - .get_block_roots(request.start_slot, request.count as usize, 0) + .get_block_roots(req.start_slot, req.count as usize, 0) { Ok(roots) => roots, Err(e) => { @@ -223,7 +236,7 @@ impl SimpleSync { warn!( self.log, "RPCRequest"; "peer" => format!("{:?}", peer_id), - "request" => "BeaconBlockRoots", + "req" => "BeaconBlockRoots", "error" => format!("{:?}", e) ); return; @@ -234,7 +247,7 @@ impl SimpleSync { .iter() .enumerate() .map(|(i, &block_root)| BlockRootSlot { - slot: request.start_slot + Slot::from(i), + slot: req.start_slot + Slot::from(i), block_root, }) .collect(); @@ -248,10 +261,17 @@ impl SimpleSync { pub fn on_beacon_block_roots_response( &mut self, peer_id: PeerId, - response: BeaconBlockRootsResponse, + res: BeaconBlockRootsResponse, network: &mut NetworkContext, ) { - if response.roots.is_empty() { + debug!( + self.log, + "BlockRootsResponse"; + "peer" => format!("{:?}", peer_id), + "count" => res.roots.len(), + ); + + if res.roots.is_empty() { warn!( self.log, "Peer returned empty block roots response. PeerId: {:?}", peer_id @@ -259,21 +279,21 @@ impl SimpleSync { return; } - let new_root_index = self.import_queue.first_new_root(&response.roots); + let new_root_index = self.import_queue.first_new_root(&res.roots); // If a new block root is found, request it and all the headers following it. // // We make an assumption here that if we don't know a block then we don't know of all // it's parents. This might not be the case if syncing becomes more sophisticated. if let Some(i) = new_root_index { - let new = &response.roots[i]; + let new = &res.roots[i]; self.request_block_headers( peer_id, BeaconBlockHeadersRequest { start_root: new.block_root, start_slot: new.slot, - max_headers: (response.roots.len() - i) as u64, + max_headers: (res.roots.len() - i) as u64, skip_slots: 0, }, network, @@ -284,13 +304,20 @@ impl SimpleSync { pub fn on_beacon_block_headers_request( &mut self, peer_id: PeerId, - request: BeaconBlockHeadersRequest, + req: BeaconBlockHeadersRequest, network: &mut NetworkContext, ) { + debug!( + self.log, + "BlockHeadersRequest"; + "peer" => format!("{:?}", peer_id), + "count" => req.max_headers, + ); + let headers = match self.chain.get_block_headers( - request.start_slot, - request.max_headers as usize, - request.skip_slots as usize, + req.start_slot, + req.max_headers as usize, + req.skip_slots as usize, ) { Ok(headers) => headers, Err(e) => { @@ -298,7 +325,7 @@ impl SimpleSync { warn!( self.log, "RPCRequest"; "peer" => format!("{:?}", peer_id), - "request" => "BeaconBlockHeaders", + "req" => "BeaconBlockHeaders", "error" => format!("{:?}", e) ); return; @@ -314,10 +341,17 @@ impl SimpleSync { pub fn on_beacon_block_headers_response( &mut self, peer_id: PeerId, - response: BeaconBlockHeadersResponse, + res: BeaconBlockHeadersResponse, network: &mut NetworkContext, ) { - if response.headers.is_empty() { + debug!( + self.log, + "BlockHeadersResponse"; + "peer" => format!("{:?}", peer_id), + "count" => res.headers.len(), + ); + + if res.headers.is_empty() { warn!( self.log, "Peer returned empty block headers response. PeerId: {:?}", peer_id @@ -325,9 +359,11 @@ impl SimpleSync { return; } + // Enqueue the headers, obtaining a list of the roots of the headers which were newly added + // to the queue. let block_roots = self .import_queue - .enqueue_headers(response.headers, peer_id.clone()); + .enqueue_headers(res.headers, peer_id.clone()); self.request_block_bodies(peer_id, BeaconBlockBodiesRequest { block_roots }, network); } @@ -335,17 +371,24 @@ impl SimpleSync { pub fn on_beacon_block_bodies_request( &mut self, peer_id: PeerId, - request: BeaconBlockBodiesRequest, + req: BeaconBlockBodiesRequest, network: &mut NetworkContext, ) { - let block_bodies = match self.chain.get_block_bodies(&request.block_roots) { + debug!( + self.log, + "BlockBodiesRequest"; + "peer" => format!("{:?}", peer_id), + "count" => req.block_roots.len(), + ); + + let block_bodies = match self.chain.get_block_bodies(&req.block_roots) { Ok(bodies) => bodies, Err(e) => { // TODO: return RPC error. warn!( self.log, "RPCRequest"; "peer" => format!("{:?}", peer_id), - "request" => "BeaconBlockBodies", + "req" => "BeaconBlockBodies", "error" => format!("{:?}", e) ); return; @@ -361,11 +404,18 @@ impl SimpleSync { pub fn on_beacon_block_bodies_response( &mut self, peer_id: PeerId, - response: BeaconBlockBodiesResponse, + res: BeaconBlockBodiesResponse, network: &mut NetworkContext, ) { + debug!( + self.log, + "BlockBodiesResponse"; + "peer" => format!("{:?}", peer_id), + "count" => res.block_bodies.len(), + ); + self.import_queue - .enqueue_bodies(response.block_bodies, peer_id.clone()); + .enqueue_bodies(res.block_bodies, peer_id.clone()); // Clear out old entries self.import_queue.remove_stale(); @@ -427,11 +477,11 @@ impl SimpleSync { fn request_block_roots( &mut self, peer_id: PeerId, - request: BeaconBlockRootsRequest, + req: BeaconBlockRootsRequest, network: &mut NetworkContext, ) { // Potentially set state to sync. - if self.state == SyncState::Idle && request.count > SLOT_IMPORT_TOLERANCE { + if self.state == SyncState::Idle && req.count > SLOT_IMPORT_TOLERANCE { debug!(self.log, "Entering downloading sync state."); self.state = SyncState::Downloading; } @@ -439,44 +489,44 @@ impl SimpleSync { debug!( self.log, "RPCRequest(BeaconBlockRoots)"; - "count" => request.count, + "count" => req.count, "peer" => format!("{:?}", peer_id) ); // TODO: handle count > max count. - network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlockRoots(request)); + network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlockRoots(req)); } fn request_block_headers( &mut self, peer_id: PeerId, - request: BeaconBlockHeadersRequest, + req: BeaconBlockHeadersRequest, network: &mut NetworkContext, ) { debug!( self.log, "RPCRequest(BeaconBlockHeaders)"; - "max_headers" => request.max_headers, + "max_headers" => req.max_headers, "peer" => format!("{:?}", peer_id) ); - network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlockHeaders(request)); + network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlockHeaders(req)); } fn request_block_bodies( &mut self, peer_id: PeerId, - request: BeaconBlockBodiesRequest, + req: BeaconBlockBodiesRequest, network: &mut NetworkContext, ) { debug!( self.log, "RPCRequest(BeaconBlockBodies)"; - "count" => request.block_roots.len(), + "count" => req.block_roots.len(), "peer" => format!("{:?}", peer_id) ); - network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlockBodies(request)); + network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlockBodies(req)); } /// Generates our current state in the form of a HELLO RPC message.