2019-03-18 06:38:23 +00:00
|
|
|
use crate::beacon_chain::BeaconChain;
|
2019-03-04 07:31:01 +00:00
|
|
|
use crate::error;
|
|
|
|
use crate::messages::NodeMessage;
|
2019-03-19 01:19:07 +00:00
|
|
|
use crate::service::{NetworkMessage, OutgoingMessage};
|
2019-03-18 13:05:06 +00:00
|
|
|
use crate::sync::SimpleSync;
|
2019-03-18 06:38:23 +00:00
|
|
|
use crossbeam_channel::{unbounded as channel, Sender};
|
2019-03-17 10:49:56 +00:00
|
|
|
use futures::future;
|
|
|
|
use futures::prelude::*;
|
2019-03-19 00:25:42 +00:00
|
|
|
use libp2p::{
|
2019-03-19 01:19:07 +00:00
|
|
|
rpc::{RPCMethod, RPCRequest, RPCResponse},
|
2019-03-19 02:03:12 +00:00
|
|
|
HelloMessage, PeerId, RPCEvent,
|
2019-03-19 00:25:42 +00:00
|
|
|
};
|
2019-03-19 01:19:07 +00:00
|
|
|
use slog::warn;
|
2019-03-19 01:47:36 +00:00
|
|
|
use slog::{debug, trace};
|
2019-03-17 12:14:28 +00:00
|
|
|
use std::collections::HashMap;
|
2019-03-18 06:38:23 +00:00
|
|
|
use std::sync::Arc;
|
2019-03-17 12:14:28 +00:00
|
|
|
use std::time::{Duration, Instant};
|
2019-03-04 07:31:01 +00:00
|
|
|
use types::Hash256;
|
2019-03-04 05:39:37 +00:00
|
|
|
|
2019-03-19 00:25:42 +00:00
|
|
|
/// Timeout for RPC requests.
|
|
|
|
// NOTE(review): nothing in the visible source reads this constant yet — confirm where the
// 30 second timeout is meant to be enforced.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
|
2019-03-17 12:14:28 +00:00
|
|
|
|
2019-03-04 05:39:37 +00:00
|
|
|
/// Handles messages received from the network and client and organises syncing.
|
2019-03-18 06:38:23 +00:00
|
|
|
pub struct MessageHandler {
|
2019-03-18 05:16:54 +00:00
|
|
|
/// Currently loaded and initialised beacon chain.
|
2019-03-18 06:38:23 +00:00
|
|
|
chain: Arc<BeaconChain>,
|
2019-03-18 05:16:54 +00:00
|
|
|
/// The syncing framework.
|
2019-03-04 07:31:01 +00:00
|
|
|
sync: SimpleSync,
|
2019-03-18 12:18:25 +00:00
|
|
|
/// The network channel to relay messages to the Network service.
|
|
|
|
network_send: crossbeam_channel::Sender<NetworkMessage>,
|
2019-03-19 00:25:42 +00:00
|
|
|
/// A mapping of peers we have sent an RPC request to.
|
|
|
|
requests: HashMap<PeerId, Vec<RPCRequestInfo>>,
|
|
|
|
/// A counter of request id for each peer.
|
2019-03-19 01:19:07 +00:00
|
|
|
request_ids: HashMap<PeerId, u64>,
|
2019-03-18 05:16:54 +00:00
|
|
|
/// The `MessageHandler` logger.
|
2019-03-17 10:49:56 +00:00
|
|
|
log: slog::Logger,
|
2019-03-04 05:39:37 +00:00
|
|
|
}
|
|
|
|
|
2019-03-19 00:25:42 +00:00
|
|
|
/// Bookkeeping for a single RPC request.
///
/// Derives `Debug` and `Clone` so that values can be logged and copied.
#[derive(Debug, Clone)]
pub struct RPCRequestInfo {
    /// The id of the request.
    id: u64,
    /// The time the request was sent, kept so its age can be checked against a TTL.
    request_time: Instant,
}
|
|
|
|
|
2019-03-04 05:39:37 +00:00
|
|
|
/// Types of messages the handler can receive.
|
2019-03-17 10:49:56 +00:00
|
|
|
#[derive(Debug, Clone)]
|
2019-03-04 05:39:37 +00:00
|
|
|
pub enum HandlerMessage {
|
2019-03-17 12:14:28 +00:00
|
|
|
/// We have initiated a connection to a new peer.
|
|
|
|
PeerDialed(PeerId),
|
2019-03-04 05:39:37 +00:00
|
|
|
/// Peer has disconnected,
|
|
|
|
PeerDisconnected(PeerId),
|
|
|
|
/// A Node message has been received.
|
2019-03-04 07:31:01 +00:00
|
|
|
Message(PeerId, NodeMessage),
|
2019-03-17 10:49:56 +00:00
|
|
|
/// An RPC response/request has been received.
|
2019-03-19 01:47:36 +00:00
|
|
|
RPC(PeerId, RPCEvent),
|
2019-03-04 07:31:01 +00:00
|
|
|
}
|
|
|
|
|
2019-03-18 06:38:23 +00:00
|
|
|
impl MessageHandler {
|
2019-03-04 07:31:01 +00:00
|
|
|
/// Initializes and runs the MessageHandler.
|
2019-03-17 10:49:56 +00:00
|
|
|
pub fn new(
|
2019-03-18 06:38:23 +00:00
|
|
|
beacon_chain: Arc<BeaconChain>,
|
2019-03-18 12:18:25 +00:00
|
|
|
network_send: crossbeam_channel::Sender<NetworkMessage>,
|
2019-03-17 10:49:56 +00:00
|
|
|
executor: &tokio::runtime::TaskExecutor,
|
|
|
|
log: slog::Logger,
|
|
|
|
) -> error::Result<Sender<HandlerMessage>> {
|
2019-03-04 07:31:01 +00:00
|
|
|
debug!(log, "Service starting");
|
|
|
|
|
|
|
|
let (handler_send, handler_recv) = channel();
|
|
|
|
|
|
|
|
// Initialise sync and begin processing in thread
|
2019-03-17 10:49:56 +00:00
|
|
|
// generate the Message handler
|
2019-03-18 13:05:06 +00:00
|
|
|
let sync = SimpleSync::new(beacon_chain.clone());
|
|
|
|
|
2019-03-17 10:49:56 +00:00
|
|
|
let mut handler = MessageHandler {
|
2019-03-18 13:05:06 +00:00
|
|
|
// TODO: The handler may not need a chain, perhaps only sync?
|
2019-03-18 06:38:23 +00:00
|
|
|
chain: beacon_chain.clone(),
|
2019-03-17 10:49:56 +00:00
|
|
|
sync,
|
2019-03-18 12:18:25 +00:00
|
|
|
network_send,
|
2019-03-19 00:25:42 +00:00
|
|
|
requests: HashMap::new(),
|
|
|
|
request_ids: HashMap::new(),
|
2019-03-17 10:49:56 +00:00
|
|
|
log: log.clone(),
|
|
|
|
};
|
2019-03-04 07:31:01 +00:00
|
|
|
|
2019-03-17 10:49:56 +00:00
|
|
|
// spawn handler task
|
|
|
|
// TODO: Handle manual termination of thread
|
|
|
|
executor.spawn(future::poll_fn(move || -> Result<_, _> {
|
|
|
|
loop {
|
|
|
|
handler.handle_message(handler_recv.recv().map_err(|_| {
|
2019-03-18 07:22:01 +00:00
|
|
|
debug!(log, "Network message handler terminated.");
|
2019-03-17 10:49:56 +00:00
|
|
|
})?);
|
|
|
|
}
|
|
|
|
}));
|
2019-03-04 07:31:01 +00:00
|
|
|
|
|
|
|
Ok(handler_send)
|
|
|
|
}
|
2019-03-17 10:49:56 +00:00
|
|
|
|
|
|
|
fn handle_message(&mut self, message: HandlerMessage) {
|
2019-03-18 05:16:54 +00:00
|
|
|
match message {
|
2019-03-19 01:47:36 +00:00
|
|
|
// we have initiated a connection to a peer
|
2019-03-18 12:18:25 +00:00
|
|
|
HandlerMessage::PeerDialed(peer_id) => {
|
2019-03-19 01:19:07 +00:00
|
|
|
self.send_hello_request(peer_id);
|
2019-03-18 12:18:25 +00:00
|
|
|
}
|
2019-03-19 01:47:36 +00:00
|
|
|
// we have received an RPC message request/response
|
|
|
|
HandlerMessage::RPC(peer_id, rpc_event) => {
|
|
|
|
self.handle_rpc_message(peer_id, rpc_event);
|
|
|
|
}
|
2019-03-18 05:16:54 +00:00
|
|
|
//TODO: Handle all messages
|
|
|
|
_ => {}
|
|
|
|
}
|
2019-03-17 10:49:56 +00:00
|
|
|
}
|
2019-03-18 05:16:54 +00:00
|
|
|
|
2019-03-19 01:47:36 +00:00
|
|
|
fn handle_rpc_message(&mut self, peer_id: PeerId, rpc_message: RPCEvent) {
|
|
|
|
match rpc_message {
|
|
|
|
RPCEvent::Request {
|
|
|
|
id,
|
2019-03-19 02:03:12 +00:00
|
|
|
method_id: _, // TODO: Clean up RPC Message types, have a cleaner type by this point.
|
2019-03-19 01:47:36 +00:00
|
|
|
body,
|
|
|
|
} => self.handle_rpc_request(peer_id, id, body),
|
|
|
|
RPCEvent::Response {
|
|
|
|
id,
|
|
|
|
method_id: _,
|
|
|
|
result,
|
|
|
|
} => self.handle_rpc_response(peer_id, id, result),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-03-19 02:03:12 +00:00
|
|
|
/// A new RPC request has been received from the network.
|
|
|
|
fn handle_rpc_request(&mut self, peer_id: PeerId, id: u64, request: RPCRequest) {
|
|
|
|
match request {
|
|
|
|
RPCRequest::Hello(hello_message) => {
|
|
|
|
self.handle_hello_response(peer_id, id, hello_message)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2019-03-19 01:47:36 +00:00
|
|
|
|
2019-03-19 02:03:12 +00:00
|
|
|
/// An RPC response has been received from the network.
|
2019-03-19 01:47:36 +00:00
|
|
|
// we match on id and ignore responses past the timeout.
|
|
|
|
// TODO: not implemented yet — the body is empty, so every response is currently dropped
// and `peer_id`, `id` and `response` go unused.
fn handle_rpc_response(&mut self, peer_id: PeerId, id: u64, response: RPCResponse) {}
|
|
|
|
|
2019-03-19 02:03:12 +00:00
|
|
|
/// Placeholder for processing a `HelloMessage` received from `peer_id`.
///
/// Currently a no-op. Note that `handle_rpc_request` calls this for incoming HELLO
/// *requests*, despite the `response` naming used here.
fn handle_hello_response(&mut self, peer_id: PeerId, id: u64, response: HelloMessage) {
    // TODO: look `id` up among the requests recorded for `peer_id` in `self.requests`;
    // if the id is not in our list, ignore the message (likely RPC timeout).
}
|
|
|
|
|
2019-03-18 05:16:54 +00:00
|
|
|
/// Sends a HELLO RPC request to a newly connected peer.
|
2019-03-19 01:19:07 +00:00
|
|
|
fn send_hello_request(&mut self, peer_id: PeerId) {
|
2019-03-19 00:25:42 +00:00
|
|
|
// generate a unique id for the peer
|
|
|
|
let id = {
|
|
|
|
let borrowed_id = self.request_ids.entry(peer_id.clone()).or_insert_with(|| 0);
|
|
|
|
let id = borrowed_id.clone();
|
|
|
|
//increment the counter
|
|
|
|
*borrowed_id += 1;
|
|
|
|
id
|
|
|
|
};
|
|
|
|
// register RPC request
|
|
|
|
{
|
2019-03-19 01:19:07 +00:00
|
|
|
let requests = self
|
|
|
|
.requests
|
|
|
|
.entry(peer_id.clone())
|
|
|
|
.or_insert_with(|| vec![]);
|
2019-03-19 00:25:42 +00:00
|
|
|
requests.push(RPCRequestInfo {
|
|
|
|
id: id.clone(),
|
|
|
|
request_time: Instant::now(),
|
|
|
|
});
|
|
|
|
}
|
2019-03-19 01:19:07 +00:00
|
|
|
|
|
|
|
// build the rpc request
|
|
|
|
let rpc_event = RPCEvent::Request {
|
|
|
|
id,
|
|
|
|
method_id: RPCMethod::Hello.into(),
|
|
|
|
body: RPCRequest::Hello(self.sync.generate_hello()),
|
|
|
|
};
|
|
|
|
|
2019-03-18 12:18:25 +00:00
|
|
|
// send the hello request to the network
|
2019-03-19 01:47:36 +00:00
|
|
|
trace!(self.log, "Sending HELLO message to peer {:?}", peer_id);
|
2019-03-19 01:19:07 +00:00
|
|
|
self.send_rpc(peer_id, rpc_event);
|
2019-03-18 12:18:25 +00:00
|
|
|
}
|
2019-03-19 00:25:42 +00:00
|
|
|
|
2019-03-19 01:47:36 +00:00
|
|
|
/// Sends an RPC request/response to the network server.
|
2019-03-19 01:19:07 +00:00
|
|
|
fn send_rpc(&self, peer_id: PeerId, rpc_event: RPCEvent) {
|
|
|
|
self.network_send
|
|
|
|
.send(NetworkMessage::Send(
|
|
|
|
peer_id,
|
|
|
|
OutgoingMessage::RPC(rpc_event),
|
|
|
|
))
|
|
|
|
.unwrap_or_else(|_| {
|
|
|
|
warn!(
|
|
|
|
self.log,
|
|
|
|
"Could not send RPC message to the network service"
|
|
|
|
)
|
|
|
|
});
|
|
|
|
}
|
2019-03-04 05:39:37 +00:00
|
|
|
}
|