Handle peer dials and propagate to message handler
This commit is contained in:
parent
9803ab30f2
commit
2e0c8e2e47
@ -1,4 +1,4 @@
|
||||
use crate::rpc::{Rpc, RpcEvent};
|
||||
use crate::rpc::{RPCEvent, RPCMessage, Rpc};
|
||||
use futures::prelude::*;
|
||||
use libp2p::{
|
||||
core::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess},
|
||||
@ -38,19 +38,24 @@ impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<GossipsubE
|
||||
}
|
||||
}
|
||||
|
||||
impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<RpcEvent>
|
||||
impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<RPCMessage>
|
||||
for Behaviour<TSubstream>
|
||||
{
|
||||
fn inject_event(&mut self, event: RpcEvent) {
|
||||
self.events.push(BehaviourEvent::RPC(event));
|
||||
fn inject_event(&mut self, event: RPCMessage) {
|
||||
match event {
|
||||
RPCMessage::PeerDialed(peer_id) => {
|
||||
self.events.push(BehaviourEvent::PeerDialed(peer_id))
|
||||
}
|
||||
RPCMessage::RPC(rpc_event) => self.events.push(BehaviourEvent::RPC(rpc_event)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
|
||||
pub fn new(local_peer_id: PeerId, gs_config: GossipsubConfig) -> Self {
|
||||
pub fn new(local_peer_id: PeerId, gs_config: GossipsubConfig, log: &slog::Logger) -> Self {
|
||||
Behaviour {
|
||||
gossipsub: Gossipsub::new(local_peer_id, gs_config),
|
||||
serenity_rpc: Rpc::new(),
|
||||
serenity_rpc: Rpc::new(log),
|
||||
events: Vec::new(),
|
||||
}
|
||||
}
|
||||
@ -80,7 +85,8 @@ impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
|
||||
|
||||
/// The types of events than can be obtained from polling the behaviour.
|
||||
pub enum BehaviourEvent {
|
||||
RPC(RpcEvent),
|
||||
RPC(RPCEvent),
|
||||
PeerDialed(PeerId),
|
||||
// TODO: This is a stub at the moment
|
||||
Message(String),
|
||||
}
|
||||
|
@ -13,8 +13,7 @@ pub use libp2p::{
|
||||
PeerId,
|
||||
};
|
||||
pub use network_config::NetworkConfig;
|
||||
pub use rpc::HelloMessage;
|
||||
pub use rpc::RpcEvent;
|
||||
pub use rpc::{HelloMessage, RPCEvent};
|
||||
pub use service::Libp2pEvent;
|
||||
pub use service::Service;
|
||||
pub use types::multiaddr;
|
||||
|
@ -12,7 +12,8 @@ use libp2p::core::swarm::{
|
||||
};
|
||||
use libp2p::{Multiaddr, PeerId};
|
||||
pub use methods::{HelloMessage, RPCMethod, RPCRequest, RPCResponse};
|
||||
pub use protocol::{RPCProtocol, RpcEvent};
|
||||
pub use protocol::{RPCEvent, RPCProtocol};
|
||||
use slog::{debug, o, Logger};
|
||||
use std::marker::PhantomData;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
|
||||
@ -21,22 +22,26 @@ use tokio::io::{AsyncRead, AsyncWrite};
|
||||
|
||||
pub struct Rpc<TSubstream> {
|
||||
/// Queue of events to processed.
|
||||
events: Vec<NetworkBehaviourAction<RpcEvent, RpcEvent>>,
|
||||
events: Vec<NetworkBehaviourAction<RPCEvent, RPCMessage>>,
|
||||
/// Pins the generic substream.
|
||||
marker: PhantomData<TSubstream>,
|
||||
/// Slog logger for RPC behaviour.
|
||||
log: slog::Logger,
|
||||
}
|
||||
|
||||
impl<TSubstream> Rpc<TSubstream> {
|
||||
pub fn new() -> Self {
|
||||
pub fn new(log: &slog::Logger) -> Self {
|
||||
let log = log.new(o!("Service" => "Libp2p-RPC"));
|
||||
Rpc {
|
||||
events: Vec::new(),
|
||||
marker: PhantomData,
|
||||
log,
|
||||
}
|
||||
}
|
||||
|
||||
/// Submits and RPC request.
|
||||
pub fn send_request(&mut self, peer_id: PeerId, id: u64, method_id: u16, body: RPCRequest) {
|
||||
let request = RpcEvent::Request {
|
||||
let request = RPCEvent::Request {
|
||||
id,
|
||||
method_id,
|
||||
body,
|
||||
@ -52,8 +57,8 @@ impl<TSubstream> NetworkBehaviour for Rpc<TSubstream>
|
||||
where
|
||||
TSubstream: AsyncRead + AsyncWrite,
|
||||
{
|
||||
type ProtocolsHandler = OneShotHandler<TSubstream, RPCProtocol, RpcEvent, OneShotEvent>;
|
||||
type OutEvent = RpcEvent;
|
||||
type ProtocolsHandler = OneShotHandler<TSubstream, RPCProtocol, RPCEvent, OneShotEvent>;
|
||||
type OutEvent = RPCMessage;
|
||||
|
||||
fn new_handler(&mut self) -> Self::ProtocolsHandler {
|
||||
Default::default()
|
||||
@ -63,7 +68,14 @@ where
|
||||
Vec::new()
|
||||
}
|
||||
|
||||
fn inject_connected(&mut self, _: PeerId, _: ConnectedPoint) {}
|
||||
fn inject_connected(&mut self, peer_id: PeerId, connected_point: ConnectedPoint) {
|
||||
// if initialised the connection, report this upwards to send the HELLO request
|
||||
if let ConnectedPoint::Dialer { address } = connected_point {
|
||||
self.events.push(NetworkBehaviourAction::GenerateEvent(
|
||||
RPCMessage::PeerDialed(peer_id),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_disconnected(&mut self, _: &PeerId, _: ConnectedPoint) {}
|
||||
|
||||
@ -80,7 +92,9 @@ where
|
||||
|
||||
// send the event to the user
|
||||
self.events
|
||||
.push(NetworkBehaviourAction::GenerateEvent(event));
|
||||
.push(NetworkBehaviourAction::GenerateEvent(RPCMessage::RPC(
|
||||
event,
|
||||
)));
|
||||
}
|
||||
|
||||
fn poll(
|
||||
@ -99,18 +113,24 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Transmission between the `OneShotHandler` and the `RpcEvent`.
|
||||
/// Messages sent to the user from the RPC protocol.
|
||||
pub enum RPCMessage {
|
||||
RPC(RPCEvent),
|
||||
PeerDialed(PeerId),
|
||||
}
|
||||
|
||||
/// Transmission between the `OneShotHandler` and the `RPCEvent`.
|
||||
#[derive(Debug)]
|
||||
pub enum OneShotEvent {
|
||||
/// We received an RPC from a remote.
|
||||
Rx(RpcEvent),
|
||||
Rx(RPCEvent),
|
||||
/// We successfully sent an RPC request.
|
||||
Sent,
|
||||
}
|
||||
|
||||
impl From<RpcEvent> for OneShotEvent {
|
||||
impl From<RPCEvent> for OneShotEvent {
|
||||
#[inline]
|
||||
fn from(rpc: RpcEvent) -> OneShotEvent {
|
||||
fn from(rpc: RPCEvent) -> OneShotEvent {
|
||||
OneShotEvent::Rx(rpc)
|
||||
}
|
||||
}
|
||||
|
@ -31,7 +31,7 @@ impl Default for RPCProtocol {
|
||||
|
||||
/// The RPC types which are sent/received in this protocol.
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum RpcEvent {
|
||||
pub enum RPCEvent {
|
||||
Request {
|
||||
id: u64,
|
||||
method_id: u16,
|
||||
@ -44,7 +44,7 @@ pub enum RpcEvent {
|
||||
},
|
||||
}
|
||||
|
||||
impl UpgradeInfo for RpcEvent {
|
||||
impl UpgradeInfo for RPCEvent {
|
||||
type Info = &'static [u8];
|
||||
type InfoIter = iter::Once<Self::Info>;
|
||||
|
||||
@ -58,17 +58,17 @@ impl<TSocket> InboundUpgrade<TSocket> for RPCProtocol
|
||||
where
|
||||
TSocket: AsyncRead + AsyncWrite,
|
||||
{
|
||||
type Output = RpcEvent;
|
||||
type Output = RPCEvent;
|
||||
type Error = DecodeError;
|
||||
type Future =
|
||||
upgrade::ReadOneThen<TSocket, (), fn(Vec<u8>, ()) -> Result<RpcEvent, DecodeError>>;
|
||||
upgrade::ReadOneThen<TSocket, (), fn(Vec<u8>, ()) -> Result<RPCEvent, DecodeError>>;
|
||||
|
||||
fn upgrade_inbound(self, socket: TSocket, _: Self::Info) -> Self::Future {
|
||||
upgrade::read_one_then(socket, MAX_READ_SIZE, (), |packet, ()| Ok(decode(packet)?))
|
||||
}
|
||||
}
|
||||
|
||||
fn decode(packet: Vec<u8>) -> Result<RpcEvent, DecodeError> {
|
||||
fn decode(packet: Vec<u8>) -> Result<RPCEvent, DecodeError> {
|
||||
// decode the header of the rpc
|
||||
// request/response
|
||||
let (request, index) = bool::ssz_decode(&packet, 0)?;
|
||||
@ -84,7 +84,7 @@ fn decode(packet: Vec<u8>) -> Result<RpcEvent, DecodeError> {
|
||||
RPCMethod::Unknown => return Err(DecodeError::UnknownRPCMethod),
|
||||
};
|
||||
|
||||
return Ok(RpcEvent::Request {
|
||||
return Ok(RPCEvent::Request {
|
||||
id,
|
||||
method_id,
|
||||
body,
|
||||
@ -99,7 +99,7 @@ fn decode(packet: Vec<u8>) -> Result<RpcEvent, DecodeError> {
|
||||
}
|
||||
RPCMethod::Unknown => return Err(DecodeError::UnknownRPCMethod),
|
||||
};
|
||||
return Ok(RpcEvent::Response {
|
||||
return Ok(RPCEvent::Response {
|
||||
id,
|
||||
method_id,
|
||||
result,
|
||||
@ -107,7 +107,7 @@ fn decode(packet: Vec<u8>) -> Result<RpcEvent, DecodeError> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<TSocket> OutboundUpgrade<TSocket> for RpcEvent
|
||||
impl<TSocket> OutboundUpgrade<TSocket> for RPCEvent
|
||||
where
|
||||
TSocket: AsyncWrite,
|
||||
{
|
||||
@ -122,10 +122,10 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl Encodable for RpcEvent {
|
||||
impl Encodable for RPCEvent {
|
||||
fn ssz_append(&self, s: &mut SszStream) {
|
||||
match self {
|
||||
RpcEvent::Request {
|
||||
RPCEvent::Request {
|
||||
id,
|
||||
method_id,
|
||||
body,
|
||||
@ -137,7 +137,7 @@ impl Encodable for RpcEvent {
|
||||
RPCRequest::Hello(body) => s.append(body),
|
||||
};
|
||||
}
|
||||
RpcEvent::Response {
|
||||
RPCEvent::Response {
|
||||
id,
|
||||
method_id,
|
||||
result,
|
||||
|
@ -1,7 +1,7 @@
|
||||
use crate::behaviour::{Behaviour, BehaviourEvent};
|
||||
use crate::error;
|
||||
use crate::multiaddr::Protocol;
|
||||
use crate::rpc::RpcEvent;
|
||||
use crate::rpc::RPCEvent;
|
||||
use crate::NetworkConfig;
|
||||
use futures::prelude::*;
|
||||
use futures::Stream;
|
||||
@ -41,7 +41,7 @@ impl Service {
|
||||
// Set up the transport
|
||||
let transport = build_transport(local_private_key);
|
||||
// Set up gossipsub routing
|
||||
let behaviour = Behaviour::new(local_peer_id.clone(), config.gs_config);
|
||||
let behaviour = Behaviour::new(local_peer_id.clone(), config.gs_config, &log);
|
||||
// Set up Topology
|
||||
let topology = local_peer_id.clone();
|
||||
Swarm::new(transport, behaviour, topology)
|
||||
@ -108,6 +108,9 @@ impl Stream for Service {
|
||||
Ok(Async::Ready(Some(BehaviourEvent::RPC(event)))) => {
|
||||
return Ok(Async::Ready(Some(Libp2pEvent::RPC(event))));
|
||||
}
|
||||
Ok(Async::Ready(Some(BehaviourEvent::PeerDialed(peer_id)))) => {
|
||||
return Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id))));
|
||||
}
|
||||
Ok(Async::Ready(None)) => unreachable!("Swarm stream shouldn't end"),
|
||||
Ok(Async::NotReady) => break,
|
||||
_ => break,
|
||||
@ -155,6 +158,7 @@ fn build_transport(
|
||||
/// Events that can be obtained from polling the Libp2p Service.
|
||||
pub enum Libp2pEvent {
|
||||
// We have received an RPC event on the swarm
|
||||
RPC(RpcEvent),
|
||||
RPC(RPCEvent),
|
||||
PeerDialed(PeerId),
|
||||
Message(String),
|
||||
}
|
||||
|
@ -4,30 +4,37 @@ use crossbeam_channel::{unbounded as channel, Sender, TryRecvError};
|
||||
use futures::future;
|
||||
use futures::prelude::*;
|
||||
use libp2p::rpc;
|
||||
use libp2p::{PeerId, RpcEvent};
|
||||
use libp2p::{PeerId, RPCEvent};
|
||||
use slog::debug;
|
||||
use std::collections::HashMap;
|
||||
use std::time::{Duration, Instant};
|
||||
use sync::SimpleSync;
|
||||
use types::Hash256;
|
||||
|
||||
/// Timeout for establishing a HELLO handshake.
|
||||
const HELLO_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
|
||||
/// Handles messages received from the network and client and organises syncing.
|
||||
pub struct MessageHandler {
|
||||
sync: SimpleSync,
|
||||
//TODO: Implement beacon chain
|
||||
//chain: BeaconChain
|
||||
/// A mapping of peers we have sent a HELLO rpc request to
|
||||
hello_requests: HashMap<PeerId, Instant>,
|
||||
log: slog::Logger,
|
||||
}
|
||||
|
||||
/// Types of messages the handler can receive.
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum HandlerMessage {
|
||||
/// Peer has connected.
|
||||
PeerConnected(PeerId),
|
||||
/// We have initiated a connection to a new peer.
|
||||
PeerDialed(PeerId),
|
||||
/// Peer has disconnected,
|
||||
PeerDisconnected(PeerId),
|
||||
/// A Node message has been received.
|
||||
Message(PeerId, NodeMessage),
|
||||
/// An RPC response/request has been received.
|
||||
RPC(RpcEvent),
|
||||
RPC(RPCEvent),
|
||||
}
|
||||
|
||||
impl MessageHandler {
|
||||
@ -49,6 +56,7 @@ impl MessageHandler {
|
||||
//TODO: Initialise beacon chain
|
||||
let mut handler = MessageHandler {
|
||||
sync,
|
||||
hello_requests: HashMap::new(),
|
||||
log: log.clone(),
|
||||
};
|
||||
|
||||
|
@ -1,11 +1,11 @@
|
||||
use libp2p::PeerId;
|
||||
use libp2p::{HelloMessage, RpcEvent};
|
||||
use libp2p::{HelloMessage, RPCEvent};
|
||||
use types::{Hash256, Slot};
|
||||
|
||||
/// Messages between nodes across the network.
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum NodeMessage {
|
||||
RPC(RpcEvent),
|
||||
RPC(RPCEvent),
|
||||
BlockRequest,
|
||||
// TODO: only for testing - remove
|
||||
Message(String),
|
||||
|
@ -108,6 +108,12 @@ fn network_service(
|
||||
.send(HandlerMessage::RPC(rpc_event))
|
||||
.map_err(|_| "failed to send rpc to handler");
|
||||
}
|
||||
Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id)))) => {
|
||||
debug!(libp2p_service.log, "Peer Dialed: {:?}", peer_id);
|
||||
message_handler_send
|
||||
.send(HandlerMessage::PeerDialed(peer_id))
|
||||
.map_err(|_| "failed to send rpc to handler");
|
||||
}
|
||||
Ok(Async::Ready(Some(Libp2pEvent::Message(m)))) => debug!(
|
||||
libp2p_service.log,
|
||||
"Network Service: Message received: {}", m
|
||||
|
Loading…
Reference in New Issue
Block a user