diff --git a/beacon_node/libp2p/src/behaviour.rs b/beacon_node/libp2p/src/behaviour.rs index 96355cf3f..604b84c8f 100644 --- a/beacon_node/libp2p/src/behaviour.rs +++ b/beacon_node/libp2p/src/behaviour.rs @@ -1,4 +1,4 @@ -use crate::rpc::{Rpc, RpcEvent}; +use crate::rpc::{RPCEvent, RPCMessage, Rpc}; use futures::prelude::*; use libp2p::{ core::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess}, @@ -38,19 +38,24 @@ impl NetworkBehaviourEventProcess NetworkBehaviourEventProcess +impl NetworkBehaviourEventProcess for Behaviour { - fn inject_event(&mut self, event: RpcEvent) { - self.events.push(BehaviourEvent::RPC(event)); + fn inject_event(&mut self, event: RPCMessage) { + match event { + RPCMessage::PeerDialed(peer_id) => { + self.events.push(BehaviourEvent::PeerDialed(peer_id)) + } + RPCMessage::RPC(rpc_event) => self.events.push(BehaviourEvent::RPC(rpc_event)), + } } } impl Behaviour { - pub fn new(local_peer_id: PeerId, gs_config: GossipsubConfig) -> Self { + pub fn new(local_peer_id: PeerId, gs_config: GossipsubConfig, log: &slog::Logger) -> Self { Behaviour { gossipsub: Gossipsub::new(local_peer_id, gs_config), - serenity_rpc: Rpc::new(), + serenity_rpc: Rpc::new(log), events: Vec::new(), } } @@ -80,7 +85,8 @@ impl Behaviour { /// The types of events than can be obtained from polling the behaviour. pub enum BehaviourEvent { - RPC(RpcEvent), + RPC(RPCEvent), + PeerDialed(PeerId), // TODO: This is a stub at the moment Message(String), } diff --git a/beacon_node/libp2p/src/lib.rs b/beacon_node/libp2p/src/lib.rs index 69f6eb650..f3e97355d 100644 --- a/beacon_node/libp2p/src/lib.rs +++ b/beacon_node/libp2p/src/lib.rs @@ -13,8 +13,7 @@ pub use libp2p::{ PeerId, }; pub use network_config::NetworkConfig; -pub use rpc::HelloMessage; -pub use rpc::RpcEvent; +pub use rpc::{HelloMessage, RPCEvent}; pub use service::Libp2pEvent; pub use service::Service; pub use types::multiaddr; diff --git a/beacon_node/libp2p/src/rpc/mod.rs b/beacon_node/libp2p/src/rpc/mod.rs index 3420217ce..d40e53935 100644 --- a/beacon_node/libp2p/src/rpc/mod.rs +++ b/beacon_node/libp2p/src/rpc/mod.rs @@ -12,7 +12,8 @@ use libp2p::core::swarm::{ }; use libp2p::{Multiaddr, PeerId}; pub use methods::{HelloMessage, RPCMethod, RPCRequest, RPCResponse}; -pub use protocol::{RPCProtocol, RpcEvent}; +pub use protocol::{RPCEvent, RPCProtocol}; +use slog::{debug, o, Logger}; use std::marker::PhantomData; use tokio::io::{AsyncRead, AsyncWrite}; @@ -21,22 +22,26 @@ use tokio::io::{AsyncRead, AsyncWrite}; pub struct Rpc { /// Queue of events to processed. - events: Vec>, + events: Vec>, /// Pins the generic substream. marker: PhantomData, + /// Slog logger for RPC behaviour. + log: slog::Logger, } impl Rpc { - pub fn new() -> Self { + pub fn new(log: &slog::Logger) -> Self { + let log = log.new(o!("Service" => "Libp2p-RPC")); Rpc { events: Vec::new(), marker: PhantomData, + log, } } /// Submits and RPC request. pub fn send_request(&mut self, peer_id: PeerId, id: u64, method_id: u16, body: RPCRequest) { - let request = RpcEvent::Request { + let request = RPCEvent::Request { id, method_id, body, @@ -52,8 +57,8 @@ impl NetworkBehaviour for Rpc where TSubstream: AsyncRead + AsyncWrite, { - type ProtocolsHandler = OneShotHandler; - type OutEvent = RpcEvent; + type ProtocolsHandler = OneShotHandler; + type OutEvent = RPCMessage; fn new_handler(&mut self) -> Self::ProtocolsHandler { Default::default() @@ -63,7 +68,14 @@ where Vec::new() } - fn inject_connected(&mut self, _: PeerId, _: ConnectedPoint) {} + fn inject_connected(&mut self, peer_id: PeerId, connected_point: ConnectedPoint) { + // if initialised the connection, report this upwards to send the HELLO request + if let ConnectedPoint::Dialer { address } = connected_point { + self.events.push(NetworkBehaviourAction::GenerateEvent( + RPCMessage::PeerDialed(peer_id), + )); + } + } fn inject_disconnected(&mut self, _: &PeerId, _: ConnectedPoint) {} @@ -80,7 +92,9 @@ where // send the event to the user self.events - .push(NetworkBehaviourAction::GenerateEvent(event)); + .push(NetworkBehaviourAction::GenerateEvent(RPCMessage::RPC( + event, + ))); } fn poll( @@ -99,18 +113,24 @@ where } } -/// Transmission between the `OneShotHandler` and the `RpcEvent`. +/// Messages sent to the user from the RPC protocol. +pub enum RPCMessage { + RPC(RPCEvent), + PeerDialed(PeerId), +} + +/// Transmission between the `OneShotHandler` and the `RPCEvent`. #[derive(Debug)] pub enum OneShotEvent { /// We received an RPC from a remote. - Rx(RpcEvent), + Rx(RPCEvent), /// We successfully sent an RPC request. Sent, } -impl From for OneShotEvent { +impl From for OneShotEvent { #[inline] - fn from(rpc: RpcEvent) -> OneShotEvent { + fn from(rpc: RPCEvent) -> OneShotEvent { OneShotEvent::Rx(rpc) } } diff --git a/beacon_node/libp2p/src/rpc/protocol.rs b/beacon_node/libp2p/src/rpc/protocol.rs index 74b8322eb..dce714429 100644 --- a/beacon_node/libp2p/src/rpc/protocol.rs +++ b/beacon_node/libp2p/src/rpc/protocol.rs @@ -31,7 +31,7 @@ impl Default for RPCProtocol { /// The RPC types which are sent/received in this protocol. #[derive(Debug, Clone)] -pub enum RpcEvent { +pub enum RPCEvent { Request { id: u64, method_id: u16, @@ -44,7 +44,7 @@ pub enum RpcEvent { }, } -impl UpgradeInfo for RpcEvent { +impl UpgradeInfo for RPCEvent { type Info = &'static [u8]; type InfoIter = iter::Once; @@ -58,17 +58,17 @@ impl InboundUpgrade for RPCProtocol where TSocket: AsyncRead + AsyncWrite, { - type Output = RpcEvent; + type Output = RPCEvent; type Error = DecodeError; type Future = - upgrade::ReadOneThen, ()) -> Result>; + upgrade::ReadOneThen, ()) -> Result>; fn upgrade_inbound(self, socket: TSocket, _: Self::Info) -> Self::Future { upgrade::read_one_then(socket, MAX_READ_SIZE, (), |packet, ()| Ok(decode(packet)?)) } } -fn decode(packet: Vec) -> Result { +fn decode(packet: Vec) -> Result { // decode the header of the rpc // request/response let (request, index) = bool::ssz_decode(&packet, 0)?; @@ -84,7 +84,7 @@ fn decode(packet: Vec) -> Result { RPCMethod::Unknown => return Err(DecodeError::UnknownRPCMethod), }; - return Ok(RpcEvent::Request { + return Ok(RPCEvent::Request { id, method_id, body, @@ -99,7 +99,7 @@ fn decode(packet: Vec) -> Result { } RPCMethod::Unknown => return Err(DecodeError::UnknownRPCMethod), }; - return Ok(RpcEvent::Response { + return Ok(RPCEvent::Response { id, method_id, result, @@ -107,7 +107,7 @@ fn decode(packet: Vec) -> Result { } } -impl OutboundUpgrade for RpcEvent +impl OutboundUpgrade for RPCEvent where TSocket: AsyncWrite, { @@ -122,10 +122,10 @@ where } } -impl Encodable for RpcEvent { +impl Encodable for RPCEvent { fn ssz_append(&self, s: &mut SszStream) { match self { - RpcEvent::Request { + RPCEvent::Request { id, method_id, body, @@ -137,7 +137,7 @@ impl Encodable for RpcEvent { RPCRequest::Hello(body) => s.append(body), }; } - RpcEvent::Response { + RPCEvent::Response { id, method_id, result, diff --git a/beacon_node/libp2p/src/service.rs b/beacon_node/libp2p/src/service.rs index a672e153b..dd6deabad 100644 --- a/beacon_node/libp2p/src/service.rs +++ b/beacon_node/libp2p/src/service.rs @@ -1,7 +1,7 @@ use crate::behaviour::{Behaviour, BehaviourEvent}; use crate::error; use crate::multiaddr::Protocol; -use crate::rpc::RpcEvent; +use crate::rpc::RPCEvent; use crate::NetworkConfig; use futures::prelude::*; use futures::Stream; @@ -41,7 +41,7 @@ impl Service { // Set up the transport let transport = build_transport(local_private_key); // Set up gossipsub routing - let behaviour = Behaviour::new(local_peer_id.clone(), config.gs_config); + let behaviour = Behaviour::new(local_peer_id.clone(), config.gs_config, &log); // Set up Topology let topology = local_peer_id.clone(); Swarm::new(transport, behaviour, topology) @@ -108,6 +108,9 @@ impl Stream for Service { Ok(Async::Ready(Some(BehaviourEvent::RPC(event)))) => { return Ok(Async::Ready(Some(Libp2pEvent::RPC(event)))); } + Ok(Async::Ready(Some(BehaviourEvent::PeerDialed(peer_id)))) => { + return Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id)))); + } Ok(Async::Ready(None)) => unreachable!("Swarm stream shouldn't end"), Ok(Async::NotReady) => break, _ => break, @@ -155,6 +158,7 @@ fn build_transport( /// Events that can be obtained from polling the Libp2p Service. pub enum Libp2pEvent { // We have received an RPC event on the swarm - RPC(RpcEvent), + RPC(RPCEvent), + PeerDialed(PeerId), Message(String), } diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index fe9780ad5..c059795ed 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -4,30 +4,37 @@ use crossbeam_channel::{unbounded as channel, Sender, TryRecvError}; use futures::future; use futures::prelude::*; use libp2p::rpc; -use libp2p::{PeerId, RpcEvent}; +use libp2p::{PeerId, RPCEvent}; use slog::debug; +use std::collections::HashMap; +use std::time::{Duration, Instant}; use sync::SimpleSync; use types::Hash256; +/// Timeout for establishing a HELLO handshake. +const HELLO_TIMEOUT: Duration = Duration::from_secs(30); + /// Handles messages received from the network and client and organises syncing. pub struct MessageHandler { sync: SimpleSync, //TODO: Implement beacon chain //chain: BeaconChain + /// A mapping of peers we have sent a HELLO rpc request to + hello_requests: HashMap, log: slog::Logger, } /// Types of messages the handler can receive. #[derive(Debug, Clone)] pub enum HandlerMessage { - /// Peer has connected. - PeerConnected(PeerId), + /// We have initiated a connection to a new peer. + PeerDialed(PeerId), /// Peer has disconnected, PeerDisconnected(PeerId), /// A Node message has been received. Message(PeerId, NodeMessage), /// An RPC response/request has been received. - RPC(RpcEvent), + RPC(RPCEvent), } impl MessageHandler { @@ -49,6 +56,7 @@ impl MessageHandler { //TODO: Initialise beacon chain let mut handler = MessageHandler { sync, + hello_requests: HashMap::new(), log: log.clone(), }; diff --git a/beacon_node/network/src/messages.rs b/beacon_node/network/src/messages.rs index 064424a87..930c90b3e 100644 --- a/beacon_node/network/src/messages.rs +++ b/beacon_node/network/src/messages.rs @@ -1,11 +1,11 @@ use libp2p::PeerId; -use libp2p::{HelloMessage, RpcEvent}; +use libp2p::{HelloMessage, RPCEvent}; use types::{Hash256, Slot}; /// Messages between nodes across the network. #[derive(Debug, Clone)] pub enum NodeMessage { - RPC(RpcEvent), + RPC(RPCEvent), BlockRequest, // TODO: only for testing - remove Message(String), diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index bd01027e9..fc91cf53a 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -108,6 +108,12 @@ fn network_service( .send(HandlerMessage::RPC(rpc_event)) .map_err(|_| "failed to send rpc to handler"); } + Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id)))) => { + debug!(libp2p_service.log, "Peer Dialed: {:?}", peer_id); + message_handler_send + .send(HandlerMessage::PeerDialed(peer_id)) + .map_err(|_| "failed to send rpc to handler"); + } Ok(Async::Ready(Some(Libp2pEvent::Message(m)))) => debug!( libp2p_service.log, "Network Service: Message received: {}", m