From 13b5df56b3f4e81238733943e23bb3e11d602c5a Mon Sep 17 00:00:00 2001 From: Age Manning Date: Tue, 3 Sep 2019 07:50:44 +1000 Subject: [PATCH] Account manager, bootnodes, RPC display and sync fixes --- account_manager/src/main.rs | 10 +- beacon_node/eth2-libp2p/src/discovery.rs | 2 +- beacon_node/eth2-libp2p/src/rpc/methods.rs | 48 ++++++++ beacon_node/eth2-libp2p/src/rpc/mod.rs | 10 ++ beacon_node/eth2-libp2p/src/rpc/protocol.rs | 11 ++ beacon_node/eth2-libp2p/src/service.rs | 16 ++- beacon_node/network/src/service.rs | 4 +- beacon_node/network/src/sync/manager.rs | 124 ++++++++++++-------- beacon_node/network/src/sync/simple_sync.rs | 2 +- beacon_node/src/main.rs | 14 --- 10 files changed, 168 insertions(+), 73 deletions(-) diff --git a/account_manager/src/main.rs b/account_manager/src/main.rs index b7448ddf2..ae3823049 100644 --- a/account_manager/src/main.rs +++ b/account_manager/src/main.rs @@ -125,9 +125,13 @@ fn main() { } } } - _ => panic!( - "The account manager must be run with a subcommand. See help for more information." - ), + _ => { + crit!( + log, + "The account manager must be run with a subcommand. See help for more information." + ); + return; + } } } diff --git a/beacon_node/eth2-libp2p/src/discovery.rs b/beacon_node/eth2-libp2p/src/discovery.rs index 4a8aba2b1..c3f2522d8 100644 --- a/beacon_node/eth2-libp2p/src/discovery.rs +++ b/beacon_node/eth2-libp2p/src/discovery.rs @@ -114,7 +114,7 @@ impl Discovery { self.find_peers(); } - /// Add an Enr to the routing table of the discovery mechanism. + /// Add an ENR to the routing table of the discovery mechanism. pub fn add_enr(&mut self, enr: Enr) { self.discovery.add_enr(enr); } diff --git a/beacon_node/eth2-libp2p/src/rpc/methods.rs b/beacon_node/eth2-libp2p/src/rpc/methods.rs index d912bcfa1..c9610b000 100644 --- a/beacon_node/eth2-libp2p/src/rpc/methods.rs +++ b/beacon_node/eth2-libp2p/src/rpc/methods.rs @@ -157,3 +157,51 @@ impl ErrorMessage { String::from_utf8(self.error_message.clone()).unwrap_or_else(|_| "".into()) } } + +impl std::fmt::Display for HelloMessage { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Hello Message: Fork Version: {:?}, Finalized Root: {}, Finalized Epoch: {}, Head Root: {}, Head Slot: {}", self.fork_version, self.finalized_root, self.finalized_epoch, self.head_root, self.head_slot) + } +} + +impl std::fmt::Display for RPCResponse { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RPCResponse::Hello(hello) => write!(f, "{}", hello), + RPCResponse::BeaconBlocks(_) => write!(f, ""), + RPCResponse::RecentBeaconBlocks(_) => write!(f, ""), + } + } +} + +impl std::fmt::Display for RPCErrorResponse { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RPCErrorResponse::Success(res) => write!(f, "{}", res), + RPCErrorResponse::InvalidRequest(err) => write!(f, "Invalid Request: {:?}", err), + RPCErrorResponse::ServerError(err) => write!(f, "Server Error: {:?}", err), + RPCErrorResponse::Unknown(err) => write!(f, "Unknown Error: {:?}", err), + } + } +} + +impl std::fmt::Display for GoodbyeReason { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + GoodbyeReason::ClientShutdown => write!(f, "Client Shutdown"), + GoodbyeReason::IrrelevantNetwork => write!(f, "Irrelevant Network"), + GoodbyeReason::Fault => write!(f, "Fault"), + GoodbyeReason::Unknown => write!(f, "Unknown Reason"), + } + } +} + +impl std::fmt::Display for BeaconBlocksRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Head Block Root: {}, Start Slot: {}, Count: {}, Step: {}", + self.head_block_root, self.start_slot, self.count, self.step + ) + } +} diff --git a/beacon_node/eth2-libp2p/src/rpc/mod.rs b/beacon_node/eth2-libp2p/src/rpc/mod.rs index 756a62e71..2076615a9 100644 --- a/beacon_node/eth2-libp2p/src/rpc/mod.rs +++ b/beacon_node/eth2-libp2p/src/rpc/mod.rs @@ -47,6 +47,16 @@ impl RPCEvent { } } +impl std::fmt::Display for RPCEvent { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RPCEvent::Request(id, req) => write!(f, "RPC Request(Id: {}, {})", id, req), + RPCEvent::Response(id, res) => write!(f, "RPC Response(Id: {}, {})", id, res), + RPCEvent::Error(id, err) => write!(f, "RPC Request(Id: {}, Error: {:?})", id, err), + } + } +} + /// Implements the libp2p `NetworkBehaviour` trait and therefore manages network-level /// logic. pub struct RPC { diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index be1efdf5d..401fa8b9e 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -288,3 +288,14 @@ impl std::error::Error for RPCError { } } } + +impl std::fmt::Display for RPCRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RPCRequest::Hello(hello) => write!(f, "Hello Message: {}", hello), + RPCRequest::Goodbye(reason) => write!(f, "Goodbye: {}", reason), + RPCRequest::BeaconBlocks(req) => write!(f, "Beacon Blocks: {}", req), + RPCRequest::RecentBeaconBlocks(req) => write!(f, "Recent Beacon Blocks: {:?}", req), + } + } +} diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index 1ea1723b6..96b5a276e 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -79,8 +79,8 @@ impl Service { } }; - // attempt to connect to user-input libp2p nodes - for multiaddr in config.libp2p_nodes { + // helper closure for dialing peers + let mut dial_addr = |multiaddr: Multiaddr| { match Swarm::dial_addr(&mut swarm, multiaddr.clone()) { Ok(()) => debug!(log, "Dialing libp2p peer"; "address" => format!("{}", multiaddr)), Err(err) => debug!( @@ -88,6 +88,18 @@ impl Service { "Could not connect to peer"; "address" => format!("{}", multiaddr), "error" => format!("{:?}", err) ), }; + }; + + // attempt to connect to user-input libp2p nodes + for multiaddr in config.libp2p_nodes { + dial_addr(multiaddr); + } + + // attempt to connect to any specified boot-nodes + for bootnode_enr in config.boot_nodes { + for multiaddr in bootnode_enr.multiaddr() { + dial_addr(multiaddr); + } } // subscribe to default gossipsub topics diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index a8b3c74b6..ae7562033 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -161,7 +161,7 @@ fn network_service( Ok(Async::Ready(Some(message))) => match message { NetworkMessage::Send(peer_id, outgoing_message) => match outgoing_message { OutgoingMessage::RPC(rpc_event) => { - trace!(log, "Sending RPC Event: {:?}", rpc_event); + trace!(log, "{}", rpc_event); libp2p_service.lock().swarm.send_rpc(peer_id, rpc_event); } }, @@ -185,7 +185,7 @@ fn network_service( match libp2p_service.lock().poll() { Ok(Async::Ready(Some(event))) => match event { Libp2pEvent::RPC(peer_id, rpc_event) => { - trace!(log, "RPC Event: RPC message received: {:?}", rpc_event); + trace!(log, "{}", rpc_event); message_handler_send .try_send(HandlerMessage::RPC(peer_id, rpc_event)) .map_err(|_| "Failed to send RPC to handler")?; diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index a48b43ad7..8ba7486a5 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -68,7 +68,7 @@ use types::{BeaconBlock, EthSpec, Hash256, Slot}; /// Blocks are downloaded in batches from peers. This constant specifies how many blocks per batch /// is requested. Currently the value is small for testing. This will be incremented for /// production. -const MAX_BLOCKS_PER_REQUEST: u64 = 10; +const MAX_BLOCKS_PER_REQUEST: u64 = 100; /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync /// from a peer. If a peer is within this tolerance (forwards or backwards), it is treated as a @@ -120,6 +120,8 @@ struct BlockRequests { target_head_root: Hash256, /// The blocks that we have currently downloaded from the peer that are yet to be processed. downloaded_blocks: Vec>, + /// The number of blocks successfully processed in this request. + blocks_processed: usize, /// The number of empty batches we have consecutively received. If a peer returns more than /// EMPTY_BATCHES_TOLERANCE, they are dropped. consecutive_empty_batches: usize, @@ -302,6 +304,7 @@ impl ImportManager { target_head_root: remote.head_root, consecutive_empty_batches: 0, downloaded_blocks: Vec::new(), + blocks_processed: 0, state: BlockRequestsState::Queued, sync_direction: SyncDirection::Initial, current_start_slot: chain.best_slot(), @@ -356,6 +359,10 @@ impl ImportManager { warn!(self.log, "Peer returned too many empty block batches"; "peer" => format!("{:?}", peer_id)); block_requests.state = BlockRequestsState::Failed; + } else if block_requests.current_start_slot >= block_requests.target_head_slot { + warn!(self.log, "Peer did not return blocks it claimed to possess"; + "peer" => format!("{:?}", peer_id)); + block_requests.state = BlockRequestsState::Failed; } else { block_requests.update_start_slot(); } @@ -561,19 +568,19 @@ impl ImportManager { // only process batch requests if there are any if !self.import_queue.is_empty() { // process potential block requests - self.process_potential_block_requests(); + re_run = re_run || self.process_potential_block_requests(); // process any complete long-range batches - re_run = self.process_complete_batches(); + re_run = re_run || self.process_complete_batches(); } // only process parent objects if we are in regular sync - if let ManagerState::Regular = self.state { + if !self.parent_queue.is_empty() { // process any parent block lookup-requests - self.process_parent_requests(); + re_run = re_run || self.process_parent_requests(); // process any complete parent lookups - re_run = self.process_complete_parent_requests(); + re_run = re_run || self.process_complete_parent_requests(); } // return any queued events @@ -613,20 +620,23 @@ impl ImportManager { } } - fn process_potential_block_requests(&mut self) { + fn process_potential_block_requests(&mut self) -> bool { // check if an outbound request is required // Managing a fixed number of outbound requests is maintained at the RPC protocol libp2p // layer and not needed here. Therefore we create many outbound requests and let the RPC // handle the number of simultaneous requests. Request all queued objects. + let mut re_run = false; // remove any failed batches let debug_log = &self.log; + let full_peer_ref = &mut self.full_peers; self.import_queue.retain(|peer_id, block_request| { if let BlockRequestsState::Failed = block_request.state { debug!(debug_log, "Block import from peer failed"; "peer_id" => format!("{:?}", peer_id), - "downloaded_blocks" => block_request.downloaded_blocks.len() + "downloaded_blocks" => block_request.blocks_processed ); + full_peer_ref.remove(peer_id); false } else { true @@ -654,7 +664,10 @@ impl ImportManager { request, request_id, }); + re_run = true; } + + re_run } fn process_complete_batches(&mut self) -> bool { @@ -667,66 +680,75 @@ impl ImportManager { let event_queue_ref = &mut self.event_queue; self.import_queue.retain(|peer_id, block_requests| { - // check that the chain still exists - if let Some(chain) = chain_ref.upgrade() { - let downloaded_blocks = - std::mem::replace(&mut block_requests.downloaded_blocks, Vec::new()); - let last_element = block_requests.downloaded_blocks.len() - 1; - let start_slot = block_requests.downloaded_blocks[0].slot; - let end_slot = block_requests.downloaded_blocks[last_element].slot; + if block_requests.state == BlockRequestsState::ReadyToProcess { + // check that the chain still exists + if let Some(chain) = chain_ref.upgrade() { + let downloaded_blocks = + std::mem::replace(&mut block_requests.downloaded_blocks, Vec::new()); + let last_element = downloaded_blocks.len() - 1; + let start_slot = downloaded_blocks[0].slot; + let end_slot = downloaded_blocks[last_element].slot; - match process_blocks(chain, downloaded_blocks, log_ref) { - Ok(()) => { - debug!(log_ref, "Blocks processed successfully"; - "peer" => format!("{:?}", peer_id), - "start_slot" => start_slot, - "end_slot" => end_slot, - "no_blocks" => last_element + 1, - ); - - // check if the batch is complete, by verifying if we have reached the - // target head - if end_slot >= block_requests.target_head_slot { - // Completed, re-hello the peer to ensure we are up to the latest head - event_queue_ref.push(ImportManagerOutcome::Hello(peer_id.clone())); - // remove the request - false - } else { - // have not reached the end, queue another batch - block_requests.update_start_slot(); - re_run = true; - // keep the batch - true - } - } - Err(e) => { - warn!(log_ref, "Block processing failed"; + match process_blocks(chain, downloaded_blocks, log_ref) { + Ok(()) => { + debug!(log_ref, "Blocks processed successfully"; "peer" => format!("{:?}", peer_id), "start_slot" => start_slot, "end_slot" => end_slot, "no_blocks" => last_element + 1, - "error" => format!("{:?}", e), - ); - event_queue_ref.push(ImportManagerOutcome::DownvotePeer(peer_id.clone())); - false + ); + block_requests.blocks_processed += last_element + 1; + + // check if the batch is complete, by verifying if we have reached the + // target head + if end_slot >= block_requests.target_head_slot { + // Completed, re-hello the peer to ensure we are up to the latest head + event_queue_ref.push(ImportManagerOutcome::Hello(peer_id.clone())); + // remove the request + false + } else { + // have not reached the end, queue another batch + block_requests.update_start_slot(); + re_run = true; + // keep the batch + true + } + } + Err(e) => { + warn!(log_ref, "Block processing failed"; + "peer" => format!("{:?}", peer_id), + "start_slot" => start_slot, + "end_slot" => end_slot, + "no_blocks" => last_element + 1, + "error" => format!("{:?}", e), + ); + event_queue_ref + .push(ImportManagerOutcome::DownvotePeer(peer_id.clone())); + false + } } + } else { + // chain no longer exists, empty the queue and return + event_queue_ref.clear(); + return false; } } else { - // chain no longer exists, empty the queue and return - event_queue_ref.clear(); - return false; + // not ready to process + true } }); re_run } - fn process_parent_requests(&mut self) { + fn process_parent_requests(&mut self) -> bool { // check to make sure there are peers to search for the parent from if self.full_peers.is_empty() { - return; + return false; } + let mut re_run = false; + // remove any failed requests let debug_log = &self.log; self.parent_queue.retain(|parent_request| { @@ -766,8 +788,10 @@ impl ImportManager { self.event_queue .push(ImportManagerOutcome::RecentRequest(peer_id.clone(), req)); + re_run = true; } } + re_run } fn process_complete_parent_requests(&mut self) -> bool { diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index 36947082e..e1ca30b0a 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -453,7 +453,7 @@ impl SimpleSync { } BlockProcessingOutcome::ParentUnknown { parent: _ } => { // Inform the sync manager to find parents for this block - trace!(self.log, "Unknown parent gossip"; + trace!(self.log, "Block with unknown parent received"; "peer_id" => format!("{:?}",peer_id)); self.manager.add_unknown_block(block.clone(), peer_id); SHOULD_FORWARD_GOSSIP_BLOCK diff --git a/beacon_node/src/main.rs b/beacon_node/src/main.rs index ea801cd8b..ab9803eba 100644 --- a/beacon_node/src/main.rs +++ b/beacon_node/src/main.rs @@ -187,13 +187,6 @@ fn main() { .possible_values(&["info", "debug", "trace", "warn", "error", "crit"]) .default_value("trace"), ) - .arg( - Arg::with_name("verbosity") - .short("v") - .multiple(true) - .help("Sets the verbosity level") - .takes_value(true), - ) /* * The "testnet" sub-command. * @@ -332,13 +325,6 @@ fn main() { _ => unreachable!("guarded by clap"), }; - let drain = match matches.occurrences_of("verbosity") { - 0 => drain.filter_level(Level::Info), - 1 => drain.filter_level(Level::Debug), - 2 => drain.filter_level(Level::Trace), - _ => drain.filter_level(Level::Trace), - }; - let log = slog::Logger::root(drain.fuse(), o!()); warn!(