From 9ae9df806cb5288fd0f37c03cd5dee4d01d86921 Mon Sep 17 00:00:00 2001 From: divma Date: Tue, 28 Jul 2020 01:39:42 +0000 Subject: [PATCH] Fix clippy lints rpc (#1401) ## Issue Addressed #1388 partially (eth2_libp2p & network) ## Proposed Changes TLDR at the end - *Complex types* are 3 on the handlers/Behaviours but the types are `Poll` where `ComplexType` comes from the traits of libp2p. Those, I don't thing are worth an alias. A couple more were from using tokio combinators and were removed writing things the async way and using [`BoxFuture`](https://docs.rs/futures/0.3.5/futures/future/type.BoxFuture.html) - The *cognitive complexity*.. I tried to address those before (they come from the poll functions too) and tbh they are cognitively simpler to understand the way they are now. Moving separate parts to functions doesn't add much since that code is not repeated and they all do early returns. If moved those returns would now need to be wrapped in an Option, probably, and checked to be returned again. I would leave them like that but that's just preference. - *Too many arguments*: They are not easily put together in a wrapping struct since the parameters don't relate semantically (Ex: fn new with a log, a reference to the chain, a peer, etc) but some may differ. - *Needless returns* were indeed needless ## Additional Info TLDR: removed needless return, used BoxFuture and async, left the rest untouched since those lgtm --- beacon_node/eth2_libp2p/src/rpc/protocol.rs | 91 ++++++++++----------- beacon_node/network/src/sync/manager.rs | 4 +- 2 files changed, 43 insertions(+), 52 deletions(-) diff --git a/beacon_node/eth2_libp2p/src/rpc/protocol.rs b/beacon_node/eth2_libp2p/src/rpc/protocol.rs index 812596495..6f95f6679 100644 --- a/beacon_node/eth2_libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2_libp2p/src/rpc/protocol.rs @@ -1,5 +1,3 @@ -#![allow(clippy::type_complexity)] - use super::methods::*; use crate::rpc::{ codec::{ @@ -11,7 +9,7 @@ use crate::rpc::{ methods::ResponseTermination, MaxRequestBlocks, MAX_REQUEST_BLOCKS, }; -use futures::future::Ready; +use futures::future::BoxFuture; use futures::prelude::*; use futures::prelude::{AsyncRead, AsyncWrite}; use libp2p::core::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeInfo}; @@ -19,7 +17,6 @@ use ssz::Encode; use ssz_types::VariableList; use std::io; use std::marker::PhantomData; -use std::pin::Pin; use std::time::Duration; use tokio_io_timeout::TimeoutStream; use tokio_util::{ @@ -206,13 +203,6 @@ impl ProtocolName for ProtocolId { pub type InboundOutput = (RPCRequest, InboundFramed); pub type InboundFramed = Framed>, InboundCodec>; -type FnAndThen = fn( - ( - Option, RPCError>>, - InboundFramed, - ), -) -> Ready, RPCError>>; -type FnMapErr = fn(tokio::time::Elapsed) -> RPCError; impl InboundUpgrade for RPCProtocol where @@ -221,45 +211,48 @@ where { type Output = InboundOutput; type Error = RPCError; - type Future = Pin> + Send>>; + type Future = BoxFuture<'static, Result>; fn upgrade_inbound(self, socket: TSocket, protocol: ProtocolId) -> Self::Future { - let protocol_name = protocol.message_name; - // convert the socket to tokio compatible socket - let socket = socket.compat(); - let codec = match protocol.encoding { - Encoding::SSZSnappy => { - let ssz_snappy_codec = - BaseInboundCodec::new(SSZSnappyInboundCodec::new(protocol, MAX_RPC_SIZE)); - InboundCodec::SSZSnappy(ssz_snappy_codec) - } - Encoding::SSZ => { - let ssz_codec = BaseInboundCodec::new(SSZInboundCodec::new(protocol, MAX_RPC_SIZE)); - InboundCodec::SSZ(ssz_codec) - } - }; - let mut timed_socket = TimeoutStream::new(socket); - timed_socket.set_read_timeout(Some(Duration::from_secs(TTFB_TIMEOUT))); + async move { + let protocol_name = protocol.message_name; + // convert the socket to tokio compatible socket + let socket = socket.compat(); + let codec = match protocol.encoding { + Encoding::SSZSnappy => { + let ssz_snappy_codec = + BaseInboundCodec::new(SSZSnappyInboundCodec::new(protocol, MAX_RPC_SIZE)); + InboundCodec::SSZSnappy(ssz_snappy_codec) + } + Encoding::SSZ => { + let ssz_codec = + BaseInboundCodec::new(SSZInboundCodec::new(protocol, MAX_RPC_SIZE)); + InboundCodec::SSZ(ssz_codec) + } + }; + let mut timed_socket = TimeoutStream::new(socket); + timed_socket.set_read_timeout(Some(Duration::from_secs(TTFB_TIMEOUT))); - let socket = Framed::new(timed_socket, codec); + let socket = Framed::new(timed_socket, codec); - // MetaData requests should be empty, return the stream - Box::pin(match protocol_name { - Protocol::MetaData => { - future::Either::Left(future::ok((RPCRequest::MetaData(PhantomData), socket))) + // MetaData requests should be empty, return the stream + match protocol_name { + Protocol::MetaData => Ok((RPCRequest::MetaData(PhantomData), socket)), + _ => { + match tokio::time::timeout( + Duration::from_secs(REQUEST_TIMEOUT), + socket.into_future(), + ) + .await + { + Err(e) => Err(RPCError::from(e)), + Ok((Some(Ok(request)), stream)) => Ok((request, stream)), + Ok((Some(Err(_)), _)) | Ok((None, _)) => Err(RPCError::IncompleteStream), + } + } } - - _ => future::Either::Right( - tokio::time::timeout(Duration::from_secs(REQUEST_TIMEOUT), socket.into_future()) - .map_err(RPCError::from as FnMapErr) - .and_then({ - |(req, stream)| match req { - Some(Ok(request)) => future::ok((request, stream)), - Some(Err(_)) | None => future::err(RPCError::IncompleteStream), - } - } as FnAndThen), - ), - }) + } + .boxed() } } @@ -375,7 +368,7 @@ where { type Output = OutboundFramed; type Error = RPCError; - type Future = Pin> + Send>>; + type Future = BoxFuture<'static, Result>; fn upgrade_outbound(self, socket: TSocket, protocol: Self::Info) -> Self::Future { // convert to a tokio compatible socket @@ -395,12 +388,12 @@ where let mut socket = Framed::new(socket, codec); - let future = async { + async { socket.send(self).await?; socket.close().await?; Ok(socket) - }; - Box::pin(future) + } + .boxed() } } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 2952b6993..2f388e461 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -593,11 +593,11 @@ impl SyncManager { info!(self.log, "Sync state updated"; "old_state" => format!("{}", old_state), "new_state" => format!("{}",new_state)); } } + /* Processing State Functions */ // These functions are called in the main poll function to transition the state of the sync // manager - #[allow(clippy::needless_return)] /// A new block has been received for a parent lookup query, process it. fn process_parent_request(&mut self, mut parent_request: ParentRequests) { // verify the last added block is the parent of the last requested block @@ -658,7 +658,6 @@ impl SyncManager { // add the block back to the queue and continue the search parent_request.downloaded_blocks.push(newest_block); self.request_parent(parent_request); - return; } Ok(_) | Err(BlockError::BlockIsAlreadyKnown { .. }) => { spawn_block_processor( @@ -685,7 +684,6 @@ impl SyncManager { parent_request.last_submitted_peer, PeerAction::MidToleranceError, ); - return; } } }