lighthouse/beacon_node/eth2-libp2p/src/rpc/protocol.rs

223 lines
6.7 KiB
Rust
Raw Normal View History

2019-03-21 00:02:52 +00:00
use super::methods::*;
use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo};
2019-03-14 14:50:59 +00:00
use ssz::{ssz_encode, Decodable, Encodable, SszStream};
use std::io;
use std::iter;
use tokio::io::{AsyncRead, AsyncWrite};
/// The maximum bytes that can be sent across the RPC.
2019-03-20 23:43:21 +00:00
const MAX_READ_SIZE: usize = 4_194_304; // 4M
2019-03-14 14:50:59 +00:00
/// Implementation of the `ConnectionUpgrade` for the rpc protocol.
#[derive(Debug, Clone)]
pub struct RPCProtocol;
impl UpgradeInfo for RPCProtocol {
type Info = &'static [u8];
type InfoIter = iter::Once<Self::Info>;
#[inline]
fn protocol_info(&self) -> Self::InfoIter {
iter::once(b"/eth/serenity/rpc/1.0.0")
}
}
impl Default for RPCProtocol {
fn default() -> Self {
RPCProtocol
}
}
/// The RPC types which are sent/received in this protocol.
#[derive(Debug, Clone)]
pub enum RPCEvent {
2019-03-14 14:50:59 +00:00
Request {
id: u64,
method_id: u16,
body: RPCRequest,
},
Response {
id: u64,
method_id: u16, //TODO: Remove and process decoding upstream
result: RPCResponse,
},
}
impl UpgradeInfo for RPCEvent {
2019-03-14 14:50:59 +00:00
type Info = &'static [u8];
type InfoIter = iter::Once<Self::Info>;
#[inline]
fn protocol_info(&self) -> Self::InfoIter {
iter::once(b"/eth/serenity/rpc/1.0.0")
}
}
impl<TSocket> InboundUpgrade<TSocket> for RPCProtocol
where
TSocket: AsyncRead + AsyncWrite,
{
type Output = RPCEvent;
2019-03-14 14:50:59 +00:00
type Error = DecodeError;
type Future =
upgrade::ReadOneThen<TSocket, (), fn(Vec<u8>, ()) -> Result<RPCEvent, DecodeError>>;
2019-03-14 14:50:59 +00:00
fn upgrade_inbound(self, socket: TSocket, _: Self::Info) -> Self::Future {
upgrade::read_one_then(socket, MAX_READ_SIZE, (), |packet, ()| Ok(decode(packet)?))
}
}
fn decode(packet: Vec<u8>) -> Result<RPCEvent, DecodeError> {
2019-03-14 14:50:59 +00:00
// decode the header of the rpc
// request/response
let (request, index) = bool::ssz_decode(&packet, 0)?;
let (id, index) = u64::ssz_decode(&packet, index)?;
let (method_id, index) = u16::ssz_decode(&packet, index)?;
if request {
let body = match RPCMethod::from(method_id) {
RPCMethod::Hello => {
let (hello_body, _index) = HelloMessage::ssz_decode(&packet, index)?;
RPCRequest::Hello(hello_body)
}
2019-03-20 23:53:29 +00:00
RPCMethod::Goodbye => {
let (goodbye_code, _index) = u64::ssz_decode(&packet, index)?;
RPCRequest::Goodbye(goodbye_code)
}
2019-03-21 00:02:52 +00:00
RPCMethod::BeaconBlockRoots => {
let (block_roots_request, _index) =
BeaconBlockRootsRequest::ssz_decode(&packet, index)?;
RPCRequest::BeaconBlockRoots(block_roots_request)
}
RPCMethod::BeaconBlockHeaders => {
let (block_headers_request, _index) =
BeaconBlockHeadersRequest::ssz_decode(&packet, index)?;
RPCRequest::BeaconBlockHeaders(block_headers_request)
}
2019-03-21 00:16:09 +00:00
RPCMethod::BeaconBlockBodies => {
let (block_bodies_request, _index) =
BeaconBlockBodiesRequest::ssz_decode(&packet, index)?;
RPCRequest::BeaconBlockBodies(block_bodies_request)
}
2019-03-21 00:18:47 +00:00
RPCMethod::BeaconChainState => {
let (chain_state_request, _index) =
BeaconChainStateRequest::ssz_decode(&packet, index)?;
RPCRequest::BeaconChainState(chain_state_request)
}
RPCMethod::Unknown => return Err(DecodeError::UnknownRPCMethod),
2019-03-14 14:50:59 +00:00
};
2019-03-19 12:20:39 +00:00
Ok(RPCEvent::Request {
2019-03-14 14:50:59 +00:00
id,
method_id,
body,
2019-03-19 12:20:39 +00:00
})
2019-03-14 14:50:59 +00:00
}
// we have received a response
else {
let result = match RPCMethod::from(method_id) {
RPCMethod::Hello => {
let (body, _index) = HelloMessage::ssz_decode(&packet, index)?;
RPCResponse::Hello(body)
2019-03-14 14:50:59 +00:00
}
2019-03-21 00:02:52 +00:00
RPCMethod::Goodbye => unreachable!("Should never receive a goodbye response"),
RPCMethod::BeaconBlockRoots => {
let (body, _index) = BeaconBlockRootsResponse::ssz_decode(&packet, index)?;
RPCResponse::BeaconBlockRoots(body)
}
RPCMethod::BeaconBlockHeaders => {
let (body, _index) = BeaconBlockHeadersResponse::ssz_decode(&packet, index)?;
RPCResponse::BeaconBlockHeaders(body)
}
2019-03-21 00:16:09 +00:00
RPCMethod::BeaconBlockBodies => {
let (body, _index) = BeaconBlockBodiesResponse::ssz_decode(&packet, index)?;
RPCResponse::BeaconBlockBodies(body)
}
2019-03-21 00:18:47 +00:00
RPCMethod::BeaconChainState => {
let (body, _index) = BeaconChainStateResponse::ssz_decode(&packet, index)?;
RPCResponse::BeaconChainState(body)
}
RPCMethod::Unknown => return Err(DecodeError::UnknownRPCMethod),
2019-03-14 14:50:59 +00:00
};
2019-03-19 12:20:39 +00:00
Ok(RPCEvent::Response {
2019-03-14 14:50:59 +00:00
id,
method_id,
result,
2019-03-19 12:20:39 +00:00
})
2019-03-14 14:50:59 +00:00
}
}
impl<TSocket> OutboundUpgrade<TSocket> for RPCEvent
2019-03-14 14:50:59 +00:00
where
TSocket: AsyncWrite,
{
type Output = ();
type Error = io::Error;
type Future = upgrade::WriteOne<TSocket>;
#[inline]
fn upgrade_outbound(self, socket: TSocket, _: Self::Info) -> Self::Future {
let bytes = ssz_encode(&self);
upgrade::write_one(socket, bytes)
}
}
impl Encodable for RPCEvent {
2019-03-14 14:50:59 +00:00
fn ssz_append(&self, s: &mut SszStream) {
match self {
RPCEvent::Request {
2019-03-14 14:50:59 +00:00
id,
method_id,
body,
} => {
s.append(&true);
s.append(id);
s.append(method_id);
match body {
RPCRequest::Hello(body) => {
s.append(body);
}
_ => {}
}
2019-03-14 14:50:59 +00:00
}
RPCEvent::Response {
2019-03-14 14:50:59 +00:00
id,
method_id,
result,
} => {
s.append(&false);
s.append(id);
s.append(method_id);
match result {
RPCResponse::Hello(response) => {
2019-03-14 14:50:59 +00:00
s.append(response);
}
_ => {}
2019-03-14 14:50:59 +00:00
}
}
}
}
}
#[derive(Debug)]
2019-03-14 14:50:59 +00:00
pub enum DecodeError {
ReadError(upgrade::ReadOneError),
SSZDecodeError(ssz::DecodeError),
UnknownRPCMethod,
}
impl From<upgrade::ReadOneError> for DecodeError {
#[inline]
fn from(err: upgrade::ReadOneError) -> Self {
DecodeError::ReadError(err)
}
}
impl From<ssz::DecodeError> for DecodeError {
#[inline]
fn from(err: ssz::DecodeError) -> Self {
DecodeError::SSZDecodeError(err)
}
}