diff --git a/beacon_node/eth2_libp2p/src/rpc/protocol.rs b/beacon_node/eth2_libp2p/src/rpc/protocol.rs index 812596495..6f95f6679 100644 --- a/beacon_node/eth2_libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2_libp2p/src/rpc/protocol.rs @@ -1,5 +1,3 @@ -#![allow(clippy::type_complexity)] - use super::methods::*; use crate::rpc::{ codec::{ @@ -11,7 +9,7 @@ use crate::rpc::{ methods::ResponseTermination, MaxRequestBlocks, MAX_REQUEST_BLOCKS, }; -use futures::future::Ready; +use futures::future::BoxFuture; use futures::prelude::*; use futures::prelude::{AsyncRead, AsyncWrite}; use libp2p::core::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeInfo}; @@ -19,7 +17,6 @@ use ssz::Encode; use ssz_types::VariableList; use std::io; use std::marker::PhantomData; -use std::pin::Pin; use std::time::Duration; use tokio_io_timeout::TimeoutStream; use tokio_util::{ @@ -206,13 +203,6 @@ impl ProtocolName for ProtocolId { pub type InboundOutput = (RPCRequest, InboundFramed); pub type InboundFramed = Framed>, InboundCodec>; -type FnAndThen = fn( - ( - Option, RPCError>>, - InboundFramed, - ), -) -> Ready, RPCError>>; -type FnMapErr = fn(tokio::time::Elapsed) -> RPCError; impl InboundUpgrade for RPCProtocol where @@ -221,45 +211,48 @@ where { type Output = InboundOutput; type Error = RPCError; - type Future = Pin> + Send>>; + type Future = BoxFuture<'static, Result>; fn upgrade_inbound(self, socket: TSocket, protocol: ProtocolId) -> Self::Future { - let protocol_name = protocol.message_name; - // convert the socket to tokio compatible socket - let socket = socket.compat(); - let codec = match protocol.encoding { - Encoding::SSZSnappy => { - let ssz_snappy_codec = - BaseInboundCodec::new(SSZSnappyInboundCodec::new(protocol, MAX_RPC_SIZE)); - InboundCodec::SSZSnappy(ssz_snappy_codec) - } - Encoding::SSZ => { - let ssz_codec = BaseInboundCodec::new(SSZInboundCodec::new(protocol, MAX_RPC_SIZE)); - InboundCodec::SSZ(ssz_codec) - } - }; - let mut timed_socket = TimeoutStream::new(socket); - timed_socket.set_read_timeout(Some(Duration::from_secs(TTFB_TIMEOUT))); + async move { + let protocol_name = protocol.message_name; + // convert the socket to tokio compatible socket + let socket = socket.compat(); + let codec = match protocol.encoding { + Encoding::SSZSnappy => { + let ssz_snappy_codec = + BaseInboundCodec::new(SSZSnappyInboundCodec::new(protocol, MAX_RPC_SIZE)); + InboundCodec::SSZSnappy(ssz_snappy_codec) + } + Encoding::SSZ => { + let ssz_codec = + BaseInboundCodec::new(SSZInboundCodec::new(protocol, MAX_RPC_SIZE)); + InboundCodec::SSZ(ssz_codec) + } + }; + let mut timed_socket = TimeoutStream::new(socket); + timed_socket.set_read_timeout(Some(Duration::from_secs(TTFB_TIMEOUT))); - let socket = Framed::new(timed_socket, codec); + let socket = Framed::new(timed_socket, codec); - // MetaData requests should be empty, return the stream - Box::pin(match protocol_name { - Protocol::MetaData => { - future::Either::Left(future::ok((RPCRequest::MetaData(PhantomData), socket))) + // MetaData requests should be empty, return the stream + match protocol_name { + Protocol::MetaData => Ok((RPCRequest::MetaData(PhantomData), socket)), + _ => { + match tokio::time::timeout( + Duration::from_secs(REQUEST_TIMEOUT), + socket.into_future(), + ) + .await + { + Err(e) => Err(RPCError::from(e)), + Ok((Some(Ok(request)), stream)) => Ok((request, stream)), + Ok((Some(Err(_)), _)) | Ok((None, _)) => Err(RPCError::IncompleteStream), + } + } } - - _ => future::Either::Right( - tokio::time::timeout(Duration::from_secs(REQUEST_TIMEOUT), socket.into_future()) - .map_err(RPCError::from as FnMapErr) - .and_then({ - |(req, stream)| match req { - Some(Ok(request)) => future::ok((request, stream)), - Some(Err(_)) | None => future::err(RPCError::IncompleteStream), - } - } as FnAndThen), - ), - }) + } + .boxed() } } @@ -375,7 +368,7 @@ where { type Output = OutboundFramed; type Error = RPCError; - type Future = Pin> + Send>>; + type Future = BoxFuture<'static, Result>; fn upgrade_outbound(self, socket: TSocket, protocol: Self::Info) -> Self::Future { // convert to a tokio compatible socket @@ -395,12 +388,12 @@ where let mut socket = Framed::new(socket, codec); - let future = async { + async { socket.send(self).await?; socket.close().await?; Ok(socket) - }; - Box::pin(future) + } + .boxed() } } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 2952b6993..2f388e461 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -593,11 +593,11 @@ impl SyncManager { info!(self.log, "Sync state updated"; "old_state" => format!("{}", old_state), "new_state" => format!("{}",new_state)); } } + /* Processing State Functions */ // These functions are called in the main poll function to transition the state of the sync // manager - #[allow(clippy::needless_return)] /// A new block has been received for a parent lookup query, process it. fn process_parent_request(&mut self, mut parent_request: ParentRequests) { // verify the last added block is the parent of the last requested block @@ -658,7 +658,6 @@ impl SyncManager { // add the block back to the queue and continue the search parent_request.downloaded_blocks.push(newest_block); self.request_parent(parent_request); - return; } Ok(_) | Err(BlockError::BlockIsAlreadyKnown { .. }) => { spawn_block_processor( @@ -685,7 +684,6 @@ impl SyncManager { parent_request.last_submitted_peer, PeerAction::MidToleranceError, ); - return; } } }