Initial Libp2p RPC implementation.
This commit is contained in:
parent
23a8fbfc74
commit
c06e8ffa5b
@ -7,7 +7,6 @@ edition = "2018"
|
||||
[dependencies]
|
||||
client = { path = "client" }
|
||||
version = { path = "version" }
|
||||
|
||||
clap = "2.32.0"
|
||||
slog = "^2.2.3"
|
||||
slog-term = "^2.4.0"
|
||||
|
@ -8,6 +8,8 @@ edition = "2018"
|
||||
# SigP repository until PR is merged
|
||||
libp2p = { git = "https://github.com/SigP/rust-libp2p", branch = "gossipsub" }
|
||||
types = { path = "../../eth2/types" }
|
||||
ssz = { path = "../../eth2/utils/ssz" }
|
||||
ssz_derive = { path = "../../eth2/utils/ssz_derive" }
|
||||
slog = "2.4.1"
|
||||
version = { path = "../version" }
|
||||
tokio = "0.1.16"
|
||||
|
@ -5,6 +5,7 @@
|
||||
pub mod behaviour;
|
||||
pub mod error;
|
||||
mod network_config;
|
||||
mod rpc;
|
||||
mod service;
|
||||
|
||||
pub use libp2p::{
|
||||
|
0
beacon_node/libp2p/src/rpc/handler.rs
Normal file
0
beacon_node/libp2p/src/rpc/handler.rs
Normal file
38
beacon_node/libp2p/src/rpc/methods.rs
Normal file
38
beacon_node/libp2p/src/rpc/methods.rs
Normal file
@ -0,0 +1,38 @@
|
||||
/// Available RPC methods types and ids.
|
||||
use ssz_derive::{Decode, Encode};
|
||||
use types::{Epoch, Hash256, Slot};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum RPCMethod {
|
||||
Hello,
|
||||
Unknown,
|
||||
}
|
||||
|
||||
impl From<u16> for RPCMethod {
|
||||
fn from(method_id: u16) -> Self {
|
||||
match method_id {
|
||||
0 => RPCMethod::Hello,
|
||||
_ => RPCMethod::Unknown,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum RPCRequest {
|
||||
HelloRequest,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum RPCResponse {
|
||||
HelloResponse(HelloResponse),
|
||||
}
|
||||
|
||||
// request/response structs for RPC methods
|
||||
#[derive(Encode, Decode, Clone, Debug)]
|
||||
pub struct HelloResponse {
|
||||
pub network_id: u8,
|
||||
pub latest_finalized_root: Hash256,
|
||||
pub latest_finalized_epoch: Epoch,
|
||||
pub best_root: Hash256,
|
||||
pub best_slot: Slot,
|
||||
}
|
120
beacon_node/libp2p/src/rpc/mod.rs
Normal file
120
beacon_node/libp2p/src/rpc/mod.rs
Normal file
@ -0,0 +1,120 @@
|
||||
mod handler;
|
||||
mod methods;
|
||||
/// RPC Protocol over libp2p.
|
||||
///
|
||||
/// This is purpose built for Ethereum 2.0 serenity and the protocol listens on
|
||||
/// `/eth/serenity/rpc/1.0.0`
|
||||
mod protocol;
|
||||
|
||||
use futures::prelude::*;
|
||||
use libp2p::core::protocols_handler::{OneShotHandler, ProtocolsHandler};
|
||||
use libp2p::core::swarm::{
|
||||
ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
|
||||
};
|
||||
use libp2p::{Multiaddr, PeerId};
|
||||
use methods::RPCRequest;
|
||||
use protocol::{RPCProtocol, RpcEvent};
|
||||
use std::marker::PhantomData;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
|
||||
/// The network behaviour handles RPC requests/responses as specified in the Eth 2.0 phase 0
|
||||
/// specification.
|
||||
|
||||
pub struct Rpc<TSubstream> {
|
||||
/// Queue of events to processed.
|
||||
events: Vec<RpcEvent>,
|
||||
/// Pins the generic substream.
|
||||
marker: PhantomData<TSubstream>,
|
||||
}
|
||||
|
||||
impl<TSubstream> Rpc<TSubstream> {
|
||||
pub fn new() -> Self {
|
||||
Rpc {
|
||||
events: Vec::new(),
|
||||
marker: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
/// Submits and RPC request.
|
||||
pub fn send_request(&mut self, id: u64, method_id: u16, body: RPCRequest) {
|
||||
let request = RpcEvent::Request {
|
||||
id,
|
||||
method_id,
|
||||
body,
|
||||
};
|
||||
self.events.push(request);
|
||||
}
|
||||
}
|
||||
|
||||
impl<TSubstream> NetworkBehaviour for Rpc<TSubstream>
|
||||
where
|
||||
TSubstream: AsyncRead + AsyncWrite,
|
||||
{
|
||||
type ProtocolsHandler = OneShotHandler<TSubstream, RPCProtocol, RpcEvent, OneShotEvent>;
|
||||
type OutEvent = RpcEvent;
|
||||
|
||||
fn new_handler(&mut self) -> Self::ProtocolsHandler {
|
||||
Default::default()
|
||||
}
|
||||
|
||||
fn addresses_of_peer(&mut self, _peer_id: &PeerId) -> Vec<Multiaddr> {
|
||||
Vec::new()
|
||||
}
|
||||
|
||||
fn inject_connected(&mut self, _: PeerId, _: ConnectedPoint) {}
|
||||
|
||||
fn inject_disconnected(&mut self, _: &PeerId, _: ConnectedPoint) {}
|
||||
|
||||
fn inject_node_event(
|
||||
&mut self,
|
||||
source: PeerId,
|
||||
event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
|
||||
) {
|
||||
// ignore successful sends event
|
||||
let event = match event {
|
||||
OneShotEvent::Rx(event) => event,
|
||||
OneShotEvent::Sent => return,
|
||||
};
|
||||
|
||||
// send the event to the user
|
||||
self.events.push(event);
|
||||
}
|
||||
|
||||
fn poll(
|
||||
&mut self,
|
||||
_: &mut PollParameters<'_>,
|
||||
) -> Async<
|
||||
NetworkBehaviourAction<
|
||||
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
|
||||
Self::OutEvent,
|
||||
>,
|
||||
> {
|
||||
if !self.events.is_empty() {
|
||||
return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0)));
|
||||
}
|
||||
Async::NotReady
|
||||
}
|
||||
}
|
||||
|
||||
/// Transmission between the `OneShotHandler` and the `RpcEvent`.
|
||||
#[derive(Debug)]
|
||||
pub enum OneShotEvent {
|
||||
/// We received an RPC from a remote.
|
||||
Rx(RpcEvent),
|
||||
/// We successfully sent an RPC request.
|
||||
Sent,
|
||||
}
|
||||
|
||||
impl From<RpcEvent> for OneShotEvent {
|
||||
#[inline]
|
||||
fn from(rpc: RpcEvent) -> OneShotEvent {
|
||||
OneShotEvent::Rx(rpc)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<()> for OneShotEvent {
|
||||
#[inline]
|
||||
fn from(_: ()) -> OneShotEvent {
|
||||
OneShotEvent::Sent
|
||||
}
|
||||
}
|
179
beacon_node/libp2p/src/rpc/protocol.rs
Normal file
179
beacon_node/libp2p/src/rpc/protocol.rs
Normal file
@ -0,0 +1,179 @@
|
||||
use super::methods::HelloResponse;
|
||||
use super::methods::{RPCMethod, RPCRequest, RPCResponse};
|
||||
//use crate::rpc_proto;
|
||||
//use byteorder::{BigEndian, ByteOrder};
|
||||
//use bytes::BytesMut;
|
||||
use futures::{future, stream, Future, Stream};
|
||||
use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, PeerId, UpgradeInfo};
|
||||
//use std::{io, iter};
|
||||
use ssz::{ssz_encode, Decodable, Encodable, SszStream};
|
||||
use std::io;
|
||||
use std::iter;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
|
||||
/// The maximum bytes that can be sent across the RPC.
|
||||
const MAX_READ_SIZE: usize = 2048;
|
||||
|
||||
/// Implementation of the `ConnectionUpgrade` for the rpc protocol.
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RPCProtocol;
|
||||
|
||||
impl UpgradeInfo for RPCProtocol {
|
||||
type Info = &'static [u8];
|
||||
type InfoIter = iter::Once<Self::Info>;
|
||||
|
||||
#[inline]
|
||||
fn protocol_info(&self) -> Self::InfoIter {
|
||||
iter::once(b"/eth/serenity/rpc/1.0.0")
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for RPCProtocol {
|
||||
fn default() -> Self {
|
||||
RPCProtocol
|
||||
}
|
||||
}
|
||||
|
||||
/// The RPC types which are sent/received in this protocol.
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum RpcEvent {
|
||||
Request {
|
||||
id: u64,
|
||||
method_id: u16,
|
||||
body: RPCRequest,
|
||||
},
|
||||
Response {
|
||||
id: u64,
|
||||
method_id: u16, //TODO: Remove and process decoding upstream
|
||||
result: RPCResponse,
|
||||
},
|
||||
}
|
||||
|
||||
impl UpgradeInfo for RpcEvent {
|
||||
type Info = &'static [u8];
|
||||
type InfoIter = iter::Once<Self::Info>;
|
||||
|
||||
#[inline]
|
||||
fn protocol_info(&self) -> Self::InfoIter {
|
||||
iter::once(b"/eth/serenity/rpc/1.0.0")
|
||||
}
|
||||
}
|
||||
|
||||
impl<TSocket> InboundUpgrade<TSocket> for RPCProtocol
|
||||
where
|
||||
TSocket: AsyncRead + AsyncWrite,
|
||||
{
|
||||
type Output = RpcEvent;
|
||||
type Error = DecodeError;
|
||||
type Future =
|
||||
upgrade::ReadOneThen<TSocket, (), fn(Vec<u8>, ()) -> Result<RpcEvent, DecodeError>>;
|
||||
|
||||
fn upgrade_inbound(self, socket: TSocket, _: Self::Info) -> Self::Future {
|
||||
upgrade::read_one_then(socket, MAX_READ_SIZE, (), |packet, ()| Ok(decode(packet)?))
|
||||
}
|
||||
}
|
||||
|
||||
fn decode(packet: Vec<u8>) -> Result<RpcEvent, DecodeError> {
|
||||
// decode the header of the rpc
|
||||
// request/response
|
||||
let (request, index) = bool::ssz_decode(&packet, 0)?;
|
||||
let (id, index) = u64::ssz_decode(&packet, index)?;
|
||||
let (method_id, index) = u16::ssz_decode(&packet, index)?;
|
||||
|
||||
if request {
|
||||
let body = match RPCMethod::from(method_id) {
|
||||
RPCMethod::Hello => RPCRequest::HelloRequest,
|
||||
RPCMethod::Unknown => return Err(DecodeError::UnknownRPCMethod),
|
||||
};
|
||||
|
||||
return Ok(RpcEvent::Request {
|
||||
id,
|
||||
method_id,
|
||||
body,
|
||||
});
|
||||
}
|
||||
// we have received a response
|
||||
else {
|
||||
let result = match RPCMethod::from(method_id) {
|
||||
RPCMethod::Hello => {
|
||||
let (hello_response, _index) = HelloResponse::ssz_decode(&packet, index)?;
|
||||
RPCResponse::HelloResponse(hello_response)
|
||||
}
|
||||
RPCMethod::Unknown => return Err(DecodeError::UnknownRPCMethod),
|
||||
};
|
||||
return Ok(RpcEvent::Response {
|
||||
id,
|
||||
method_id,
|
||||
result,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
impl<TSocket> OutboundUpgrade<TSocket> for RpcEvent
|
||||
where
|
||||
TSocket: AsyncWrite,
|
||||
{
|
||||
type Output = ();
|
||||
type Error = io::Error;
|
||||
type Future = upgrade::WriteOne<TSocket>;
|
||||
|
||||
#[inline]
|
||||
fn upgrade_outbound(self, socket: TSocket, _: Self::Info) -> Self::Future {
|
||||
let bytes = ssz_encode(&self);
|
||||
upgrade::write_one(socket, bytes)
|
||||
}
|
||||
}
|
||||
|
||||
impl Encodable for RpcEvent {
|
||||
fn ssz_append(&self, s: &mut SszStream) {
|
||||
match self {
|
||||
RpcEvent::Request {
|
||||
id,
|
||||
method_id,
|
||||
body,
|
||||
} => {
|
||||
s.append(&true);
|
||||
s.append(id);
|
||||
s.append(method_id);
|
||||
match body {
|
||||
RPCRequest::HelloRequest => {}
|
||||
}
|
||||
}
|
||||
RpcEvent::Response {
|
||||
id,
|
||||
method_id,
|
||||
result,
|
||||
} => {
|
||||
s.append(&false);
|
||||
s.append(id);
|
||||
s.append(method_id);
|
||||
match result {
|
||||
RPCResponse::HelloResponse(response) => {
|
||||
s.append(response);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub enum DecodeError {
|
||||
ReadError(upgrade::ReadOneError),
|
||||
SSZDecodeError(ssz::DecodeError),
|
||||
UnknownRPCMethod,
|
||||
}
|
||||
|
||||
impl From<upgrade::ReadOneError> for DecodeError {
|
||||
#[inline]
|
||||
fn from(err: upgrade::ReadOneError) -> Self {
|
||||
DecodeError::ReadError(err)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ssz::DecodeError> for DecodeError {
|
||||
#[inline]
|
||||
fn from(err: ssz::DecodeError) -> Self {
|
||||
DecodeError::SSZDecodeError(err)
|
||||
}
|
||||
}
|
@ -15,7 +15,7 @@ use libp2p::{PeerId, Swarm};
|
||||
use slog::{debug, info, trace, warn};
|
||||
use std::io::{Error, ErrorKind};
|
||||
use std::time::Duration;
|
||||
use types::{Topic, TopicBuilder};
|
||||
use types::TopicBuilder;
|
||||
|
||||
/// The configuration and state of the libp2p components for the beacon node.
|
||||
pub struct Service {
|
||||
|
Loading…
Reference in New Issue
Block a user