diff --git a/Cargo.lock b/Cargo.lock index f2d0dcce3..f4e171c69 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -305,7 +305,6 @@ dependencies = [ "slog-term", "store", "tokio 0.2.21", - "toml", "types", "version", ] diff --git a/Makefile b/Makefile index 7f8ea3052..76da24603 100644 --- a/Makefile +++ b/Makefile @@ -51,9 +51,9 @@ test: test-release test-full: cargo-fmt test-release test-debug test-ef # Lints the code for bad style and potentially unsafe arithmetic using Clippy. -# Clippy lints are opt-in per-crate for now, which is why we allow all by default. +# Clippy lints are opt-in per-crate for now. By default, everything is allowed except for performance and correctness lints. lint: - cargo clippy --all -- -A clippy::all + cargo clippy --all -- -A clippy::all --D clippy::perf --D clippy::correctness # Runs the makefile in the `ef_tests` repo. # diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml index 4d70b84f3..ca5ad0553 100644 --- a/beacon_node/Cargo.toml +++ b/beacon_node/Cargo.toml @@ -37,6 +37,5 @@ genesis = { path = "genesis" } eth2_testnet_config = { path = "../common/eth2_testnet_config" } eth2_libp2p = { path = "./eth2_libp2p" } eth2_ssz = "0.1.2" -toml = "0.5.6" serde = "1.0.110" clap_utils = { path = "../common/clap_utils" } diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index b00aea6f0..143f07e0d 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1229,14 +1229,9 @@ impl BeaconChain { } } - while !filtered_chain_segment.is_empty() { + while let Some((_root, block)) = filtered_chain_segment.first() { // Determine the epoch of the first block in the remaining segment. - let start_epoch = filtered_chain_segment - .first() - .map(|(_root, block)| block) - .expect("chain_segment cannot be empty") - .slot() - .epoch(T::EthSpec::slots_per_epoch()); + let start_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch()); // The `last_index` indicates the position of the last block that is in the current // epoch of `start_epoch`. diff --git a/beacon_node/beacon_chain/src/observed_attesters.rs b/beacon_node/beacon_chain/src/observed_attesters.rs index bc428fd5d..8cc3f5070 100644 --- a/beacon_node/beacon_chain/src/observed_attesters.rs +++ b/beacon_node/beacon_chain/src/observed_attesters.rs @@ -197,7 +197,7 @@ impl AutoPruningContainer { .map(|(_epoch, item)| item.len()) .fold((0, 0), |(count, sum), len| (count + 1, sum + len)); - let initial_capacity = sum.checked_div(count).unwrap_or(T::default_capacity()); + let initial_capacity = sum.checked_div(count).unwrap_or_else(T::default_capacity); let mut item = T::with_capacity(initial_capacity); item.insert(validator_index); diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index e98c21051..aaf2d6e96 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -504,7 +504,7 @@ where }; let sk = &self.keypairs[proposer_index].sk; - let fork = &state.fork.clone(); + let fork = &state.fork; let randao_reveal = { let epoch = slot.epoch(E::slots_per_epoch()); @@ -676,7 +676,7 @@ where selection_proof.is_aggregator(bc.committee.len(), spec).unwrap_or(false) }) .copied() - .expect(&format!( + .unwrap_or_else(|| panic!( "Committee {} at slot {} with {} attesting validators does not have any aggregators", bc.index, state.slot, bc.committee.len() )); diff --git a/beacon_node/eth2_libp2p/src/behaviour/handler/delegate.rs b/beacon_node/eth2_libp2p/src/behaviour/handler/delegate.rs index b20fe5170..2863efd85 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/handler/delegate.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/handler/delegate.rs @@ -70,7 +70,7 @@ pub enum DelegateIn { pub enum DelegateOut { Gossipsub(::OutEvent), RPC( as ProtocolsHandler>::OutEvent), - Identify(::OutEvent), + Identify(Box<::OutEvent>), } /// Wrapper around the `ProtocolsHandler::Error` types of the handlers. @@ -342,7 +342,9 @@ impl ProtocolsHandler for DelegatingHandler { match self.identify_handler.poll(cx) { Poll::Ready(ProtocolsHandlerEvent::Custom(event)) => { - return Poll::Ready(ProtocolsHandlerEvent::Custom(DelegateOut::Identify(event))); + return Poll::Ready(ProtocolsHandlerEvent::Custom(DelegateOut::Identify( + Box::new(event), + ))); } Poll::Ready(ProtocolsHandlerEvent::Close(event)) => { return Poll::Ready(ProtocolsHandlerEvent::Close(DelegateError::Identify(event))); diff --git a/beacon_node/eth2_libp2p/src/behaviour/handler/mod.rs b/beacon_node/eth2_libp2p/src/behaviour/handler/mod.rs index 64d0bcaca..f87ba6dbe 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/handler/mod.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/handler/mod.rs @@ -42,7 +42,7 @@ pub enum BehaviourHandlerIn { } pub enum BehaviourHandlerOut { - Delegate(DelegateOut), + Delegate(Box>), // TODO: replace custom with events to send Custom, } @@ -119,7 +119,7 @@ impl ProtocolsHandler for BehaviourHandler { match self.delegate.poll(cx) { Poll::Ready(ProtocolsHandlerEvent::Custom(event)) => { return Poll::Ready(ProtocolsHandlerEvent::Custom( - BehaviourHandlerOut::Delegate(event), + BehaviourHandlerOut::Delegate(Box::new(event)), )) } Poll::Ready(ProtocolsHandlerEvent::Close(err)) => { diff --git a/beacon_node/eth2_libp2p/src/behaviour/mod.rs b/beacon_node/eth2_libp2p/src/behaviour/mod.rs index 1aca8a5dd..cd54c7f6d 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/mod.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/mod.rs @@ -160,10 +160,10 @@ impl NetworkBehaviour for Behaviour { ) { match event { // Events comming from the handler, redirected to each behaviour - BehaviourHandlerOut::Delegate(delegate) => match delegate { + BehaviourHandlerOut::Delegate(delegate) => match *delegate { DelegateOut::Gossipsub(ev) => self.gossipsub.inject_event(peer_id, conn_id, ev), DelegateOut::RPC(ev) => self.eth2_rpc.inject_event(peer_id, conn_id, ev), - DelegateOut::Identify(ev) => self.identify.inject_event(peer_id, conn_id, ev), + DelegateOut::Identify(ev) => self.identify.inject_event(peer_id, conn_id, *ev), }, /* Custom events sent BY the handler */ BehaviourHandlerOut::Custom => { diff --git a/beacon_node/eth2_libp2p/src/config.rs b/beacon_node/eth2_libp2p/src/config.rs index 6facb7750..a9af3eea6 100644 --- a/beacon_node/eth2_libp2p/src/config.rs +++ b/beacon_node/eth2_libp2p/src/config.rs @@ -56,6 +56,9 @@ pub struct Config { /// Client version pub client_version: String, + /// Disables the discovery protocol from starting. + pub disable_discovery: bool, + /// List of extra topics to initially subscribe to as strings. pub topics: Vec, } @@ -104,7 +107,7 @@ impl Default for Config { .request_retries(2) .enr_peer_update_min(2) // prevents NAT's should be raised for mainnet .query_parallelism(5) - .query_timeout(Duration::from_secs(60)) + .query_timeout(Duration::from_secs(30)) .query_peer_timeout(Duration::from_secs(2)) .ip_limit() // limits /24 IP's in buckets. .ping_interval(Duration::from_secs(300)) @@ -125,6 +128,7 @@ impl Default for Config { boot_nodes: vec![], libp2p_nodes: vec![], client_version: version::version(), + disable_discovery: false, topics, } } diff --git a/beacon_node/eth2_libp2p/src/discovery/mod.rs b/beacon_node/eth2_libp2p/src/discovery/mod.rs index ca6cd9fe8..aae58d15d 100644 --- a/beacon_node/eth2_libp2p/src/discovery/mod.rs +++ b/beacon_node/eth2_libp2p/src/discovery/mod.rs @@ -51,7 +51,7 @@ const FIND_NODE_QUERY_CLOSEST_PEERS: usize = 16; pub enum DiscoveryEvent { /// A query has completed. The first parameter is the `min_ttl` of the peers if it is specified /// and the second parameter are the discovered peers. - QueryResult(Option, Box>), + QueryResult(Option, Vec), /// This indicates that our local UDP socketaddr has been updated and we should inform libp2p. SocketUpdated(SocketAddr), } @@ -112,10 +112,12 @@ enum EventStream { ), /// The future has completed. Present(mpsc::Receiver), - // The future has failed, there are no events from discv5. - Failed, + // The future has failed or discv5 has been disabled. There are no events from discv5. + InActive, } +/// The main discovery service. This can be disabled via CLI arguements. When disabled the +/// underlying processes are not started, but this struct still maintains our current ENR. pub struct Discovery { /// A collection of seen live ENRs for quick lookup and to map peer-id's to ENRs. cached_enrs: LruCache, @@ -145,6 +147,10 @@ pub struct Discovery { /// The discv5 event stream. event_stream: EventStream, + /// Indicates if the discovery service has been started. When the service is disabled, this is + /// always false. + started: bool, + /// Logger for the discovery behaviour. log: slog::Logger, } @@ -196,12 +202,16 @@ impl Discovery { }); } - // Start the discv5 service. - discv5.start(listen_socket); - debug!(log, "Discovery service started"); + // Start the discv5 service and obtain an event stream + let event_stream = if !config.disable_discovery { + discv5.start(listen_socket); + debug!(log, "Discovery service started"); + EventStream::Awaiting(Box::pin(discv5.event_stream())) + } else { + EventStream::InActive + }; // Obtain the event stream - let event_stream = EventStream::Awaiting(Box::pin(discv5.event_stream())); Ok(Self { cached_enrs: LruCache::new(50), @@ -211,6 +221,7 @@ impl Discovery { active_queries: FuturesUnordered::new(), discv5, event_stream, + started: !config.disable_discovery, log, enr_dir, }) @@ -223,10 +234,11 @@ impl Discovery { /// This adds a new `FindPeers` query to the queue if one doesn't already exist. pub fn discover_peers(&mut self) { - // If we are in the process of a query, don't bother queuing a new one. - if self.find_peer_active { + // If the discv5 service isn't running or we are in the process of a query, don't bother queuing a new one. + if !self.started || self.find_peer_active { return; } + // If there is not already a find peer's query queued, add one let query = QueryType::FindPeers; if !self.queued_queries.contains(&query) { @@ -239,54 +251,11 @@ impl Discovery { /// Processes a request to search for more peers on a subnet. pub fn discover_subnet_peers(&mut self, subnet_id: SubnetId, min_ttl: Option) { - self.add_subnet_query(subnet_id, min_ttl, 0); - } - - /// Adds a subnet query if one doesn't exist. If a subnet query already exists, this - /// updates the min_ttl field. - fn add_subnet_query(&mut self, subnet_id: SubnetId, min_ttl: Option, retries: usize) { - // remove the entry and complete the query if greater than the maximum search count - if retries >= MAX_DISCOVERY_RETRY { - debug!( - self.log, - "Subnet peer discovery did not find sufficient peers. Reached max retry limit" - ); + // If the discv5 service isn't running, ignore queries + if !self.started { return; } - - // Search through any queued requests and update the timeout if a query for this subnet - // already exists - let mut found = false; - for query in self.queued_queries.iter_mut() { - if let QueryType::Subnet { - subnet_id: ref mut q_subnet_id, - min_ttl: ref mut q_min_ttl, - retries: ref mut q_retries, - } = query - { - if *q_subnet_id == subnet_id { - if *q_min_ttl < min_ttl { - *q_min_ttl = min_ttl; - } - // update the number of retries - *q_retries = retries; - // mimic an `Iter::Find()` and short-circuit the loop - found = true; - break; - } - } - } - if !found { - // Set up the query and add it to the queue - let query = QueryType::Subnet { - subnet_id, - min_ttl, - retries, - }; - // update the metrics and insert into the queue. - metrics::set_gauge(&metrics::DISCOVERY_QUEUE, self.queued_queries.len() as i64); - self.queued_queries.push_back(query); - } + self.add_subnet_query(subnet_id, min_ttl, 0); } /// Add an ENR to the routing table of the discovery mechanism. @@ -295,7 +264,7 @@ impl Discovery { self.cached_enrs.put(enr.peer_id(), enr.clone()); if let Err(e) = self.discv5.add_enr(enr) { - warn!( + debug!( self.log, "Could not add peer to the local routing table"; "error" => format!("{}", e) @@ -359,7 +328,7 @@ impl Discovery { .enr_insert(BITFIELD_ENR_KEY, current_bitfield.as_ssz_bytes()); // replace the global version - *self.network_globals.local_enr.write() = self.discv5.local_enr().clone(); + *self.network_globals.local_enr.write() = self.discv5.local_enr(); Ok(()) } @@ -391,11 +360,58 @@ impl Discovery { }); // replace the global version with discovery version - *self.network_globals.local_enr.write() = self.discv5.local_enr().clone(); + *self.network_globals.local_enr.write() = self.discv5.local_enr(); } /* Internal Functions */ + /// Adds a subnet query if one doesn't exist. If a subnet query already exists, this + /// updates the min_ttl field. + fn add_subnet_query(&mut self, subnet_id: SubnetId, min_ttl: Option, retries: usize) { + // remove the entry and complete the query if greater than the maximum search count + if retries >= MAX_DISCOVERY_RETRY { + debug!( + self.log, + "Subnet peer discovery did not find sufficient peers. Reached max retry limit" + ); + return; + } + + // Search through any queued requests and update the timeout if a query for this subnet + // already exists + let mut found = false; + for query in self.queued_queries.iter_mut() { + if let QueryType::Subnet { + subnet_id: ref mut q_subnet_id, + min_ttl: ref mut q_min_ttl, + retries: ref mut q_retries, + } = query + { + if *q_subnet_id == subnet_id { + if *q_min_ttl < min_ttl { + *q_min_ttl = min_ttl; + } + // update the number of retries + *q_retries = retries; + // mimic an `Iter::Find()` and short-circuit the loop + found = true; + break; + } + } + } + if !found { + // Set up the query and add it to the queue + let query = QueryType::Subnet { + subnet_id, + min_ttl, + retries, + }; + // update the metrics and insert into the queue. + metrics::set_gauge(&metrics::DISCOVERY_QUEUE, self.queued_queries.len() as i64); + self.queued_queries.push_back(query); + } + } + /// Consume the discovery queue and initiate queries when applicable. /// /// This also sanitizes the queue removing out-dated queries. @@ -572,6 +588,10 @@ impl Discovery { // Main execution loop to be driven by the peer manager. pub fn poll(&mut self, cx: &mut Context) -> Poll { + if !self.started { + return Poll::Pending; + } + // Process the query queue self.process_queue(); @@ -582,7 +602,7 @@ impl Discovery { self.cached_enrs.put(enr.peer_id(), enr); } // return the result to the peer manager - return Poll::Ready(DiscoveryEvent::QueryResult(min_ttl, Box::new(result))); + return Poll::Ready(DiscoveryEvent::QueryResult(min_ttl, result)); } // Process the server event stream @@ -594,12 +614,12 @@ impl Discovery { Ok(stream) => self.event_stream = EventStream::Present(stream), Err(e) => { slog::crit!(self.log, "Discv5 event stream failed"; "error" => e.to_string()); - self.event_stream = EventStream::Failed; + self.event_stream = EventStream::InActive; } } } } - EventStream::Failed => {} // ignore checking the stream + EventStream::InActive => {} // ignore checking the stream EventStream::Present(ref mut stream) => { while let Ok(event) = stream.try_recv() { match event { diff --git a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs index 5d8720289..c384bfb31 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs @@ -422,7 +422,7 @@ impl PeerManager { /// with a new `PeerId` which involves a discovery routing table lookup. We could dial the /// multiaddr here, however this could relate to duplicate PeerId's etc. If the lookup /// proves resource constraining, we should switch to multiaddr dialling here. - fn peers_discovered(&mut self, peers: Vec, min_ttl: Option) { + fn peers_discovered(&mut self, peers: &[Enr], min_ttl: Option) { for enr in peers { let peer_id = enr.peer_id(); @@ -623,7 +623,7 @@ impl Stream for PeerManager { match event { DiscoveryEvent::SocketUpdated(socket_addr) => self.socket_updated(socket_addr), DiscoveryEvent::QueryResult(min_ttl, peers) => { - self.peers_discovered(*peers, min_ttl) + self.peers_discovered(&peers, min_ttl) } } } diff --git a/beacon_node/eth2_libp2p/src/rpc/handler.rs b/beacon_node/eth2_libp2p/src/rpc/handler.rs index 1a1292faa..29bc3504c 100644 --- a/beacon_node/eth2_libp2p/src/rpc/handler.rs +++ b/beacon_node/eth2_libp2p/src/rpc/handler.rs @@ -169,7 +169,7 @@ where /// A response has been sent, pending writing. ResponsePendingSend { /// The substream used to send the response - substream: InboundFramed, + substream: Box>, /// The message that is attempting to be sent. message: RPCCodedResponse, /// Whether a stream termination is requested. If true the stream will be closed after @@ -180,7 +180,7 @@ where /// A response has been sent, pending flush. ResponsePendingFlush { /// The substream used to send the response - substream: InboundFramed, + substream: Box>, /// Whether a stream termination is requested. If true the stream will be closed after /// this send. Otherwise it will transition to an idle state until a stream termination is /// requested or a timeout is reached. @@ -188,9 +188,9 @@ where }, /// The response stream is idle and awaiting input from the application to send more chunked /// responses. - ResponseIdle(InboundFramed), + ResponseIdle(Box>), /// The substream is attempting to shutdown. - Closing(InboundFramed), + Closing(Box>), /// Temporary state during processing Poisoned, } @@ -201,12 +201,12 @@ pub enum OutboundSubstreamState { /// handler because GOODBYE requests can be handled and responses dropped instantly. RequestPendingResponse { /// The framed negotiated substream. - substream: OutboundFramed, + substream: Box>, /// Keeps track of the actual request sent. request: RPCRequest, }, /// Closing an outbound substream> - Closing(OutboundFramed), + Closing(Box>), /// Temporary state during processing Poisoned, } @@ -326,7 +326,7 @@ where if matches!(self.state, HandlerState::Active) { debug!(self.log, "Starting handler shutdown"; "unsent_queued_requests" => self.dial_queue.len()); // we now drive to completion communications already dialed/established - for (id, req) in self.dial_queue.pop() { + while let Some((id, req)) = self.dial_queue.pop() { self.pending_errors.push(HandlerErr::Outbound { id, proto: req.protocol(), @@ -551,7 +551,7 @@ where self.current_inbound_substream_id, Duration::from_secs(RESPONSE_TIMEOUT), ); - let awaiting_stream = InboundSubstreamState::ResponseIdle(substream); + let awaiting_stream = InboundSubstreamState::ResponseIdle(Box::new(substream)); self.inbound_substreams.insert( self.current_inbound_substream_id, (awaiting_stream, Some(delay_key), req.protocol()), @@ -593,7 +593,7 @@ where Duration::from_secs(RESPONSE_TIMEOUT), ); let awaiting_stream = OutboundSubstreamState::RequestPendingResponse { - substream: out, + substream: Box::new(out), request, }; let expected_responses = if expected_responses > 1 { @@ -833,7 +833,7 @@ where // await flush entry.get_mut().0 = InboundSubstreamState::ResponsePendingFlush { - substream, + substream: substream, closing, }; drive_stream_further = true; @@ -853,7 +853,7 @@ where } else { // check for queued chunks and update the stream entry.get_mut().0 = apply_queued_responses( - substream, + *substream, &mut self .queued_outbound_items .get_mut(&request_id), @@ -908,7 +908,7 @@ where } else { // check for queued chunks and update the stream entry.get_mut().0 = apply_queued_responses( - substream, + *substream, &mut self .queued_outbound_items .get_mut(&request_id), @@ -942,7 +942,7 @@ where InboundSubstreamState::ResponseIdle(substream) => { if !deactivated { entry.get_mut().0 = apply_queued_responses( - substream, + *substream, &mut self.queued_outbound_items.get_mut(&request_id), &mut drive_stream_further, ); @@ -1190,10 +1190,10 @@ fn apply_queued_responses( match queue.remove(0) { RPCCodedResponse::StreamTermination(_) => { // close the stream if this is a stream termination - InboundSubstreamState::Closing(substream) + InboundSubstreamState::Closing(Box::new(substream)) } chunk => InboundSubstreamState::ResponsePendingSend { - substream, + substream: Box::new(substream), message: chunk, closing: false, }, @@ -1201,7 +1201,7 @@ fn apply_queued_responses( } _ => { // no items queued set to idle - InboundSubstreamState::ResponseIdle(substream) + InboundSubstreamState::ResponseIdle(Box::new(substream)) } } } diff --git a/beacon_node/eth2_libp2p/src/service.rs b/beacon_node/eth2_libp2p/src/service.rs index 130c495a3..ea5cf8bd8 100644 --- a/beacon_node/eth2_libp2p/src/service.rs +++ b/beacon_node/eth2_libp2p/src/service.rs @@ -106,7 +106,12 @@ impl Service { )); info!(log, "Libp2p Service"; "peer_id" => format!("{:?}", enr.peer_id())); - debug!(log, "Attempting to open listening ports"; "address" => format!("{}", config.listen_address), "tcp_port" => config.libp2p_port, "udp_port" => config.discovery_port); + let discovery_string = if config.disable_discovery { + "None".into() + } else { + config.discovery_port.to_string() + }; + debug!(log, "Attempting to open listening ports"; "address" => format!("{}", config.listen_address), "tcp_port" => config.libp2p_port, "udp_port" => discovery_string); let mut swarm = { // Set up the transport - tcp/ws with noise and yamux/mplex diff --git a/beacon_node/network/src/attestation_service/mod.rs b/beacon_node/network/src/attestation_service/mod.rs index 7ed904df4..5ee201ff9 100644 --- a/beacon_node/network/src/attestation_service/mod.rs +++ b/beacon_node/network/src/attestation_service/mod.rs @@ -260,7 +260,7 @@ impl AttestationService { /// verification, re-propagates and returns false. pub fn should_process_attestation( &mut self, - subnet: &SubnetId, + subnet: SubnetId, attestation: &Attestation, ) -> bool { let exact_subnet = ExactSubnet { diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index bc77f6cee..05799dc66 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -218,11 +218,9 @@ impl Router { match gossip_message { // Attestations should never reach the router. PubsubMessage::AggregateAndProofAttestation(aggregate_and_proof) => { - if let Some(gossip_verified) = - self.processor.verify_aggregated_attestation_for_gossip( - peer_id.clone(), - *aggregate_and_proof.clone(), - ) + if let Some(gossip_verified) = self + .processor + .verify_aggregated_attestation_for_gossip(peer_id.clone(), *aggregate_and_proof) { self.propagate_message(id, peer_id.clone()); self.processor diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 383a4d276..4a410cef9 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -294,7 +294,7 @@ fn spawn_service( match message { // attestation information gets processed in the attestation service PubsubMessage::Attestation(ref subnet_and_attestation) => { - let subnet = &subnet_and_attestation.0; + let subnet = subnet_and_attestation.0; let attestation = &subnet_and_attestation.1; // checks if we have an aggregator for the slot. If so, we process // the attestation diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index eea70305e..3040fc245 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -534,7 +534,7 @@ impl SyncManager { // otherwise, this is a range sync issue, notify the range sync self.range_sync - .inject_error(&mut self.network, peer_id.clone(), request_id); + .inject_error(&mut self.network, peer_id, request_id); } fn peer_disconnect(&mut self, peer_id: &PeerId) { @@ -672,7 +672,7 @@ impl SyncManager { "last_peer" => format!("{:?}", parent_request.last_submitted_peer), ); self.network - .downvote_peer(parent_request.last_submitted_peer.clone()); + .downvote_peer(parent_request.last_submitted_peer); return; } Err(e) => { @@ -682,7 +682,7 @@ impl SyncManager { "last_peer" => format!("{:?}", parent_request.last_submitted_peer), ); self.network - .downvote_peer(parent_request.last_submitted_peer.clone()); + .downvote_peer(parent_request.last_submitted_peer); return; } } diff --git a/beacon_node/rest_api/src/validator.rs b/beacon_node/rest_api/src/validator.rs index 7c1f1bdc0..08df5200f 100644 --- a/beacon_node/rest_api/src/validator.rs +++ b/beacon_node/rest_api/src/validator.rs @@ -482,6 +482,7 @@ pub async fn publish_attestations( /// Processes an unaggregrated attestation that was included in a list of attestations with the /// index `i`. +#[allow(clippy::redundant_clone)] // false positives in this function. fn process_unaggregated_attestation( beacon_chain: &BeaconChain, network_chan: NetworkChannel, @@ -549,6 +550,7 @@ fn process_unaggregated_attestation( } /// HTTP Handler to publish an Attestation, which has been signed by a validator. +#[allow(clippy::redundant_clone)] // false positives in this function. pub async fn publish_aggregate_and_proofs( req: Request, beacon_chain: Arc>, @@ -596,6 +598,7 @@ pub async fn publish_aggregate_and_proofs( /// Processes an aggregrated attestation that was included in a list of attestations with the index /// `i`. +#[allow(clippy::redundant_clone)] // false positives in this function. fn process_aggregated_attestation( beacon_chain: &BeaconChain, network_chan: NetworkChannel, diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 50f635d0d..07cc44fab 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -61,7 +61,7 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .takes_value(true), ) .arg( - Arg::with_name("max_peers") + Arg::with_name("max-peers") .long("max-peers") .help("The maximum number of peers.") .default_value("50") @@ -125,6 +125,13 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { without an ENR.") .takes_value(true), ) + .arg( + Arg::with_name("disable-discovery") + .long("disable-discovery") + .help("Disables the discv5 discovery protocol. The node will not search for new peers or participate in the discovery protocol.") + .takes_value(false), + ) + /* REST API related arguments */ .arg( Arg::with_name("http") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index f794f8886..5cb029a36 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -7,14 +7,11 @@ use eth2_testnet_config::Eth2TestnetConfig; use slog::{crit, info, Logger}; use ssz::Encode; use std::fs; -use std::fs::File; -use std::io::prelude::*; use std::net::{IpAddr, Ipv4Addr, ToSocketAddrs}; use std::net::{TcpListener, UdpSocket}; use std::path::PathBuf; use types::{ChainSpec, EthSpec}; -pub const CLIENT_CONFIG_FILENAME: &str = "beacon-node.toml"; pub const BEACON_NODE_DIR: &str = "beacon"; pub const NETWORK_DIR: &str = "network"; @@ -42,7 +39,7 @@ pub fn get_config( fs::remove_dir_all( client_config .get_db_path() - .ok_or("Failed to get db_path".to_string())?, + .ok_or_else(|| "Failed to get db_path".to_string())?, ) .map_err(|err| format!("Failed to remove chain_db: {}", err))?; @@ -50,7 +47,7 @@ pub fn get_config( fs::remove_dir_all( client_config .get_freezer_db_path() - .ok_or("Failed to get freezer db path".to_string())?, + .ok_or_else(|| "Failed to get freezer db path".to_string())?, ) .map_err(|err| format!("Failed to remove chain_db: {}", err))?; @@ -72,17 +69,7 @@ pub fn get_config( log_dir.pop(); info!(log, "Data directory initialised"; "datadir" => format!("{}",log_dir.into_os_string().into_string().expect("Datadir should be a valid os string"))); - // Load the client config, if it exists . - let config_file_path = client_config.data_dir.join(CLIENT_CONFIG_FILENAME); - let config_file_existed = config_file_path.exists(); - if config_file_existed { - client_config = read_from_file(config_file_path.clone()) - .map_err(|e| format!("Unable to parse {:?} file: {:?}", config_file_path, e))? - .ok_or_else(|| format!("{:?} file does not exist", config_file_path))?; - } else { - client_config.spec_constants = spec_constants.into(); - } - + client_config.spec_constants = spec_constants.into(); client_config.testnet_dir = get_testnet_dir(cli_args); /* @@ -208,6 +195,11 @@ pub fn get_config( client_config.network.discv5_config.enr_update = false; } + if cli_args.is_present("disable-discovery") { + client_config.network.disable_discovery = true; + slog::warn!(log, "Discovery is disabled. New peers will not be found"); + } + /* * Http server */ @@ -295,7 +287,7 @@ pub fn get_config( if spec_constants != client_config.spec_constants { crit!(log, "Specification constants do not match."; - "client_config" => client_config.spec_constants.to_string(), + "client_config" => client_config.spec_constants, "eth2_config" => spec_constants ); return Err("Specification constant mismatch".into()); @@ -351,10 +343,6 @@ pub fn get_config( client_config.genesis = ClientGenesis::DepositContract; } - if !config_file_existed { - write_to_file(config_file_path, &client_config)?; - } - Ok(client_config) } @@ -436,42 +424,3 @@ pub fn unused_port(transport: &str) -> Result { }; Ok(local_addr.port()) } - -/// Write a configuration to file. -pub fn write_to_file(path: PathBuf, config: &T) -> Result<(), String> -where - T: Default + serde::de::DeserializeOwned + serde::Serialize, -{ - if let Ok(mut file) = File::create(path.clone()) { - let toml_encoded = toml::to_string(&config).map_err(|e| { - format!( - "Failed to write configuration to {:?}. Error: {:?}", - path, e - ) - })?; - file.write_all(toml_encoded.as_bytes()) - .unwrap_or_else(|_| panic!("Unable to write to {:?}", path)); - } - - Ok(()) -} - -/// Loads a `ClientConfig` from file. If unable to load from file, generates a default -/// configuration and saves that as a sample file. -pub fn read_from_file(path: PathBuf) -> Result, String> -where - T: Default + serde::de::DeserializeOwned + serde::Serialize, -{ - if let Ok(mut file) = File::open(path.clone()) { - let mut contents = String::new(); - file.read_to_string(&mut contents) - .map_err(|e| format!("Unable to read {:?}. Error: {:?}", path, e))?; - - let config = toml::from_str(&contents) - .map_err(|e| format!("Unable to parse {:?}: {:?}", path, e))?; - - Ok(Some(config)) - } else { - Ok(None) - } -} diff --git a/common/eth2_testnet_config/build.rs b/common/eth2_testnet_config/build.rs index d2970cb1b..38fab42a7 100644 --- a/common/eth2_testnet_config/build.rs +++ b/common/eth2_testnet_config/build.rs @@ -10,12 +10,13 @@ const TESTNET_ID: &str = "altona"; fn main() { if !base_dir().exists() { - std::fs::create_dir_all(base_dir()).expect(&format!("Unable to create {:?}", base_dir())); + std::fs::create_dir_all(base_dir()) + .unwrap_or_else(|_| panic!("Unable to create {:?}", base_dir())); match get_all_files() { Ok(()) => (), Err(e) => { - std::fs::remove_dir_all(base_dir()).expect(&format!( + std::fs::remove_dir_all(base_dir()).unwrap_or_else(|_| panic!( "{}. Failed to remove {:?}, please remove the directory manually because it may contains incomplete testnet data.", e, base_dir(), diff --git a/common/validator_dir/src/builder.rs b/common/validator_dir/src/builder.rs index 9f29f3dc0..a33d910b5 100644 --- a/common/validator_dir/src/builder.rs +++ b/common/validator_dir/src/builder.rs @@ -182,7 +182,7 @@ impl<'a> Builder<'a> { // // This allows us to know the RLP data for the eth1 transaction without needing to know // the withdrawal/voting keypairs again at a later date. - let path = dir.clone().join(ETH1_DEPOSIT_DATA_FILE); + let path = dir.join(ETH1_DEPOSIT_DATA_FILE); if path.exists() { return Err(Error::DepositDataAlreadyExists(path)); } else { @@ -191,7 +191,7 @@ impl<'a> Builder<'a> { .write(true) .read(true) .create(true) - .open(path.clone()) + .open(path) .map_err(Error::UnableToSaveDepositData)? .write_all(hex.as_bytes()) .map_err(Error::UnableToSaveDepositData)? @@ -200,7 +200,7 @@ impl<'a> Builder<'a> { // Save `ETH1_DEPOSIT_AMOUNT_FILE` to file. // // This allows us to know the intended deposit amount at a later date. - let path = dir.clone().join(ETH1_DEPOSIT_AMOUNT_FILE); + let path = dir.join(ETH1_DEPOSIT_AMOUNT_FILE); if path.exists() { return Err(Error::DepositAmountAlreadyExists(path)); } else { @@ -208,7 +208,7 @@ impl<'a> Builder<'a> { .write(true) .read(true) .create(true) - .open(path.clone()) + .open(path) .map_err(Error::UnableToSaveDepositAmount)? .write_all(format!("{}", amount).as_bytes()) .map_err(Error::UnableToSaveDepositAmount)? @@ -220,29 +220,24 @@ impl<'a> Builder<'a> { // Write the withdrawal password to file. write_password_to_file( self.password_dir - .clone() .join(withdrawal_keypair.pk.as_hex_string()), withdrawal_password.as_bytes(), )?; // Write the withdrawal keystore to file. - write_keystore_to_file( - dir.clone().join(WITHDRAWAL_KEYSTORE_FILE), - &withdrawal_keystore, - )?; + write_keystore_to_file(dir.join(WITHDRAWAL_KEYSTORE_FILE), &withdrawal_keystore)?; } } // Write the voting password to file. write_password_to_file( self.password_dir - .clone() .join(format!("0x{}", voting_keystore.pubkey())), voting_password.as_bytes(), )?; // Write the voting keystore to file. - write_keystore_to_file(dir.clone().join(VOTING_KEYSTORE_FILE), &voting_keystore)?; + write_keystore_to_file(dir.join(VOTING_KEYSTORE_FILE), &voting_keystore)?; ValidatorDir::open(dir).map_err(Error::UnableToOpenDir) } @@ -257,7 +252,7 @@ fn write_keystore_to_file(path: PathBuf, keystore: &Keystore) -> Result<(), Erro .write(true) .read(true) .create_new(true) - .open(path.clone()) + .open(path) .map_err(Error::UnableToSaveKeystore)?; keystore.to_json_writer(file).map_err(Into::into) diff --git a/consensus/fork_choice/src/fork_choice.rs b/consensus/fork_choice/src/fork_choice.rs index 01080a62d..13d167936 100644 --- a/consensus/fork_choice/src/fork_choice.rs +++ b/consensus/fork_choice/src/fork_choice.rs @@ -190,7 +190,7 @@ fn dequeue_attestations( queued_attestations .iter() .position(|a| a.slot >= current_slot) - .unwrap_or(queued_attestations.len()), + .unwrap_or_else(|| queued_attestations.len()), ); std::mem::replace(queued_attestations, remaining) @@ -286,6 +286,7 @@ where /// Equivalent to: /// /// https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/fork-choice.md#get_ancestor + #[allow(clippy::if_same_then_else)] fn get_ancestor( &self, block_root: Hash256, diff --git a/consensus/proto_array/src/fork_choice_test_definition.rs b/consensus/proto_array/src/fork_choice_test_definition.rs index cbb905072..6e1bd970b 100644 --- a/consensus/proto_array/src/fork_choice_test_definition.rs +++ b/consensus/proto_array/src/fork_choice_test_definition.rs @@ -80,10 +80,9 @@ impl ForkChoiceTestDefinition { finalized_epoch, &justified_state_balances, ) - .expect(&format!( - "find_head op at index {} returned error", - op_index - )); + .unwrap_or_else(|_| { + panic!("find_head op at index {} returned error", op_index) + }); assert_eq!( head, expected_head, @@ -129,10 +128,12 @@ impl ForkChoiceTestDefinition { justified_epoch, finalized_epoch, }; - fork_choice.process_block(block).expect(&format!( - "process_block op at index {} returned error", - op_index - )); + fork_choice.process_block(block).unwrap_or_else(|e| { + panic!( + "process_block op at index {} returned error: {:?}", + op_index, e + ) + }); check_bytes_round_trip(&fork_choice); } Operation::ProcessAttestation { @@ -142,10 +143,12 @@ impl ForkChoiceTestDefinition { } => { fork_choice .process_attestation(validator_index, block_root, target_epoch) - .expect(&format!( - "process_attestation op at index {} returned error", - op_index - )); + .unwrap_or_else(|_| { + panic!( + "process_attestation op at index {} returned error", + op_index + ) + }); check_bytes_round_trip(&fork_choice); } Operation::Prune { diff --git a/consensus/proto_array/src/fork_choice_test_definition/ffg_updates.rs b/consensus/proto_array/src/fork_choice_test_definition/ffg_updates.rs index 9dd9417f2..4b7eb25d7 100644 --- a/consensus/proto_array/src/fork_choice_test_definition/ffg_updates.rs +++ b/consensus/proto_array/src/fork_choice_test_definition/ffg_updates.rs @@ -91,7 +91,7 @@ pub fn get_ffg_case_01_test_definition() -> ForkChoiceTestDefinition { justified_epoch: Epoch::new(2), justified_root: get_hash(3), finalized_epoch: Epoch::new(1), - justified_state_balances: balances.clone(), + justified_state_balances: balances, expected_head: get_hash(3), }); @@ -421,7 +421,7 @@ pub fn get_ffg_case_02_test_definition() -> ForkChoiceTestDefinition { justified_epoch: Epoch::new(3), justified_root: get_hash(2), finalized_epoch: Epoch::new(0), - justified_state_balances: balances.clone(), + justified_state_balances: balances, }); // END OF TESTS diff --git a/consensus/proto_array/src/fork_choice_test_definition/no_votes.rs b/consensus/proto_array/src/fork_choice_test_definition/no_votes.rs index 279cde52c..e42abe288 100644 --- a/consensus/proto_array/src/fork_choice_test_definition/no_votes.rs +++ b/consensus/proto_array/src/fork_choice_test_definition/no_votes.rs @@ -212,7 +212,7 @@ pub fn get_no_votes_test_definition() -> ForkChoiceTestDefinition { justified_epoch: Epoch::new(2), justified_root: get_hash(5), finalized_epoch: Epoch::new(1), - justified_state_balances: balances.clone(), + justified_state_balances: balances, expected_head: get_hash(6), }, ]; diff --git a/consensus/proto_array/src/fork_choice_test_definition/votes.rs b/consensus/proto_array/src/fork_choice_test_definition/votes.rs index 4f8091269..ac9513c5f 100644 --- a/consensus/proto_array/src/fork_choice_test_definition/votes.rs +++ b/consensus/proto_array/src/fork_choice_test_definition/votes.rs @@ -673,7 +673,7 @@ pub fn get_votes_test_definition() -> ForkChoiceTestDefinition { justified_epoch: Epoch::new(2), justified_root: get_hash(5), finalized_epoch: Epoch::new(2), - justified_state_balances: balances.clone(), + justified_state_balances: balances, expected_head: get_hash(11), }); diff --git a/consensus/types/src/attester_slashing.rs b/consensus/types/src/attester_slashing.rs index 7deaf56f9..876bb88e6 100644 --- a/consensus/types/src/attester_slashing.rs +++ b/consensus/types/src/attester_slashing.rs @@ -10,20 +10,8 @@ use tree_hash_derive::TreeHash; /// /// Spec v0.12.1 #[cfg_attr(feature = "arbitrary-fuzz", derive(arbitrary::Arbitrary))] -#[derive( - Derivative, - Debug, - PartialEq, - Eq, - Clone, - Serialize, - Deserialize, - Encode, - Decode, - TreeHash, - TestRandom, -)] -#[derivative(Hash(bound = "T: EthSpec"))] +#[derive(Derivative, Debug, Clone, Serialize, Deserialize, Encode, Decode, TreeHash, TestRandom)] +#[derivative(PartialEq, Eq, Hash(bound = "T: EthSpec"))] #[serde(bound = "T: EthSpec")] pub struct AttesterSlashing { pub attestation_1: IndexedAttestation, diff --git a/consensus/types/src/indexed_attestation.rs b/consensus/types/src/indexed_attestation.rs index cb9f5eacc..341db1807 100644 --- a/consensus/types/src/indexed_attestation.rs +++ b/consensus/types/src/indexed_attestation.rs @@ -1,4 +1,5 @@ use crate::{test_utils::TestRandom, AggregateSignature, AttestationData, EthSpec, VariableList}; +use derivative::Derivative; use serde_derive::{Deserialize, Serialize}; use ssz::Encode; use ssz_derive::{Decode, Encode}; @@ -12,9 +13,8 @@ use tree_hash_derive::TreeHash; /// /// Spec v0.12.1 #[cfg_attr(feature = "arbitrary-fuzz", derive(arbitrary::Arbitrary))] -#[derive( - Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Encode, Decode, TreeHash, TestRandom, -)] +#[derive(Derivative, Debug, Clone, Serialize, Deserialize, Encode, Decode, TreeHash, TestRandom)] +#[derivative(PartialEq, Eq)] // to satisfy Clippy's lint about `Hash` #[serde(bound = "T: EthSpec")] pub struct IndexedAttestation { /// Lists validator registry indices, not committee indices. diff --git a/consensus/types/src/test_utils/builders/testing_attester_slashing_builder.rs b/consensus/types/src/test_utils/builders/testing_attester_slashing_builder.rs index 05aabbf18..356f10ec8 100644 --- a/consensus/types/src/test_utils/builders/testing_attester_slashing_builder.rs +++ b/consensus/types/src/test_utils/builders/testing_attester_slashing_builder.rs @@ -47,11 +47,11 @@ impl TestingAttesterSlashingBuilder { }; let data_2 = if test_task == AttesterSlashingTestTask::NotSlashable { - AttestationData { ..data_1.clone() } + data_1.clone() } else { AttestationData { target: checkpoint_2, - ..data_1.clone() + ..data_1 } }; diff --git a/crypto/eth2_keystore/src/keystore.rs b/crypto/eth2_keystore/src/keystore.rs index 8d68265a7..b23751f82 100644 --- a/crypto/eth2_keystore/src/keystore.rs +++ b/crypto/eth2_keystore/src/keystore.rs @@ -190,7 +190,7 @@ impl Keystore { let keypair = keypair_from_secret(plain_text.as_bytes())?; // Verify that the derived `PublicKey` matches `self`. - if keypair.pk.as_hex_string()[2..].to_string() != self.json.pubkey { + if keypair.pk.as_hex_string()[2..] != self.json.pubkey { return Err(Error::PublicKeyMismatch); } diff --git a/lighthouse/environment/src/lib.rs b/lighthouse/environment/src/lib.rs index aeec4dbed..3d26aa6d2 100644 --- a/lighthouse/environment/src/lib.rs +++ b/lighthouse/environment/src/lib.rs @@ -283,7 +283,7 @@ impl Environment { executor: TaskExecutor { exit: self.exit.clone(), handle: self.runtime().handle().clone(), - log: self.log.new(o!("service" => service_name.clone())), + log: self.log.new(o!("service" => service_name)), }, eth_spec_instance: self.eth_spec_instance.clone(), eth2_config: self.eth2_config.clone(), diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index d68ccc008..d5a2c2d48 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -232,12 +232,12 @@ impl DutiesStore { .collect() } - fn is_aggregator(&self, validator_pubkey: &PublicKey, epoch: &Epoch) -> Option { + fn is_aggregator(&self, validator_pubkey: &PublicKey, epoch: Epoch) -> Option { Some( self.store .read() .get(validator_pubkey)? - .get(epoch)? + .get(&epoch)? .selection_proof .is_some(), ) @@ -604,7 +604,7 @@ impl DutiesService { // The selection proof is computed on `store.insert`, so it's necessary to check // with the store that the validator is an aggregator. - let is_aggregator = self.store.is_aggregator(&validator_pubkey, &epoch)?; + let is_aggregator = self.store.is_aggregator(&validator_pubkey, epoch)?; if outcome.is_subscription_candidate() { Some(ValidatorSubscription {