commit
d936bc0e5d
@ -1,11 +1,18 @@
|
||||
use crate::rpc::{RPCEvent, RPCMessage, Rpc};
|
||||
use crate::NetworkConfig;
|
||||
use futures::prelude::*;
|
||||
use libp2p::{
|
||||
core::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess},
|
||||
gossipsub::{Gossipsub, GossipsubConfig, GossipsubEvent},
|
||||
core::{
|
||||
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess},
|
||||
PublicKey,
|
||||
},
|
||||
gossipsub::{Gossipsub, GossipsubEvent},
|
||||
identify::{protocol::IdentifyInfo, Identify, IdentifyEvent},
|
||||
ping::{Ping, PingEvent},
|
||||
tokio_io::{AsyncRead, AsyncWrite},
|
||||
NetworkBehaviour, PeerId,
|
||||
};
|
||||
use slog::{debug, o};
|
||||
use types::Topic;
|
||||
|
||||
/// Builds the network behaviour for the libp2p Swarm.
|
||||
@ -13,12 +20,22 @@ use types::Topic;
|
||||
#[derive(NetworkBehaviour)]
|
||||
#[behaviour(out_event = "BehaviourEvent", poll_method = "poll")]
|
||||
pub struct Behaviour<TSubstream: AsyncRead + AsyncWrite> {
|
||||
/// The routing pub-sub mechanism for eth2.
|
||||
gossipsub: Gossipsub<TSubstream>,
|
||||
// TODO: Add Kademlia for peer discovery
|
||||
/// The events generated by this behaviour to be consumed in the swarm poll.
|
||||
serenity_rpc: Rpc<TSubstream>,
|
||||
/// Allows discovery of IP addresses for peers on the network.
|
||||
identify: Identify<TSubstream>,
|
||||
/// Keep regular connection to peers and disconnect if absent.
|
||||
// TODO: Keepalive, likely remove this later.
|
||||
// TODO: Make the ping time customizeable.
|
||||
ping: Ping<TSubstream>,
|
||||
#[behaviour(ignore)]
|
||||
events: Vec<BehaviourEvent>,
|
||||
/// Logger for behaviour actions.
|
||||
#[behaviour(ignore)]
|
||||
log: slog::Logger,
|
||||
}
|
||||
|
||||
// Implement the NetworkBehaviourEventProcess trait so that we can derive NetworkBehaviour for Behaviour
|
||||
@ -53,12 +70,54 @@ impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<RPCMessage
|
||||
}
|
||||
}
|
||||
|
||||
impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<IdentifyEvent>
|
||||
for Behaviour<TSubstream>
|
||||
{
|
||||
fn inject_event(&mut self, event: IdentifyEvent) {
|
||||
match event {
|
||||
IdentifyEvent::Identified {
|
||||
peer_id, mut info, ..
|
||||
} => {
|
||||
if info.listen_addrs.len() > 20 {
|
||||
debug!(
|
||||
self.log,
|
||||
"More than 20 peers have been identified, truncating"
|
||||
);
|
||||
info.listen_addrs.truncate(20);
|
||||
}
|
||||
self.events.push(BehaviourEvent::Identified(peer_id, info));
|
||||
}
|
||||
IdentifyEvent::Error { .. } => {}
|
||||
IdentifyEvent::SendBack { .. } => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<PingEvent>
|
||||
for Behaviour<TSubstream>
|
||||
{
|
||||
fn inject_event(&mut self, _event: PingEvent) {
|
||||
// not interested in ping responses at the moment.
|
||||
}
|
||||
}
|
||||
|
||||
impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
|
||||
pub fn new(local_peer_id: PeerId, gs_config: GossipsubConfig, log: &slog::Logger) -> Self {
|
||||
pub fn new(local_public_key: PublicKey, net_conf: &NetworkConfig, log: &slog::Logger) -> Self {
|
||||
let local_peer_id = local_public_key.clone().into_peer_id();
|
||||
let identify_config = net_conf.identify_config.clone();
|
||||
let behaviour_log = log.new(o!());
|
||||
|
||||
Behaviour {
|
||||
gossipsub: Gossipsub::new(local_peer_id, gs_config),
|
||||
gossipsub: Gossipsub::new(local_peer_id, net_conf.gs_config.clone()),
|
||||
serenity_rpc: Rpc::new(log),
|
||||
identify: Identify::new(
|
||||
identify_config.version,
|
||||
identify_config.user_agent,
|
||||
local_public_key,
|
||||
),
|
||||
ping: Ping::new(),
|
||||
events: Vec::new(),
|
||||
log: behaviour_log,
|
||||
}
|
||||
}
|
||||
|
||||
@ -91,6 +150,7 @@ impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
|
||||
pub enum BehaviourEvent {
|
||||
RPC(PeerId, RPCEvent),
|
||||
PeerDialed(PeerId),
|
||||
Identified(PeerId, IdentifyInfo),
|
||||
// TODO: This is a stub at the moment
|
||||
Message(String),
|
||||
}
|
||||
|
@ -1,11 +1,9 @@
|
||||
use crate::Multiaddr;
|
||||
use libp2p::gossipsub::{GossipsubConfig, GossipsubConfigBuilder};
|
||||
use libp2p::secio;
|
||||
use std::fmt;
|
||||
|
||||
#[derive(Clone)]
|
||||
#[derive(Clone, Debug)]
|
||||
/// Network configuration for lighthouse.
|
||||
pub struct NetworkConfig {
|
||||
pub struct Config {
|
||||
//TODO: stubbing networking initial params, change in the future
|
||||
/// IP address to listen on.
|
||||
pub listen_addresses: Vec<Multiaddr>,
|
||||
@ -13,47 +11,56 @@ pub struct NetworkConfig {
|
||||
pub listen_port: u16,
|
||||
/// Gossipsub configuration parameters.
|
||||
pub gs_config: GossipsubConfig,
|
||||
/// Configuration parameters for node identification protocol.
|
||||
pub identify_config: IdentifyConfig,
|
||||
/// List of nodes to initially connect to.
|
||||
pub boot_nodes: Vec<Multiaddr>,
|
||||
/// Peer key related to this nodes PeerId.
|
||||
pub local_private_key: secio::SecioKeyPair,
|
||||
/// Client version
|
||||
pub client_version: String,
|
||||
/// List of topics to subscribe to as strings
|
||||
pub topics: Vec<String>,
|
||||
}
|
||||
|
||||
impl Default for NetworkConfig {
|
||||
impl Default for Config {
|
||||
/// Generate a default network configuration.
|
||||
fn default() -> Self {
|
||||
// TODO: Currently using secp256k1 key pairs. Wire protocol specifies RSA. Waiting for this
|
||||
// PR to be merged to generate RSA keys: https://github.com/briansmith/ring/pull/733
|
||||
|
||||
NetworkConfig {
|
||||
Config {
|
||||
listen_addresses: vec!["/ip4/127.0.0.1/tcp/9000"
|
||||
.parse()
|
||||
.expect("is a correct multi-address")],
|
||||
listen_port: 9000,
|
||||
gs_config: GossipsubConfigBuilder::new().build(),
|
||||
identify_config: IdentifyConfig::default(),
|
||||
boot_nodes: Vec::new(),
|
||||
local_private_key: secio::SecioKeyPair::secp256k1_generated().unwrap(),
|
||||
client_version: version::version(),
|
||||
topics: vec![String::from("beacon_chain")],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl NetworkConfig {
|
||||
impl Config {
|
||||
pub fn new(boot_nodes: Vec<Multiaddr>) -> Self {
|
||||
let mut conf = NetworkConfig::default();
|
||||
let mut conf = Config::default();
|
||||
conf.boot_nodes = boot_nodes;
|
||||
|
||||
conf
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for NetworkConfig {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "NetworkConfig: listen_addresses: {:?}, listen_port: {:?}, gs_config: {:?}, boot_nodes: {:?}, local_private_key: <Secio-PubKey {:?}>, client_version: {:?}", self.listen_addresses, self.listen_port, self.gs_config, self.boot_nodes, self.local_private_key.to_public_key(), self.client_version)
|
||||
/// The configuration parameters for the Identify protocol
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct IdentifyConfig {
|
||||
/// The protocol version to listen on.
|
||||
pub version: String,
|
||||
/// The client's name and version for identification.
|
||||
pub user_agent: String,
|
||||
}
|
||||
|
||||
impl Default for IdentifyConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
version: "/eth/serenity/1.0".to_string(),
|
||||
user_agent: version::version(),
|
||||
}
|
||||
}
|
||||
}
|
@ -3,16 +3,16 @@
|
||||
///
|
||||
/// This crate builds and manages the libp2p services required by the beacon node.
|
||||
pub mod behaviour;
|
||||
mod config;
|
||||
pub mod error;
|
||||
mod network_config;
|
||||
pub mod rpc;
|
||||
mod service;
|
||||
|
||||
pub use config::Config as NetworkConfig;
|
||||
pub use libp2p::{
|
||||
gossipsub::{GossipsubConfig, GossipsubConfigBuilder},
|
||||
PeerId,
|
||||
};
|
||||
pub use network_config::NetworkConfig;
|
||||
pub use rpc::{HelloMessage, RPCEvent};
|
||||
pub use service::Libp2pEvent;
|
||||
pub use service::Service;
|
||||
|
@ -1,4 +1,4 @@
|
||||
use super::methods::{HelloMessage, RPCMethod, RPCRequest, RPCResponse};
|
||||
use super::methods::*;
|
||||
use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo};
|
||||
use ssz::{ssz_encode, Decodable, Encodable, SszStream};
|
||||
use std::io;
|
||||
@ -6,7 +6,7 @@ use std::iter;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
|
||||
/// The maximum bytes that can be sent across the RPC.
|
||||
const MAX_READ_SIZE: usize = 2048;
|
||||
const MAX_READ_SIZE: usize = 4_194_304; // 4M
|
||||
|
||||
/// Implementation of the `ConnectionUpgrade` for the rpc protocol.
|
||||
|
||||
@ -81,7 +81,31 @@ fn decode(packet: Vec<u8>) -> Result<RPCEvent, DecodeError> {
|
||||
let (hello_body, _index) = HelloMessage::ssz_decode(&packet, index)?;
|
||||
RPCRequest::Hello(hello_body)
|
||||
}
|
||||
RPCMethod::Unknown | _ => return Err(DecodeError::UnknownRPCMethod),
|
||||
RPCMethod::Goodbye => {
|
||||
let (goodbye_code, _index) = u64::ssz_decode(&packet, index)?;
|
||||
RPCRequest::Goodbye(goodbye_code)
|
||||
}
|
||||
RPCMethod::BeaconBlockRoots => {
|
||||
let (block_roots_request, _index) =
|
||||
BeaconBlockRootsRequest::ssz_decode(&packet, index)?;
|
||||
RPCRequest::BeaconBlockRoots(block_roots_request)
|
||||
}
|
||||
RPCMethod::BeaconBlockHeaders => {
|
||||
let (block_headers_request, _index) =
|
||||
BeaconBlockHeadersRequest::ssz_decode(&packet, index)?;
|
||||
RPCRequest::BeaconBlockHeaders(block_headers_request)
|
||||
}
|
||||
RPCMethod::BeaconBlockBodies => {
|
||||
let (block_bodies_request, _index) =
|
||||
BeaconBlockBodiesRequest::ssz_decode(&packet, index)?;
|
||||
RPCRequest::BeaconBlockBodies(block_bodies_request)
|
||||
}
|
||||
RPCMethod::BeaconChainState => {
|
||||
let (chain_state_request, _index) =
|
||||
BeaconChainStateRequest::ssz_decode(&packet, index)?;
|
||||
RPCRequest::BeaconChainState(chain_state_request)
|
||||
}
|
||||
RPCMethod::Unknown => return Err(DecodeError::UnknownRPCMethod),
|
||||
};
|
||||
|
||||
Ok(RPCEvent::Request {
|
||||
@ -97,7 +121,24 @@ fn decode(packet: Vec<u8>) -> Result<RPCEvent, DecodeError> {
|
||||
let (body, _index) = HelloMessage::ssz_decode(&packet, index)?;
|
||||
RPCResponse::Hello(body)
|
||||
}
|
||||
RPCMethod::Unknown | _ => return Err(DecodeError::UnknownRPCMethod),
|
||||
RPCMethod::Goodbye => unreachable!("Should never receive a goodbye response"),
|
||||
RPCMethod::BeaconBlockRoots => {
|
||||
let (body, _index) = BeaconBlockRootsResponse::ssz_decode(&packet, index)?;
|
||||
RPCResponse::BeaconBlockRoots(body)
|
||||
}
|
||||
RPCMethod::BeaconBlockHeaders => {
|
||||
let (body, _index) = BeaconBlockHeadersResponse::ssz_decode(&packet, index)?;
|
||||
RPCResponse::BeaconBlockHeaders(body)
|
||||
}
|
||||
RPCMethod::BeaconBlockBodies => {
|
||||
let (body, _index) = BeaconBlockBodiesResponse::ssz_decode(&packet, index)?;
|
||||
RPCResponse::BeaconBlockBodies(body)
|
||||
}
|
||||
RPCMethod::BeaconChainState => {
|
||||
let (body, _index) = BeaconChainStateResponse::ssz_decode(&packet, index)?;
|
||||
RPCResponse::BeaconChainState(body)
|
||||
}
|
||||
RPCMethod::Unknown => return Err(DecodeError::UnknownRPCMethod),
|
||||
};
|
||||
Ok(RPCEvent::Response {
|
||||
id,
|
||||
@ -137,7 +178,21 @@ impl Encodable for RPCEvent {
|
||||
RPCRequest::Hello(body) => {
|
||||
s.append(body);
|
||||
}
|
||||
_ => {}
|
||||
RPCRequest::Goodbye(body) => {
|
||||
s.append(body);
|
||||
}
|
||||
RPCRequest::BeaconBlockRoots(body) => {
|
||||
s.append(body);
|
||||
}
|
||||
RPCRequest::BeaconBlockHeaders(body) => {
|
||||
s.append(body);
|
||||
}
|
||||
RPCRequest::BeaconBlockBodies(body) => {
|
||||
s.append(body);
|
||||
}
|
||||
RPCRequest::BeaconChainState(body) => {
|
||||
s.append(body);
|
||||
}
|
||||
}
|
||||
}
|
||||
RPCEvent::Response {
|
||||
@ -152,7 +207,18 @@ impl Encodable for RPCEvent {
|
||||
RPCResponse::Hello(response) => {
|
||||
s.append(response);
|
||||
}
|
||||
_ => {}
|
||||
RPCResponse::BeaconBlockRoots(response) => {
|
||||
s.append(response);
|
||||
}
|
||||
RPCResponse::BeaconBlockHeaders(response) => {
|
||||
s.append(response);
|
||||
}
|
||||
RPCResponse::BeaconBlockBodies(response) => {
|
||||
s.append(response);
|
||||
}
|
||||
RPCResponse::BeaconChainState(response) => {
|
||||
s.append(response);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -11,8 +11,8 @@ use libp2p::core::{
|
||||
transport::boxed::Boxed,
|
||||
upgrade::{InboundUpgradeExt, OutboundUpgradeExt},
|
||||
};
|
||||
use libp2p::{core, secio, Transport};
|
||||
use libp2p::{PeerId, Swarm};
|
||||
use libp2p::identify::protocol::IdentifyInfo;
|
||||
use libp2p::{core, secio, PeerId, Swarm, Transport};
|
||||
use slog::{debug, info, trace, warn};
|
||||
use std::io::{Error, ErrorKind};
|
||||
use std::time::Duration;
|
||||
@ -33,7 +33,12 @@ impl Service {
|
||||
pub fn new(config: NetworkConfig, log: slog::Logger) -> error::Result<Self> {
|
||||
debug!(log, "Libp2p Service starting");
|
||||
|
||||
let local_private_key = config.local_private_key;
|
||||
// TODO: Currently using secp256k1 key pairs. Wire protocol specifies RSA. Waiting for this
|
||||
// PR to be merged to generate RSA keys: https://github.com/briansmith/ring/pull/733
|
||||
// TODO: Save and recover node key from disk
|
||||
let local_private_key = secio::SecioKeyPair::secp256k1_generated().unwrap();
|
||||
|
||||
let local_public_key = local_private_key.to_public_key();
|
||||
let local_peer_id = local_private_key.to_peer_id();
|
||||
info!(log, "Local peer id: {:?}", local_peer_id);
|
||||
|
||||
@ -41,7 +46,7 @@ impl Service {
|
||||
// Set up the transport
|
||||
let transport = build_transport(local_private_key);
|
||||
// Set up gossipsub routing
|
||||
let behaviour = Behaviour::new(local_peer_id.clone(), config.gs_config, &log);
|
||||
let behaviour = Behaviour::new(local_public_key.clone(), &config, &log);
|
||||
// Set up Topology
|
||||
let topology = local_peer_id.clone();
|
||||
Swarm::new(transport, behaviour, topology)
|
||||
@ -99,17 +104,23 @@ impl Stream for Service {
|
||||
// TODO: Currently only gossipsub events passed here.
|
||||
// Build a type for more generic events
|
||||
match self.swarm.poll() {
|
||||
Ok(Async::Ready(Some(BehaviourEvent::Message(m)))) => {
|
||||
//Behaviour events
|
||||
Ok(Async::Ready(Some(event))) => match event {
|
||||
// TODO: Stub here for debugging
|
||||
debug!(self.log, "Message received: {}", m);
|
||||
return Ok(Async::Ready(Some(Libp2pEvent::Message(m))));
|
||||
}
|
||||
Ok(Async::Ready(Some(BehaviourEvent::RPC(peer_id, event)))) => {
|
||||
return Ok(Async::Ready(Some(Libp2pEvent::RPC(peer_id, event))));
|
||||
}
|
||||
Ok(Async::Ready(Some(BehaviourEvent::PeerDialed(peer_id)))) => {
|
||||
return Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id))));
|
||||
}
|
||||
BehaviourEvent::Message(m) => {
|
||||
debug!(self.log, "Message received: {}", m);
|
||||
return Ok(Async::Ready(Some(Libp2pEvent::Message(m))));
|
||||
}
|
||||
BehaviourEvent::RPC(peer_id, event) => {
|
||||
return Ok(Async::Ready(Some(Libp2pEvent::RPC(peer_id, event))));
|
||||
}
|
||||
BehaviourEvent::PeerDialed(peer_id) => {
|
||||
return Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id))));
|
||||
}
|
||||
BehaviourEvent::Identified(peer_id, info) => {
|
||||
return Ok(Async::Ready(Some(Libp2pEvent::Identified(peer_id, info))));
|
||||
}
|
||||
},
|
||||
Ok(Async::Ready(None)) => unreachable!("Swarm stream shouldn't end"),
|
||||
Ok(Async::NotReady) => break,
|
||||
_ => break,
|
||||
@ -156,8 +167,12 @@ fn build_transport(
|
||||
|
||||
/// Events that can be obtained from polling the Libp2p Service.
|
||||
pub enum Libp2pEvent {
|
||||
// We have received an RPC event on the swarm
|
||||
/// An RPC response request has been received on the swarm.
|
||||
RPC(PeerId, RPCEvent),
|
||||
/// Initiated the connection to a new peer.
|
||||
PeerDialed(PeerId),
|
||||
/// Received information about a peer on the network.
|
||||
Identified(PeerId, IdentifyInfo),
|
||||
// TODO: Pub-sub testing only.
|
||||
Message(String),
|
||||
}
|
||||
|
@ -15,8 +15,8 @@ use tokio::runtime::TaskExecutor;
|
||||
|
||||
/// Service that handles communication between internal services and the eth2_libp2p network service.
|
||||
pub struct Service {
|
||||
//eth2_libp2p_service: Arc<Mutex<LibP2PService>>,
|
||||
eth2_libp2p_exit: oneshot::Sender<()>,
|
||||
//libp2p_service: Arc<Mutex<LibP2PService>>,
|
||||
libp2p_exit: oneshot::Sender<()>,
|
||||
network_send: crossbeam_channel::Sender<NetworkMessage>,
|
||||
//message_handler: MessageHandler,
|
||||
//message_handler_send: Sender<HandlerMessage>,
|
||||
@ -40,20 +40,20 @@ impl Service {
|
||||
message_handler_log,
|
||||
)?;
|
||||
|
||||
// launch eth2_libp2p service
|
||||
let eth2_libp2p_log = log.new(o!("Service" => "Libp2p"));
|
||||
let eth2_libp2p_service = LibP2PService::new(config.clone(), eth2_libp2p_log)?;
|
||||
// launch libp2p service
|
||||
let libp2p_log = log.new(o!("Service" => "Libp2p"));
|
||||
let libp2p_service = LibP2PService::new(config.clone(), libp2p_log)?;
|
||||
|
||||
// TODO: Spawn thread to handle eth2_libp2p messages and pass to message handler thread.
|
||||
let eth2_libp2p_exit = spawn_service(
|
||||
eth2_libp2p_service,
|
||||
// TODO: Spawn thread to handle libp2p messages and pass to message handler thread.
|
||||
let libp2p_exit = spawn_service(
|
||||
libp2p_service,
|
||||
network_recv,
|
||||
message_handler_send,
|
||||
executor,
|
||||
log,
|
||||
)?;
|
||||
let network_service = Service {
|
||||
eth2_libp2p_exit,
|
||||
libp2p_exit,
|
||||
network_send: network_send.clone(),
|
||||
};
|
||||
|
||||
@ -72,7 +72,7 @@ impl Service {
|
||||
}
|
||||
|
||||
fn spawn_service(
|
||||
eth2_libp2p_service: LibP2PService,
|
||||
libp2p_service: LibP2PService,
|
||||
network_recv: crossbeam_channel::Receiver<NetworkMessage>,
|
||||
message_handler_send: crossbeam_channel::Sender<HandlerMessage>,
|
||||
executor: &TaskExecutor,
|
||||
@ -83,7 +83,7 @@ fn spawn_service(
|
||||
// spawn on the current executor
|
||||
executor.spawn(
|
||||
network_service(
|
||||
eth2_libp2p_service,
|
||||
libp2p_service,
|
||||
network_recv,
|
||||
message_handler_send,
|
||||
log.clone(),
|
||||
@ -100,7 +100,7 @@ fn spawn_service(
|
||||
}
|
||||
|
||||
fn network_service(
|
||||
mut eth2_libp2p_service: LibP2PService,
|
||||
mut libp2p_service: LibP2PService,
|
||||
network_recv: crossbeam_channel::Receiver<NetworkMessage>,
|
||||
message_handler_send: crossbeam_channel::Sender<HandlerMessage>,
|
||||
log: slog::Logger,
|
||||
@ -108,28 +108,34 @@ fn network_service(
|
||||
futures::future::poll_fn(move || -> Result<_, eth2_libp2p::error::Error> {
|
||||
// poll the swarm
|
||||
loop {
|
||||
match eth2_libp2p_service.poll() {
|
||||
Ok(Async::Ready(Some(Libp2pEvent::RPC(peer_id, rpc_event)))) => {
|
||||
trace!(
|
||||
eth2_libp2p_service.log,
|
||||
"RPC Event: RPC message received: {:?}",
|
||||
rpc_event
|
||||
);
|
||||
message_handler_send
|
||||
.send(HandlerMessage::RPC(peer_id, rpc_event))
|
||||
.map_err(|_| "failed to send rpc to handler")?;
|
||||
}
|
||||
Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id)))) => {
|
||||
debug!(eth2_libp2p_service.log, "Peer Dialed: {:?}", peer_id);
|
||||
message_handler_send
|
||||
.send(HandlerMessage::PeerDialed(peer_id))
|
||||
.map_err(|_| "failed to send rpc to handler")?;
|
||||
}
|
||||
Ok(Async::Ready(Some(Libp2pEvent::Message(m)))) => debug!(
|
||||
eth2_libp2p_service.log,
|
||||
"Network Service: Message received: {}", m
|
||||
),
|
||||
_ => break,
|
||||
match libp2p_service.poll() {
|
||||
Ok(Async::Ready(Some(event))) => match event {
|
||||
Libp2pEvent::RPC(peer_id, rpc_event) => {
|
||||
trace!(log, "RPC Event: RPC message received: {:?}", rpc_event);
|
||||
message_handler_send
|
||||
.send(HandlerMessage::RPC(peer_id, rpc_event))
|
||||
.map_err(|_| "failed to send rpc to handler")?;
|
||||
}
|
||||
Libp2pEvent::PeerDialed(peer_id) => {
|
||||
debug!(log, "Peer Dialed: {:?}", peer_id);
|
||||
message_handler_send
|
||||
.send(HandlerMessage::PeerDialed(peer_id))
|
||||
.map_err(|_| "failed to send rpc to handler")?;
|
||||
}
|
||||
Libp2pEvent::Identified(peer_id, info) => {
|
||||
debug!(
|
||||
log,
|
||||
"We have identified peer: {:?} with {:?}", peer_id, info
|
||||
);
|
||||
}
|
||||
Libp2pEvent::Message(m) => debug!(
|
||||
libp2p_service.log,
|
||||
"Network Service: Message received: {}", m
|
||||
),
|
||||
},
|
||||
Ok(Async::Ready(None)) => unreachable!("Stream never ends"),
|
||||
Ok(Async::NotReady) => break,
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
// poll the network channel
|
||||
@ -143,7 +149,7 @@ fn network_service(
|
||||
trace!(log, "Sending RPC Event: {:?}", rpc_event);
|
||||
//TODO: Make swarm private
|
||||
//TODO: Implement correct peer id topic message handling
|
||||
eth2_libp2p_service.swarm.send_rpc(peer_id, rpc_event);
|
||||
libp2p_service.swarm.send_rpc(peer_id, rpc_event);
|
||||
}
|
||||
OutgoingMessage::NotifierTest => {
|
||||
debug!(log, "Received message from notifier");
|
||||
@ -165,7 +171,7 @@ fn network_service(
|
||||
/// Types of messages that the network service can receive.
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum NetworkMessage {
|
||||
/// Send a message to eth2_libp2p service.
|
||||
/// Send a message to libp2p service.
|
||||
//TODO: Define typing for messages across the wire
|
||||
Send(PeerId, OutgoingMessage),
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user