Introduced RequestId newtype

This commit is contained in:
Paul Hauner 2019-03-25 16:48:44 +11:00
parent ebb9ced0a4
commit 32a025bdf7
No known key found for this signature in database
GPG Key ID: D362883A9218FCC6
5 changed files with 125 additions and 51 deletions

View File

@ -12,7 +12,7 @@ use libp2p::core::swarm::{
};
use libp2p::{Multiaddr, PeerId};
pub use methods::{HelloMessage, IncomingGossip, RPCMethod, RPCRequest, RPCResponse};
pub use protocol::{RPCEvent, RPCProtocol};
pub use protocol::{RPCEvent, RPCProtocol, RequestId};
use slog::o;
use std::marker::PhantomData;
use tokio::io::{AsyncRead, AsyncWrite};

View File

@ -1,6 +1,7 @@
use super::methods::*;
use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use ssz::{ssz_encode, Decodable, Encodable, SszStream};
use ssz::{ssz_encode, Decodable, DecodeError as SSZDecodeError, Encodable, SszStream};
use std::hash::{Hash, Hasher};
use std::io;
use std::iter;
use tokio::io::{AsyncRead, AsyncWrite};
@ -29,16 +30,65 @@ impl Default for RPCProtocol {
}
}
/// A monotonic counter for ordering `RPCRequest`s.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct RequestId(u64);
impl RequestId {
/// Increment the request id.
pub fn increment(&mut self) {
self.0 += 1
}
/// Return the previous id.
pub fn previous(&self) -> Self {
Self(self.0 - 1)
}
}
impl Eq for RequestId {}
impl Hash for RequestId {
fn hash<H: Hasher>(&self, state: &mut H) {
self.0.hash(state);
}
}
impl From<u64> for RequestId {
fn from(x: u64) -> RequestId {
RequestId(x)
}
}
impl Into<u64> for RequestId {
fn into(self) -> u64 {
self.0
}
}
impl Encodable for RequestId {
fn ssz_append(&self, s: &mut SszStream) {
self.0.ssz_append(s);
}
}
impl Decodable for RequestId {
fn ssz_decode(bytes: &[u8], index: usize) -> Result<(Self, usize), SSZDecodeError> {
let (id, index) = u64::ssz_decode(bytes, index)?;
Ok((Self::from(id), index))
}
}
/// The RPC types which are sent/received in this protocol.
#[derive(Debug, Clone)]
pub enum RPCEvent {
Request {
id: u64,
id: RequestId,
method_id: u16,
body: RPCRequest,
},
Response {
id: u64,
id: RequestId,
method_id: u16, //TODO: Remove and process decoding upstream
result: RPCResponse,
},
@ -72,7 +122,7 @@ fn decode(packet: Vec<u8>) -> Result<RPCEvent, DecodeError> {
// decode the header of the rpc
// request/response
let (request, index) = bool::ssz_decode(&packet, 0)?;
let (id, index) = u64::ssz_decode(&packet, index)?;
let (id, index) = RequestId::ssz_decode(&packet, index)?;
let (method_id, index) = u16::ssz_decode(&packet, index)?;
if request {

View File

@ -4,7 +4,7 @@ use crate::service::{NetworkMessage, OutgoingMessage};
use crate::sync::SimpleSync;
use crossbeam_channel::{unbounded as channel, Sender};
use eth2_libp2p::{
rpc::{methods::GoodbyeReason, IncomingGossip, RPCRequest, RPCResponse},
rpc::{methods::GoodbyeReason, IncomingGossip, RPCRequest, RPCResponse, RequestId},
PeerId, RPCEvent,
};
use futures::future;
@ -111,25 +111,31 @@ impl MessageHandler {
}
/// A new RPC request has been received from the network.
fn handle_rpc_request(&mut self, peer_id: PeerId, _id: u64, request: RPCRequest) {
fn handle_rpc_request(&mut self, peer_id: PeerId, request_id: RequestId, request: RPCRequest) {
// TODO: process the `id`.
match request {
RPCRequest::Hello(hello_message) => {
self.sync
.on_hello_request(peer_id, hello_message, &mut self.network_context)
}
RPCRequest::Hello(hello_message) => self.sync.on_hello_request(
peer_id,
request_id,
hello_message,
&mut self.network_context,
),
RPCRequest::Goodbye(goodbye_reason) => self.sync.on_goodbye(peer_id, goodbye_reason),
RPCRequest::BeaconBlockRoots(request) => {
self.sync
.on_beacon_block_roots_request(peer_id, request, &mut self.network_context)
}
RPCRequest::BeaconBlockRoots(request) => self.sync.on_beacon_block_roots_request(
peer_id,
request_id,
request,
&mut self.network_context,
),
RPCRequest::BeaconBlockHeaders(request) => self.sync.on_beacon_block_headers_request(
peer_id,
request_id,
request,
&mut self.network_context,
),
RPCRequest::BeaconBlockBodies(request) => self.sync.on_beacon_block_bodies_request(
peer_id,
request_id,
request,
&mut self.network_context,
),
@ -143,17 +149,23 @@ impl MessageHandler {
/// An RPC response has been received from the network.
// we match on id and ignore responses past the timeout.
fn handle_rpc_response(&mut self, peer_id: PeerId, id: u64, response: RPCResponse) {
// if response id is related to a request, ignore (likely RPC timeout)
fn handle_rpc_response(&mut self, peer_id: PeerId, id: RequestId, response: RPCResponse) {
// if response id is not related to a request, ignore (likely RPC timeout)
if self
.network_context
.requests
.remove(&(peer_id.clone(), id))
.outstanding_outgoing_request_ids
.remove(&(peer_id.clone(), id.clone()))
.is_none()
{
debug!(self.log, "Unrecognised response from peer: {:?}", peer_id);
warn!(
self.log,
"Unknown ResponseId for incoming RPCRequest";
"peer" => format!("{:?}", peer_id),
"request_id" => format!("{:?}", id)
);
return;
}
match response {
RPCResponse::Hello(hello_message) => {
self.sync
@ -210,9 +222,9 @@ pub struct NetworkContext {
/// The network channel to relay messages to the Network service.
network_send: crossbeam_channel::Sender<NetworkMessage>,
/// A mapping of peers and the RPC id we have sent an RPC request to.
requests: HashMap<(PeerId, u64), Instant>,
/// A counter of request id for each peer.
request_ids: HashMap<PeerId, u64>,
outstanding_outgoing_request_ids: HashMap<(PeerId, RequestId), Instant>,
/// Stores the next `RequestId` we should include on an outgoing `RPCRequest` to a `PeerId`.
outgoing_request_ids: HashMap<PeerId, RequestId>,
/// The `MessageHandler` logger.
log: slog::Logger,
}
@ -221,8 +233,8 @@ impl NetworkContext {
pub fn new(network_send: crossbeam_channel::Sender<NetworkMessage>, log: slog::Logger) -> Self {
Self {
network_send,
requests: HashMap::new(),
request_ids: HashMap::new(),
outstanding_outgoing_request_ids: HashMap::new(),
outgoing_request_ids: HashMap::new(),
log,
}
}
@ -234,6 +246,10 @@ impl NetworkContext {
pub fn send_rpc_request(&mut self, peer_id: PeerId, rpc_request: RPCRequest) {
let id = self.generate_request_id(&peer_id);
self.outstanding_outgoing_request_ids
.insert((peer_id.clone(), id.clone()), Instant::now());
self.send_rpc_event(
peer_id,
RPCEvent::Request {
@ -244,12 +260,16 @@ impl NetworkContext {
);
}
pub fn send_rpc_response(&mut self, peer_id: PeerId, rpc_response: RPCResponse) {
let id = self.generate_request_id(&peer_id);
pub fn send_rpc_response(
&mut self,
peer_id: PeerId,
request_id: RequestId,
rpc_response: RPCResponse,
) {
self.send_rpc_event(
peer_id,
RPCEvent::Response {
id,
id: request_id,
method_id: rpc_response.method_id(),
result: rpc_response,
},
@ -272,18 +292,14 @@ impl NetworkContext {
//
}
/// Generates a new request id for a peer.
fn generate_request_id(&mut self, peer_id: &PeerId) -> u64 {
// generate a unique id for the peer
let id = {
let borrowed_id = self.request_ids.entry(peer_id.clone()).or_insert_with(|| 0);
let id = borrowed_id.clone();
//increment the counter
*borrowed_id += 1;
id
};
// register RPC request
self.requests.insert((peer_id.clone(), id), Instant::now());
id
/// Returns the next `RequestId` for sending an `RPCRequest` to the `peer_id`.
fn generate_request_id(&mut self, peer_id: &PeerId) -> RequestId {
let next_id = self
.outgoing_request_ids
.entry(peer_id.clone())
.and_modify(|id| id.increment())
.or_insert_with(|| RequestId::from(1));
next_id.previous()
}
}

View File

@ -2,7 +2,7 @@ use super::import_queue::ImportQueue;
use crate::beacon_chain::BeaconChain;
use crate::message_handler::NetworkContext;
use eth2_libp2p::rpc::methods::*;
use eth2_libp2p::rpc::{RPCRequest, RPCResponse};
use eth2_libp2p::rpc::{RPCRequest, RPCResponse, RequestId};
use eth2_libp2p::PeerId;
use slog::{debug, error, info, o, warn};
use std::collections::HashMap;
@ -157,6 +157,7 @@ impl SimpleSync {
pub fn on_hello_request(
&mut self,
peer_id: PeerId,
request_id: RequestId,
hello: HelloMessage,
network: &mut NetworkContext,
) {
@ -165,6 +166,7 @@ impl SimpleSync {
// Say hello back.
network.send_rpc_response(
peer_id.clone(),
request_id,
RPCResponse::Hello(self.chain.hello_message()),
);
@ -256,7 +258,7 @@ impl SimpleSync {
network.disconnect(peer_id.clone(), GoodbyeReason::IrreleventNetwork);
}
// If required, send requests for blocks.
// If required, send additional requests.
match remote_status {
PeerStatus::HigherFinalizedEpoch => {
let start_slot = remote
@ -295,6 +297,7 @@ impl SimpleSync {
pub fn on_beacon_block_roots_request(
&mut self,
peer_id: PeerId,
request_id: RequestId,
req: BeaconBlockRootsRequest,
network: &mut NetworkContext,
) {
@ -333,6 +336,7 @@ impl SimpleSync {
network.send_rpc_response(
peer_id,
request_id,
RPCResponse::BeaconBlockRoots(BeaconBlockRootsResponse { roots }),
)
}
@ -385,6 +389,7 @@ impl SimpleSync {
pub fn on_beacon_block_headers_request(
&mut self,
peer_id: PeerId,
request_id: RequestId,
req: BeaconBlockHeadersRequest,
network: &mut NetworkContext,
) {
@ -415,6 +420,7 @@ impl SimpleSync {
network.send_rpc_response(
peer_id,
request_id,
RPCResponse::BeaconBlockHeaders(BeaconBlockHeadersResponse { headers }),
)
}
@ -454,6 +460,7 @@ impl SimpleSync {
pub fn on_beacon_block_bodies_request(
&mut self,
peer_id: PeerId,
request_id: RequestId,
req: BeaconBlockBodiesRequest,
network: &mut NetworkContext,
) {
@ -480,6 +487,7 @@ impl SimpleSync {
network.send_rpc_response(
peer_id,
request_id,
RPCResponse::BeaconBlockBodies(BeaconBlockBodiesResponse { block_bodies }),
)
}

View File

@ -1,6 +1,6 @@
use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender};
use eth2_libp2p::rpc::methods::*;
use eth2_libp2p::rpc::{RPCMethod, RPCRequest, RPCResponse};
use eth2_libp2p::rpc::{RPCMethod, RPCRequest, RPCResponse, RequestId};
use eth2_libp2p::{PeerId, RPCEvent};
use network::beacon_chain::BeaconChain as NetworkBeaconChain;
use network::message_handler::{HandlerMessage, MessageHandler};
@ -82,8 +82,8 @@ impl SyncNode {
let network_message = self.recv().expect("Timeout on tee");
let handler_message = match network_message.clone() {
NetworkMessage::Send(peer_id, OutgoingMessage::RPC(event)) => {
HandlerMessage::RPC(peer_id, event)
NetworkMessage::Send(_to_peer_id, OutgoingMessage::RPC(event)) => {
HandlerMessage::RPC(self.peer_id.clone(), event)
}
_ => panic!("tee cannot parse {:?}", network_message),
};
@ -265,7 +265,7 @@ fn get_logger() -> slog::Logger {
pub struct SyncMaster {
harness: BeaconChainHarness,
peer_id: PeerId,
response_ids: Vec<u64>,
response_ids: Vec<RequestId>,
}
impl SyncMaster {
@ -276,7 +276,7 @@ impl SyncMaster {
) -> Self {
let harness = BeaconChainHarness::from_beacon_state_builder(state_builder, spec.clone());
let peer_id = PeerId::random();
let response_ids = vec![0; node_count];
let response_ids = vec![RequestId::from(0); node_count];
Self {
harness,
@ -285,9 +285,9 @@ impl SyncMaster {
}
}
pub fn response_id(&mut self, node: &SyncNode) -> u64 {
let id = self.response_ids[node.id];
self.response_ids[node.id] += 1;
pub fn response_id(&mut self, node: &SyncNode) -> RequestId {
let id = self.response_ids[node.id].clone();
self.response_ids[node.id].increment();
id
}