From 32a025bdf78d818939bfa047f3389246d23eb6d8 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 25 Mar 2019 16:48:44 +1100 Subject: [PATCH] Introduced `RequestId` newtype --- beacon_node/eth2-libp2p/src/rpc/mod.rs | 2 +- beacon_node/eth2-libp2p/src/rpc/protocol.rs | 58 +++++++++++++- beacon_node/network/src/message_handler.rs | 88 ++++++++++++--------- beacon_node/network/src/sync/simple_sync.rs | 12 ++- beacon_node/network/tests/tests.rs | 16 ++-- 5 files changed, 125 insertions(+), 51 deletions(-) diff --git a/beacon_node/eth2-libp2p/src/rpc/mod.rs b/beacon_node/eth2-libp2p/src/rpc/mod.rs index 925c36616..e04540416 100644 --- a/beacon_node/eth2-libp2p/src/rpc/mod.rs +++ b/beacon_node/eth2-libp2p/src/rpc/mod.rs @@ -12,7 +12,7 @@ use libp2p::core::swarm::{ }; use libp2p::{Multiaddr, PeerId}; pub use methods::{HelloMessage, IncomingGossip, RPCMethod, RPCRequest, RPCResponse}; -pub use protocol::{RPCEvent, RPCProtocol}; +pub use protocol::{RPCEvent, RPCProtocol, RequestId}; use slog::o; use std::marker::PhantomData; use tokio::io::{AsyncRead, AsyncWrite}; diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index b328dd0dd..2c8945cb8 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -1,6 +1,7 @@ use super::methods::*; use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo}; -use ssz::{ssz_encode, Decodable, Encodable, SszStream}; +use ssz::{ssz_encode, Decodable, DecodeError as SSZDecodeError, Encodable, SszStream}; +use std::hash::{Hash, Hasher}; use std::io; use std::iter; use tokio::io::{AsyncRead, AsyncWrite}; @@ -29,16 +30,65 @@ impl Default for RPCProtocol { } } +/// A monotonic counter for ordering `RPCRequest`s. +#[derive(Debug, Clone, PartialEq, Default)] +pub struct RequestId(u64); + +impl RequestId { + /// Increment the request id. + pub fn increment(&mut self) { + self.0 += 1 + } + + /// Return the previous id. + pub fn previous(&self) -> Self { + Self(self.0 - 1) + } +} + +impl Eq for RequestId {} + +impl Hash for RequestId { + fn hash(&self, state: &mut H) { + self.0.hash(state); + } +} + +impl From for RequestId { + fn from(x: u64) -> RequestId { + RequestId(x) + } +} + +impl Into for RequestId { + fn into(self) -> u64 { + self.0 + } +} + +impl Encodable for RequestId { + fn ssz_append(&self, s: &mut SszStream) { + self.0.ssz_append(s); + } +} + +impl Decodable for RequestId { + fn ssz_decode(bytes: &[u8], index: usize) -> Result<(Self, usize), SSZDecodeError> { + let (id, index) = u64::ssz_decode(bytes, index)?; + Ok((Self::from(id), index)) + } +} + /// The RPC types which are sent/received in this protocol. #[derive(Debug, Clone)] pub enum RPCEvent { Request { - id: u64, + id: RequestId, method_id: u16, body: RPCRequest, }, Response { - id: u64, + id: RequestId, method_id: u16, //TODO: Remove and process decoding upstream result: RPCResponse, }, @@ -72,7 +122,7 @@ fn decode(packet: Vec) -> Result { // decode the header of the rpc // request/response let (request, index) = bool::ssz_decode(&packet, 0)?; - let (id, index) = u64::ssz_decode(&packet, index)?; + let (id, index) = RequestId::ssz_decode(&packet, index)?; let (method_id, index) = u16::ssz_decode(&packet, index)?; if request { diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index 5b3fe1a63..12fb2fa6e 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -4,7 +4,7 @@ use crate::service::{NetworkMessage, OutgoingMessage}; use crate::sync::SimpleSync; use crossbeam_channel::{unbounded as channel, Sender}; use eth2_libp2p::{ - rpc::{methods::GoodbyeReason, IncomingGossip, RPCRequest, RPCResponse}, + rpc::{methods::GoodbyeReason, IncomingGossip, RPCRequest, RPCResponse, RequestId}, PeerId, RPCEvent, }; use futures::future; @@ -111,25 +111,31 @@ impl MessageHandler { } /// A new RPC request has been received from the network. - fn handle_rpc_request(&mut self, peer_id: PeerId, _id: u64, request: RPCRequest) { + fn handle_rpc_request(&mut self, peer_id: PeerId, request_id: RequestId, request: RPCRequest) { // TODO: process the `id`. match request { - RPCRequest::Hello(hello_message) => { - self.sync - .on_hello_request(peer_id, hello_message, &mut self.network_context) - } + RPCRequest::Hello(hello_message) => self.sync.on_hello_request( + peer_id, + request_id, + hello_message, + &mut self.network_context, + ), RPCRequest::Goodbye(goodbye_reason) => self.sync.on_goodbye(peer_id, goodbye_reason), - RPCRequest::BeaconBlockRoots(request) => { - self.sync - .on_beacon_block_roots_request(peer_id, request, &mut self.network_context) - } + RPCRequest::BeaconBlockRoots(request) => self.sync.on_beacon_block_roots_request( + peer_id, + request_id, + request, + &mut self.network_context, + ), RPCRequest::BeaconBlockHeaders(request) => self.sync.on_beacon_block_headers_request( peer_id, + request_id, request, &mut self.network_context, ), RPCRequest::BeaconBlockBodies(request) => self.sync.on_beacon_block_bodies_request( peer_id, + request_id, request, &mut self.network_context, ), @@ -143,17 +149,23 @@ impl MessageHandler { /// An RPC response has been received from the network. // we match on id and ignore responses past the timeout. - fn handle_rpc_response(&mut self, peer_id: PeerId, id: u64, response: RPCResponse) { - // if response id is related to a request, ignore (likely RPC timeout) + fn handle_rpc_response(&mut self, peer_id: PeerId, id: RequestId, response: RPCResponse) { + // if response id is not related to a request, ignore (likely RPC timeout) if self .network_context - .requests - .remove(&(peer_id.clone(), id)) + .outstanding_outgoing_request_ids + .remove(&(peer_id.clone(), id.clone())) .is_none() { - debug!(self.log, "Unrecognised response from peer: {:?}", peer_id); + warn!( + self.log, + "Unknown ResponseId for incoming RPCRequest"; + "peer" => format!("{:?}", peer_id), + "request_id" => format!("{:?}", id) + ); return; } + match response { RPCResponse::Hello(hello_message) => { self.sync @@ -210,9 +222,9 @@ pub struct NetworkContext { /// The network channel to relay messages to the Network service. network_send: crossbeam_channel::Sender, /// A mapping of peers and the RPC id we have sent an RPC request to. - requests: HashMap<(PeerId, u64), Instant>, - /// A counter of request id for each peer. - request_ids: HashMap, + outstanding_outgoing_request_ids: HashMap<(PeerId, RequestId), Instant>, + /// Stores the next `RequestId` we should include on an outgoing `RPCRequest` to a `PeerId`. + outgoing_request_ids: HashMap, /// The `MessageHandler` logger. log: slog::Logger, } @@ -221,8 +233,8 @@ impl NetworkContext { pub fn new(network_send: crossbeam_channel::Sender, log: slog::Logger) -> Self { Self { network_send, - requests: HashMap::new(), - request_ids: HashMap::new(), + outstanding_outgoing_request_ids: HashMap::new(), + outgoing_request_ids: HashMap::new(), log, } } @@ -234,6 +246,10 @@ impl NetworkContext { pub fn send_rpc_request(&mut self, peer_id: PeerId, rpc_request: RPCRequest) { let id = self.generate_request_id(&peer_id); + + self.outstanding_outgoing_request_ids + .insert((peer_id.clone(), id.clone()), Instant::now()); + self.send_rpc_event( peer_id, RPCEvent::Request { @@ -244,12 +260,16 @@ impl NetworkContext { ); } - pub fn send_rpc_response(&mut self, peer_id: PeerId, rpc_response: RPCResponse) { - let id = self.generate_request_id(&peer_id); + pub fn send_rpc_response( + &mut self, + peer_id: PeerId, + request_id: RequestId, + rpc_response: RPCResponse, + ) { self.send_rpc_event( peer_id, RPCEvent::Response { - id, + id: request_id, method_id: rpc_response.method_id(), result: rpc_response, }, @@ -272,18 +292,14 @@ impl NetworkContext { // } - /// Generates a new request id for a peer. - fn generate_request_id(&mut self, peer_id: &PeerId) -> u64 { - // generate a unique id for the peer - let id = { - let borrowed_id = self.request_ids.entry(peer_id.clone()).or_insert_with(|| 0); - let id = borrowed_id.clone(); - //increment the counter - *borrowed_id += 1; - id - }; - // register RPC request - self.requests.insert((peer_id.clone(), id), Instant::now()); - id + /// Returns the next `RequestId` for sending an `RPCRequest` to the `peer_id`. + fn generate_request_id(&mut self, peer_id: &PeerId) -> RequestId { + let next_id = self + .outgoing_request_ids + .entry(peer_id.clone()) + .and_modify(|id| id.increment()) + .or_insert_with(|| RequestId::from(1)); + + next_id.previous() } } diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index 05c1a0430..37c2c4c26 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -2,7 +2,7 @@ use super::import_queue::ImportQueue; use crate::beacon_chain::BeaconChain; use crate::message_handler::NetworkContext; use eth2_libp2p::rpc::methods::*; -use eth2_libp2p::rpc::{RPCRequest, RPCResponse}; +use eth2_libp2p::rpc::{RPCRequest, RPCResponse, RequestId}; use eth2_libp2p::PeerId; use slog::{debug, error, info, o, warn}; use std::collections::HashMap; @@ -157,6 +157,7 @@ impl SimpleSync { pub fn on_hello_request( &mut self, peer_id: PeerId, + request_id: RequestId, hello: HelloMessage, network: &mut NetworkContext, ) { @@ -165,6 +166,7 @@ impl SimpleSync { // Say hello back. network.send_rpc_response( peer_id.clone(), + request_id, RPCResponse::Hello(self.chain.hello_message()), ); @@ -256,7 +258,7 @@ impl SimpleSync { network.disconnect(peer_id.clone(), GoodbyeReason::IrreleventNetwork); } - // If required, send requests for blocks. + // If required, send additional requests. match remote_status { PeerStatus::HigherFinalizedEpoch => { let start_slot = remote @@ -295,6 +297,7 @@ impl SimpleSync { pub fn on_beacon_block_roots_request( &mut self, peer_id: PeerId, + request_id: RequestId, req: BeaconBlockRootsRequest, network: &mut NetworkContext, ) { @@ -333,6 +336,7 @@ impl SimpleSync { network.send_rpc_response( peer_id, + request_id, RPCResponse::BeaconBlockRoots(BeaconBlockRootsResponse { roots }), ) } @@ -385,6 +389,7 @@ impl SimpleSync { pub fn on_beacon_block_headers_request( &mut self, peer_id: PeerId, + request_id: RequestId, req: BeaconBlockHeadersRequest, network: &mut NetworkContext, ) { @@ -415,6 +420,7 @@ impl SimpleSync { network.send_rpc_response( peer_id, + request_id, RPCResponse::BeaconBlockHeaders(BeaconBlockHeadersResponse { headers }), ) } @@ -454,6 +460,7 @@ impl SimpleSync { pub fn on_beacon_block_bodies_request( &mut self, peer_id: PeerId, + request_id: RequestId, req: BeaconBlockBodiesRequest, network: &mut NetworkContext, ) { @@ -480,6 +487,7 @@ impl SimpleSync { network.send_rpc_response( peer_id, + request_id, RPCResponse::BeaconBlockBodies(BeaconBlockBodiesResponse { block_bodies }), ) } diff --git a/beacon_node/network/tests/tests.rs b/beacon_node/network/tests/tests.rs index 110450dc9..9cead1b55 100644 --- a/beacon_node/network/tests/tests.rs +++ b/beacon_node/network/tests/tests.rs @@ -1,6 +1,6 @@ use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender}; use eth2_libp2p::rpc::methods::*; -use eth2_libp2p::rpc::{RPCMethod, RPCRequest, RPCResponse}; +use eth2_libp2p::rpc::{RPCMethod, RPCRequest, RPCResponse, RequestId}; use eth2_libp2p::{PeerId, RPCEvent}; use network::beacon_chain::BeaconChain as NetworkBeaconChain; use network::message_handler::{HandlerMessage, MessageHandler}; @@ -82,8 +82,8 @@ impl SyncNode { let network_message = self.recv().expect("Timeout on tee"); let handler_message = match network_message.clone() { - NetworkMessage::Send(peer_id, OutgoingMessage::RPC(event)) => { - HandlerMessage::RPC(peer_id, event) + NetworkMessage::Send(_to_peer_id, OutgoingMessage::RPC(event)) => { + HandlerMessage::RPC(self.peer_id.clone(), event) } _ => panic!("tee cannot parse {:?}", network_message), }; @@ -265,7 +265,7 @@ fn get_logger() -> slog::Logger { pub struct SyncMaster { harness: BeaconChainHarness, peer_id: PeerId, - response_ids: Vec, + response_ids: Vec, } impl SyncMaster { @@ -276,7 +276,7 @@ impl SyncMaster { ) -> Self { let harness = BeaconChainHarness::from_beacon_state_builder(state_builder, spec.clone()); let peer_id = PeerId::random(); - let response_ids = vec![0; node_count]; + let response_ids = vec![RequestId::from(0); node_count]; Self { harness, @@ -285,9 +285,9 @@ impl SyncMaster { } } - pub fn response_id(&mut self, node: &SyncNode) -> u64 { - let id = self.response_ids[node.id]; - self.response_ids[node.id] += 1; + pub fn response_id(&mut self, node: &SyncNode) -> RequestId { + let id = self.response_ids[node.id].clone(); + self.response_ids[node.id].increment(); id }