use crate::beacon_chain::BeaconChain; use crate::error; use crate::messages::NodeMessage; use crate::service::NetworkMessage; use crate::sync::SimpleSync; use crossbeam_channel::{unbounded as channel, Sender}; use futures::future; use futures::prelude::*; use libp2p::{ rpc::{RPCRequest, RPCResponse}, PeerId, RPCEvent, }; use slog::debug; use std::collections::HashMap; use std::sync::Arc; use std::time::{Duration, Instant}; use types::Hash256; /// Timeout for RPC requests. const REQUEST_TIMEOUT: Duration = Duration::from_secs(30); /// Handles messages received from the network and client and organises syncing. pub struct MessageHandler { /// Currently loaded and initialised beacon chain. chain: Arc, /// The syncing framework. sync: SimpleSync, /// The network channel to relay messages to the Network service. network_send: crossbeam_channel::Sender, /// A mapping of peers we have sent an RPC request to. requests: HashMap>, /// A counter of request id for each peer. request_ids: HashMap, /// The `MessageHandler` logger. log: slog::Logger, } /// RPC request information pub struct RPCRequestInfo { /// The id of the request id: u16, /// The time the request was sent, to check ttl. request_time: Instant, } /// Types of messages the handler can receive. #[derive(Debug, Clone)] pub enum HandlerMessage { /// We have initiated a connection to a new peer. PeerDialed(PeerId), /// Peer has disconnected, PeerDisconnected(PeerId), /// A Node message has been received. Message(PeerId, NodeMessage), /// An RPC response/request has been received. RPC(RPCEvent), } impl MessageHandler { /// Initializes and runs the MessageHandler. pub fn new( beacon_chain: Arc, network_send: crossbeam_channel::Sender, executor: &tokio::runtime::TaskExecutor, log: slog::Logger, ) -> error::Result> { debug!(log, "Service starting"); let (handler_send, handler_recv) = channel(); // Initialise sync and begin processing in thread // generate the Message handler let sync = SimpleSync::new(beacon_chain.clone()); let mut handler = MessageHandler { // TODO: The handler may not need a chain, perhaps only sync? chain: beacon_chain.clone(), sync, network_send, requests: HashMap::new(), request_ids: HashMap::new(), log: log.clone(), }; // spawn handler task // TODO: Handle manual termination of thread executor.spawn(future::poll_fn(move || -> Result<_, _> { loop { handler.handle_message(handler_recv.recv().map_err(|_| { debug!(log, "Network message handler terminated."); })?); } })); Ok(handler_send) } fn handle_message(&mut self, message: HandlerMessage) { match message { HandlerMessage::PeerDialed(peer_id) => { self.send_hello(peer_id); } //TODO: Handle all messages _ => {} } } /// Sends a HELLO RPC request to a newly connected peer. fn send_hello(&mut self, peer_id: PeerId) { // generate a unique id for the peer let id = { let borrowed_id = self.request_ids.entry(peer_id.clone()).or_insert_with(|| 0); let id = borrowed_id.clone(); //increment the counter *borrowed_id += 1; id }; // register RPC request { let requests = self.requests.entry(peer_id).or_insert_with(|| vec![]); requests.push(RPCRequestInfo { id: id.clone(), request_time: Instant::now(), }); } // send the hello request to the network self.send_rpc_request(id, RPCResponse::Hello(self.sync.generate_hello())); } /// Sends and RPC response fn send_rpc_request(&self, request_id: u16, response: RPCResponse) {} }