use crate::error; use crate::service::{NetworkMessage, OutgoingMessage}; use crate::sync::SimpleSync; use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::rpc::methods::*; use eth2_libp2p::{ behaviour::PubsubMessage, rpc::{RPCError, RPCErrorResponse, RPCRequest, RPCResponse, RequestId}, PeerId, RPCEvent, }; use futures::future::Future; use futures::stream::Stream; use slog::{debug, warn}; use ssz::{Decode, DecodeError}; use std::sync::Arc; use tokio::sync::mpsc; use types::{BeaconBlockHeader, EthSpec}; /// Handles messages received from the network and client and organises syncing. pub struct MessageHandler { /// Currently loaded and initialised beacon chain. _chain: Arc>, /// The syncing framework. sync: SimpleSync, /// The context required to send messages to, and process messages from peers. network_context: NetworkContext, /// The `MessageHandler` logger. log: slog::Logger, } /// Types of messages the handler can receive. #[derive(Debug)] pub enum HandlerMessage { /// We have initiated a connection to a new peer. PeerDialed(PeerId), /// Peer has disconnected, PeerDisconnected(PeerId), /// An RPC response/request has been received. RPC(PeerId, RPCEvent), /// A gossip message has been received. PubsubMessage(PeerId, Box>), } impl MessageHandler { /// Initializes and runs the MessageHandler. pub fn spawn( beacon_chain: Arc>, network_send: mpsc::UnboundedSender>, executor: &tokio::runtime::TaskExecutor, log: slog::Logger, ) -> error::Result>> { debug!(log, "Service starting"); let (handler_send, handler_recv) = mpsc::unbounded_channel(); // Initialise sync and begin processing in thread // generate the Message handler let sync = SimpleSync::new(beacon_chain.clone(), &log); let mut handler = MessageHandler { _chain: beacon_chain.clone(), sync, network_context: NetworkContext::new(network_send, log.clone()), log: log.clone(), }; // spawn handler task // TODO: Handle manual termination of thread executor.spawn( handler_recv .for_each(move |msg| Ok(handler.handle_message(msg))) .map_err(move |_| { debug!(log, "Network message handler terminated."); }), ); Ok(handler_send) } /// Handle all messages incoming from the network service. fn handle_message(&mut self, message: HandlerMessage) { match message { // we have initiated a connection to a peer HandlerMessage::PeerDialed(peer_id) => { self.sync.on_connect(peer_id, &mut self.network_context); } // A peer has disconnected HandlerMessage::PeerDisconnected(peer_id) => { self.sync.on_disconnect(peer_id); } // we have received an RPC message request/response HandlerMessage::RPC(peer_id, rpc_event) => { self.handle_rpc_message(peer_id, rpc_event); } // we have received an RPC message request/response HandlerMessage::PubsubMessage(peer_id, gossip) => { self.handle_gossip(peer_id, *gossip); } } } /* RPC - Related functionality */ /// Handle RPC messages fn handle_rpc_message(&mut self, peer_id: PeerId, rpc_message: RPCEvent) { match rpc_message { RPCEvent::Request(id, req) => self.handle_rpc_request(peer_id, id, req), RPCEvent::Response(_id, resp) => self.handle_rpc_response(peer_id, resp), RPCEvent::Error(id, error) => self.handle_rpc_error(peer_id, id, error), } } /// A new RPC request has been received from the network. fn handle_rpc_request(&mut self, peer_id: PeerId, request_id: RequestId, request: RPCRequest) { match request { RPCRequest::Hello(hello_message) => self.sync.on_hello_request( peer_id, request_id, hello_message, &mut self.network_context, ), RPCRequest::Goodbye(goodbye_reason) => self.sync.on_goodbye(peer_id, goodbye_reason), RPCRequest::BeaconBlockRoots(request) => self.sync.on_beacon_block_roots_request( peer_id, request_id, request, &mut self.network_context, ), RPCRequest::BeaconBlockHeaders(request) => self.sync.on_beacon_block_headers_request( peer_id, request_id, request, &mut self.network_context, ), RPCRequest::BeaconBlockBodies(request) => self.sync.on_beacon_block_bodies_request( peer_id, request_id, request, &mut self.network_context, ), RPCRequest::BeaconChainState(_) => { // We do not implement this endpoint, it is not required and will only likely be // useful for light-client support in later phases. warn!(self.log, "BeaconChainState RPC call is not supported."); } } } /// An RPC response has been received from the network. // we match on id and ignore responses past the timeout. fn handle_rpc_response(&mut self, peer_id: PeerId, error_response: RPCErrorResponse) { // an error could have occurred. // TODO: Handle Error gracefully match error_response { RPCErrorResponse::InvalidRequest(error) => { warn!(self.log, "";"peer" => format!("{:?}", peer_id), "Invalid Request" => error.as_string()) } RPCErrorResponse::ServerError(error) => { warn!(self.log, "";"peer" => format!("{:?}", peer_id), "Server Error" => error.as_string()) } RPCErrorResponse::Unknown(error) => { warn!(self.log, "";"peer" => format!("{:?}", peer_id), "Unknown Error" => error.as_string()) } RPCErrorResponse::Success(response) => { match response { RPCResponse::Hello(hello_message) => { self.sync.on_hello_response( peer_id, hello_message, &mut self.network_context, ); } RPCResponse::BeaconBlockRoots(response) => { self.sync.on_beacon_block_roots_response( peer_id, response, &mut self.network_context, ); } RPCResponse::BeaconBlockHeaders(response) => { match self.decode_block_headers(response) { Ok(decoded_block_headers) => { self.sync.on_beacon_block_headers_response( peer_id, decoded_block_headers, &mut self.network_context, ); } Err(_e) => { warn!(self.log, "Peer sent invalid block headers";"peer" => format!("{:?}", peer_id)) } } } RPCResponse::BeaconBlockBodies(response) => { match self.decode_block_bodies(response) { Ok(decoded_block_bodies) => { self.sync.on_beacon_block_bodies_response( peer_id, decoded_block_bodies, &mut self.network_context, ); } Err(_e) => { warn!(self.log, "Peer sent invalid block bodies";"peer" => format!("{:?}", peer_id)) } } } RPCResponse::BeaconChainState(_) => { // We do not implement this endpoint, it is not required and will only likely be // useful for light-client support in later phases. // // Theoretically, we shouldn't reach this code because we should never send a // beacon state RPC request. warn!(self.log, "BeaconChainState RPC call is not supported."); } } } } } /// Verifies and decodes the ssz-encoded block bodies received from peers. fn decode_block_bodies( &self, bodies_response: BeaconBlockBodiesResponse, ) -> Result, DecodeError> { //TODO: Implement faster block verification before decoding entirely let block_bodies = Vec::from_ssz_bytes(&bodies_response.block_bodies)?; Ok(DecodedBeaconBlockBodiesResponse { block_roots: bodies_response .block_roots .expect("Responses must have associated roots"), block_bodies, }) } /// Verifies and decodes the ssz-encoded block headers received from peers. fn decode_block_headers( &self, headers_response: BeaconBlockHeadersResponse, ) -> Result, DecodeError> { //TODO: Implement faster header verification before decoding entirely Vec::from_ssz_bytes(&headers_response.headers) } /// Handle various RPC errors fn handle_rpc_error(&mut self, peer_id: PeerId, request_id: RequestId, error: RPCError) { //TODO: Handle error correctly warn!(self.log, "RPC Error"; "Peer" => format!("{:?}", peer_id), "Request Id" => format!("{}", request_id), "Error" => format!("{:?}", error)); } /// Handle RPC messages fn handle_gossip(&mut self, peer_id: PeerId, gossip_message: PubsubMessage) { match gossip_message { PubsubMessage::Block(message) => { let _should_forward_on = self.sync .on_block_gossip(peer_id, message, &mut self.network_context); } PubsubMessage::Attestation(message) => { self.sync .on_attestation_gossip(peer_id, message, &mut self.network_context) } } } } // TODO: RPC Rewrite makes this struct fairly pointless pub struct NetworkContext { /// The network channel to relay messages to the Network service. network_send: mpsc::UnboundedSender>, /// The `MessageHandler` logger. log: slog::Logger, } impl NetworkContext { pub fn new(network_send: mpsc::UnboundedSender>, log: slog::Logger) -> Self { Self { network_send, log } } pub fn disconnect(&mut self, peer_id: PeerId, reason: GoodbyeReason) { self.send_rpc_request(peer_id, RPCRequest::Goodbye(reason)) // TODO: disconnect peers. } pub fn send_rpc_request(&mut self, peer_id: PeerId, rpc_request: RPCRequest) { // Note: There is currently no use of keeping track of requests. However the functionality // is left here for future revisions. self.send_rpc_event(peer_id, RPCEvent::Request(0, rpc_request)); } //TODO: Handle Error responses pub fn send_rpc_response( &mut self, peer_id: PeerId, request_id: RequestId, rpc_response: RPCResponse, ) { self.send_rpc_event( peer_id, RPCEvent::Response(request_id, RPCErrorResponse::Success(rpc_response)), ); } fn send_rpc_event(&mut self, peer_id: PeerId, rpc_event: RPCEvent) { self.send(peer_id, OutgoingMessage::RPC(rpc_event)) } fn send(&mut self, peer_id: PeerId, outgoing_message: OutgoingMessage) { self.network_send .try_send(NetworkMessage::Send(peer_id, outgoing_message)) .unwrap_or_else(|_| { warn!( self.log, "Could not send RPC message to the network service" ) }); } }