commit
b5af73d056
@ -8,7 +8,7 @@ use tokio::runtime::TaskExecutor;
|
|||||||
use tokio::timer::Interval;
|
use tokio::timer::Interval;
|
||||||
|
|
||||||
/// The interval between heartbeat events.
|
/// The interval between heartbeat events.
|
||||||
pub const HEARTBEAT_INTERVAL_SECONDS: u64 = 5;
|
pub const HEARTBEAT_INTERVAL_SECONDS: u64 = 15;
|
||||||
|
|
||||||
/// Spawns a thread that can be used to run code periodically, on `HEARTBEAT_INTERVAL_SECONDS`
|
/// Spawns a thread that can be used to run code periodically, on `HEARTBEAT_INTERVAL_SECONDS`
|
||||||
/// durations.
|
/// durations.
|
||||||
@ -25,19 +25,22 @@ pub fn run<T: BeaconChainTypes + Send + Sync + 'static>(
|
|||||||
Duration::from_secs(HEARTBEAT_INTERVAL_SECONDS),
|
Duration::from_secs(HEARTBEAT_INTERVAL_SECONDS),
|
||||||
);
|
);
|
||||||
|
|
||||||
let _log = client.log.new(o!("Service" => "Notifier"));
|
let log = client.log.new(o!("Service" => "Notifier"));
|
||||||
|
|
||||||
|
let libp2p = client.network.libp2p_service();
|
||||||
|
|
||||||
|
let heartbeat = move |_| {
|
||||||
|
// Notify the number of connected nodes
|
||||||
|
// Panic if libp2p is poisoned
|
||||||
|
debug!(log, ""; "Connected Peers" => libp2p.lock().swarm.connected_peers());
|
||||||
|
|
||||||
let heartbeat = |_| {
|
|
||||||
// There is not presently any heartbeat logic.
|
|
||||||
//
|
|
||||||
// We leave this function empty for future use.
|
|
||||||
Ok(())
|
Ok(())
|
||||||
};
|
};
|
||||||
|
|
||||||
// map error and spawn
|
// map error and spawn
|
||||||
let log = client.log.clone();
|
let err_log = client.log.clone();
|
||||||
let heartbeat_interval = interval
|
let heartbeat_interval = interval
|
||||||
.map_err(move |e| debug!(log, "Timer error {}", e))
|
.map_err(move |e| debug!(err_log, "Timer error {}", e))
|
||||||
.for_each(heartbeat);
|
.for_each(heartbeat);
|
||||||
|
|
||||||
executor.spawn(exit.until(heartbeat_interval).map(|_| ()));
|
executor.spawn(exit.until(heartbeat_interval).map(|_| ()));
|
||||||
|
@ -22,3 +22,8 @@ futures = "0.1.25"
|
|||||||
error-chain = "0.12.0"
|
error-chain = "0.12.0"
|
||||||
tokio-timer = "0.2.10"
|
tokio-timer = "0.2.10"
|
||||||
dirs = "2.0.1"
|
dirs = "2.0.1"
|
||||||
|
tokio-io = "0.1.12"
|
||||||
|
smallvec = "0.6.10"
|
||||||
|
fnv = "1.0.6"
|
||||||
|
unsigned-varint = "0.2.2"
|
||||||
|
bytes = "0.4.12"
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
use crate::discovery::Discovery;
|
use crate::discovery::Discovery;
|
||||||
use crate::rpc::{RPCEvent, RPCMessage, Rpc};
|
use crate::rpc::{RPCEvent, RPCMessage, RPC};
|
||||||
use crate::{error, NetworkConfig};
|
use crate::{error, NetworkConfig};
|
||||||
use crate::{Topic, TopicHash};
|
use crate::{Topic, TopicHash};
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
@ -29,7 +29,7 @@ pub struct Behaviour<TSubstream: AsyncRead + AsyncWrite> {
|
|||||||
/// The routing pub-sub mechanism for eth2.
|
/// The routing pub-sub mechanism for eth2.
|
||||||
gossipsub: Gossipsub<TSubstream>,
|
gossipsub: Gossipsub<TSubstream>,
|
||||||
/// The serenity RPC specified in the wire-0 protocol.
|
/// The serenity RPC specified in the wire-0 protocol.
|
||||||
serenity_rpc: Rpc<TSubstream>,
|
serenity_rpc: RPC<TSubstream>,
|
||||||
/// Keep regular connection to peers and disconnect if absent.
|
/// Keep regular connection to peers and disconnect if absent.
|
||||||
ping: Ping<TSubstream>,
|
ping: Ping<TSubstream>,
|
||||||
/// Kademlia for peer discovery.
|
/// Kademlia for peer discovery.
|
||||||
@ -57,7 +57,7 @@ impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
|
|||||||
.with_keep_alive(false);
|
.with_keep_alive(false);
|
||||||
|
|
||||||
Ok(Behaviour {
|
Ok(Behaviour {
|
||||||
serenity_rpc: Rpc::new(log),
|
serenity_rpc: RPC::new(log),
|
||||||
gossipsub: Gossipsub::new(local_peer_id.clone(), net_conf.gs_config.clone()),
|
gossipsub: Gossipsub::new(local_peer_id.clone(), net_conf.gs_config.clone()),
|
||||||
discovery: Discovery::new(local_key, net_conf, log)?,
|
discovery: Discovery::new(local_key, net_conf, log)?,
|
||||||
ping: Ping::new(ping_config),
|
ping: Ping::new(ping_config),
|
||||||
@ -109,6 +109,9 @@ impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<RPCMessage
|
|||||||
RPCMessage::PeerDialed(peer_id) => {
|
RPCMessage::PeerDialed(peer_id) => {
|
||||||
self.events.push(BehaviourEvent::PeerDialed(peer_id))
|
self.events.push(BehaviourEvent::PeerDialed(peer_id))
|
||||||
}
|
}
|
||||||
|
RPCMessage::PeerDisconnected(peer_id) => {
|
||||||
|
self.events.push(BehaviourEvent::PeerDisconnected(peer_id))
|
||||||
|
}
|
||||||
RPCMessage::RPC(peer_id, rpc_event) => {
|
RPCMessage::RPC(peer_id, rpc_event) => {
|
||||||
self.events.push(BehaviourEvent::RPC(peer_id, rpc_event))
|
self.events.push(BehaviourEvent::RPC(peer_id, rpc_event))
|
||||||
}
|
}
|
||||||
@ -168,12 +171,18 @@ impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
|
|||||||
pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) {
|
pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) {
|
||||||
self.serenity_rpc.send_rpc(peer_id, rpc_event);
|
self.serenity_rpc.send_rpc(peer_id, rpc_event);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Discovery / Peer management functions */
|
||||||
|
pub fn connected_peers(&self) -> usize {
|
||||||
|
self.discovery.connected_peers()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The types of events than can be obtained from polling the behaviour.
|
/// The types of events than can be obtained from polling the behaviour.
|
||||||
pub enum BehaviourEvent {
|
pub enum BehaviourEvent {
|
||||||
RPC(PeerId, RPCEvent),
|
RPC(PeerId, RPCEvent),
|
||||||
PeerDialed(PeerId),
|
PeerDialed(PeerId),
|
||||||
|
PeerDisconnected(PeerId),
|
||||||
GossipMessage {
|
GossipMessage {
|
||||||
source: PeerId,
|
source: PeerId,
|
||||||
topics: Vec<TopicHash>,
|
topics: Vec<TopicHash>,
|
||||||
|
@ -77,6 +77,7 @@ impl<TSubstream> Discovery<TSubstream> {
|
|||||||
|
|
||||||
info!(log, "Local ENR: {}", local_enr.to_base64());
|
info!(log, "Local ENR: {}", local_enr.to_base64());
|
||||||
debug!(log, "Local Node Id: {}", local_enr.node_id());
|
debug!(log, "Local Node Id: {}", local_enr.node_id());
|
||||||
|
debug!(log, "Local ENR seq: {}", local_enr.seq());
|
||||||
|
|
||||||
let mut discovery = Discv5::new(local_enr, local_key.clone(), config.listen_address)
|
let mut discovery = Discv5::new(local_enr, local_key.clone(), config.listen_address)
|
||||||
.map_err(|e| format!("Discv5 service failed: {:?}", e))?;
|
.map_err(|e| format!("Discv5 service failed: {:?}", e))?;
|
||||||
@ -115,6 +116,11 @@ impl<TSubstream> Discovery<TSubstream> {
|
|||||||
self.discovery.add_enr(enr);
|
self.discovery.add_enr(enr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The current number of connected libp2p peers.
|
||||||
|
pub fn connected_peers(&self) -> usize {
|
||||||
|
self.connected_peers.len()
|
||||||
|
}
|
||||||
|
|
||||||
/// Search for new peers using the underlying discovery mechanism.
|
/// Search for new peers using the underlying discovery mechanism.
|
||||||
fn find_peers(&mut self) {
|
fn find_peers(&mut self) {
|
||||||
// pick a random NodeId
|
// pick a random NodeId
|
||||||
|
135
beacon_node/eth2-libp2p/src/rpc/codec/base.rs
Normal file
135
beacon_node/eth2-libp2p/src/rpc/codec/base.rs
Normal file
@ -0,0 +1,135 @@
|
|||||||
|
//! This handles the various supported encoding mechanism for the Eth 2.0 RPC.
|
||||||
|
|
||||||
|
use crate::rpc::{ErrorMessage, RPCErrorResponse, RPCRequest, RPCResponse};
|
||||||
|
use bytes::BufMut;
|
||||||
|
use bytes::BytesMut;
|
||||||
|
use tokio::codec::{Decoder, Encoder};
|
||||||
|
|
||||||
|
pub trait OutboundCodec: Encoder + Decoder {
|
||||||
|
type ErrorType;
|
||||||
|
|
||||||
|
fn decode_error(
|
||||||
|
&mut self,
|
||||||
|
src: &mut BytesMut,
|
||||||
|
) -> Result<Option<Self::ErrorType>, <Self as Decoder>::Error>;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct BaseInboundCodec<TCodec>
|
||||||
|
where
|
||||||
|
TCodec: Encoder + Decoder,
|
||||||
|
{
|
||||||
|
/// Inner codec for handling various encodings
|
||||||
|
inner: TCodec,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<TCodec> BaseInboundCodec<TCodec>
|
||||||
|
where
|
||||||
|
TCodec: Encoder + Decoder,
|
||||||
|
{
|
||||||
|
pub fn new(codec: TCodec) -> Self {
|
||||||
|
BaseInboundCodec { inner: codec }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct BaseOutboundCodec<TOutboundCodec>
|
||||||
|
where
|
||||||
|
TOutboundCodec: OutboundCodec,
|
||||||
|
{
|
||||||
|
/// Inner codec for handling various encodings
|
||||||
|
inner: TOutboundCodec,
|
||||||
|
/// Optimisation for decoding. True if the response code has been read and we are awaiting a
|
||||||
|
/// response.
|
||||||
|
response_code: Option<u8>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<TOutboundCodec> BaseOutboundCodec<TOutboundCodec>
|
||||||
|
where
|
||||||
|
TOutboundCodec: OutboundCodec,
|
||||||
|
{
|
||||||
|
pub fn new(codec: TOutboundCodec) -> Self {
|
||||||
|
BaseOutboundCodec {
|
||||||
|
inner: codec,
|
||||||
|
response_code: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<TCodec> Encoder for BaseInboundCodec<TCodec>
|
||||||
|
where
|
||||||
|
TCodec: Decoder + Encoder<Item = RPCErrorResponse>,
|
||||||
|
{
|
||||||
|
type Item = RPCErrorResponse;
|
||||||
|
type Error = <TCodec as Encoder>::Error;
|
||||||
|
|
||||||
|
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
|
||||||
|
dst.clear();
|
||||||
|
dst.reserve(1);
|
||||||
|
dst.put_u8(item.as_u8());
|
||||||
|
return self.inner.encode(item, dst);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<TCodec> Decoder for BaseInboundCodec<TCodec>
|
||||||
|
where
|
||||||
|
TCodec: Encoder + Decoder<Item = RPCRequest>,
|
||||||
|
{
|
||||||
|
type Item = RPCRequest;
|
||||||
|
type Error = <TCodec as Decoder>::Error;
|
||||||
|
|
||||||
|
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
|
||||||
|
self.inner.decode(src)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<TCodec> Encoder for BaseOutboundCodec<TCodec>
|
||||||
|
where
|
||||||
|
TCodec: OutboundCodec + Encoder<Item = RPCRequest>,
|
||||||
|
{
|
||||||
|
type Item = RPCRequest;
|
||||||
|
type Error = <TCodec as Encoder>::Error;
|
||||||
|
|
||||||
|
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
|
||||||
|
self.inner.encode(item, dst)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<TCodec> Decoder for BaseOutboundCodec<TCodec>
|
||||||
|
where
|
||||||
|
TCodec: OutboundCodec<ErrorType = ErrorMessage> + Decoder<Item = RPCResponse>,
|
||||||
|
{
|
||||||
|
type Item = RPCErrorResponse;
|
||||||
|
type Error = <TCodec as Decoder>::Error;
|
||||||
|
|
||||||
|
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
|
||||||
|
let response_code = {
|
||||||
|
if let Some(resp_code) = self.response_code {
|
||||||
|
resp_code
|
||||||
|
} else {
|
||||||
|
// buffer should not be empty
|
||||||
|
debug_assert!(!src.is_empty());
|
||||||
|
|
||||||
|
let resp_byte = src.split_to(1);
|
||||||
|
let mut resp_code_byte = [0; 1];
|
||||||
|
resp_code_byte.copy_from_slice(&resp_byte);
|
||||||
|
|
||||||
|
let resp_code = u8::from_be_bytes(resp_code_byte);
|
||||||
|
self.response_code = Some(resp_code);
|
||||||
|
resp_code
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if RPCErrorResponse::is_response(response_code) {
|
||||||
|
// decode an actual response
|
||||||
|
return self
|
||||||
|
.inner
|
||||||
|
.decode(src)
|
||||||
|
.map(|r| r.map(|resp| RPCErrorResponse::Success(resp)));
|
||||||
|
} else {
|
||||||
|
// decode an error
|
||||||
|
return self
|
||||||
|
.inner
|
||||||
|
.decode_error(src)
|
||||||
|
.map(|r| r.map(|resp| RPCErrorResponse::from_error(response_code, resp)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
62
beacon_node/eth2-libp2p/src/rpc/codec/mod.rs
Normal file
62
beacon_node/eth2-libp2p/src/rpc/codec/mod.rs
Normal file
@ -0,0 +1,62 @@
|
|||||||
|
pub(crate) mod base;
|
||||||
|
pub(crate) mod ssz;
|
||||||
|
|
||||||
|
use self::base::{BaseInboundCodec, BaseOutboundCodec};
|
||||||
|
use self::ssz::{SSZInboundCodec, SSZOutboundCodec};
|
||||||
|
use crate::rpc::protocol::RPCError;
|
||||||
|
use crate::rpc::{RPCErrorResponse, RPCRequest};
|
||||||
|
use bytes::BytesMut;
|
||||||
|
use tokio::codec::{Decoder, Encoder};
|
||||||
|
|
||||||
|
// Known types of codecs
|
||||||
|
pub enum InboundCodec {
|
||||||
|
SSZ(BaseInboundCodec<SSZInboundCodec>),
|
||||||
|
}
|
||||||
|
|
||||||
|
pub enum OutboundCodec {
|
||||||
|
SSZ(BaseOutboundCodec<SSZOutboundCodec>),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Encoder for InboundCodec {
|
||||||
|
type Item = RPCErrorResponse;
|
||||||
|
type Error = RPCError;
|
||||||
|
|
||||||
|
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
|
||||||
|
match self {
|
||||||
|
InboundCodec::SSZ(codec) => codec.encode(item, dst),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Decoder for InboundCodec {
|
||||||
|
type Item = RPCRequest;
|
||||||
|
type Error = RPCError;
|
||||||
|
|
||||||
|
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
|
||||||
|
match self {
|
||||||
|
InboundCodec::SSZ(codec) => codec.decode(src),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Encoder for OutboundCodec {
|
||||||
|
type Item = RPCRequest;
|
||||||
|
type Error = RPCError;
|
||||||
|
|
||||||
|
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
|
||||||
|
match self {
|
||||||
|
OutboundCodec::SSZ(codec) => codec.encode(item, dst),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Decoder for OutboundCodec {
|
||||||
|
type Item = RPCErrorResponse;
|
||||||
|
type Error = RPCError;
|
||||||
|
|
||||||
|
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
|
||||||
|
match self {
|
||||||
|
OutboundCodec::SSZ(codec) => codec.decode(src),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
242
beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs
Normal file
242
beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs
Normal file
@ -0,0 +1,242 @@
|
|||||||
|
use crate::rpc::methods::*;
|
||||||
|
use crate::rpc::{
|
||||||
|
codec::base::OutboundCodec,
|
||||||
|
protocol::{ProtocolId, RPCError},
|
||||||
|
};
|
||||||
|
use crate::rpc::{ErrorMessage, RPCErrorResponse, RPCRequest, RPCResponse};
|
||||||
|
use bytes::{Bytes, BytesMut};
|
||||||
|
use ssz::{Decode, Encode};
|
||||||
|
use tokio::codec::{Decoder, Encoder};
|
||||||
|
use unsigned_varint::codec::UviBytes;
|
||||||
|
|
||||||
|
/* Inbound Codec */
|
||||||
|
|
||||||
|
pub struct SSZInboundCodec {
|
||||||
|
inner: UviBytes,
|
||||||
|
protocol: ProtocolId,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SSZInboundCodec {
|
||||||
|
pub fn new(protocol: ProtocolId, max_packet_size: usize) -> Self {
|
||||||
|
let mut uvi_codec = UviBytes::default();
|
||||||
|
uvi_codec.set_max_len(max_packet_size);
|
||||||
|
|
||||||
|
// this encoding only applies to ssz.
|
||||||
|
debug_assert!(protocol.encoding.as_str() == "ssz");
|
||||||
|
|
||||||
|
SSZInboundCodec {
|
||||||
|
inner: uvi_codec,
|
||||||
|
protocol,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Encoder for inbound
|
||||||
|
impl Encoder for SSZInboundCodec {
|
||||||
|
type Item = RPCErrorResponse;
|
||||||
|
type Error = RPCError;
|
||||||
|
|
||||||
|
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
|
||||||
|
let bytes = match item {
|
||||||
|
RPCErrorResponse::Success(resp) => {
|
||||||
|
match resp {
|
||||||
|
RPCResponse::Hello(res) => res.as_ssz_bytes(),
|
||||||
|
RPCResponse::BeaconBlockRoots(res) => res.as_ssz_bytes(),
|
||||||
|
RPCResponse::BeaconBlockHeaders(res) => res.headers, // already raw bytes
|
||||||
|
RPCResponse::BeaconBlockBodies(res) => res.block_bodies, // already raw bytes
|
||||||
|
RPCResponse::BeaconChainState(res) => res.as_ssz_bytes(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
RPCErrorResponse::InvalidRequest(err) => err.as_ssz_bytes(),
|
||||||
|
RPCErrorResponse::ServerError(err) => err.as_ssz_bytes(),
|
||||||
|
RPCErrorResponse::Unknown(err) => err.as_ssz_bytes(),
|
||||||
|
};
|
||||||
|
|
||||||
|
if !bytes.is_empty() {
|
||||||
|
// length-prefix and return
|
||||||
|
return self
|
||||||
|
.inner
|
||||||
|
.encode(Bytes::from(bytes), dst)
|
||||||
|
.map_err(RPCError::from);
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Decoder for inbound
|
||||||
|
impl Decoder for SSZInboundCodec {
|
||||||
|
type Item = RPCRequest;
|
||||||
|
type Error = RPCError;
|
||||||
|
|
||||||
|
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
|
||||||
|
match self.inner.decode(src).map_err(RPCError::from) {
|
||||||
|
Ok(Some(packet)) => match self.protocol.message_name.as_str() {
|
||||||
|
"hello" => match self.protocol.version.as_str() {
|
||||||
|
"1.0.0" => Ok(Some(RPCRequest::Hello(HelloMessage::from_ssz_bytes(
|
||||||
|
&packet,
|
||||||
|
)?))),
|
||||||
|
_ => Err(RPCError::InvalidProtocol("Unknown HELLO version")),
|
||||||
|
},
|
||||||
|
"goodbye" => match self.protocol.version.as_str() {
|
||||||
|
"1.0.0" => Ok(Some(RPCRequest::Goodbye(GoodbyeReason::from_ssz_bytes(
|
||||||
|
&packet,
|
||||||
|
)?))),
|
||||||
|
_ => Err(RPCError::InvalidProtocol(
|
||||||
|
"Unknown GOODBYE version.as_str()",
|
||||||
|
)),
|
||||||
|
},
|
||||||
|
"beacon_block_roots" => match self.protocol.version.as_str() {
|
||||||
|
"1.0.0" => Ok(Some(RPCRequest::BeaconBlockRoots(
|
||||||
|
BeaconBlockRootsRequest::from_ssz_bytes(&packet)?,
|
||||||
|
))),
|
||||||
|
_ => Err(RPCError::InvalidProtocol(
|
||||||
|
"Unknown BEACON_BLOCK_ROOTS version.",
|
||||||
|
)),
|
||||||
|
},
|
||||||
|
"beacon_block_headers" => match self.protocol.version.as_str() {
|
||||||
|
"1.0.0" => Ok(Some(RPCRequest::BeaconBlockHeaders(
|
||||||
|
BeaconBlockHeadersRequest::from_ssz_bytes(&packet)?,
|
||||||
|
))),
|
||||||
|
_ => Err(RPCError::InvalidProtocol(
|
||||||
|
"Unknown BEACON_BLOCK_HEADERS version.",
|
||||||
|
)),
|
||||||
|
},
|
||||||
|
"beacon_block_bodies" => match self.protocol.version.as_str() {
|
||||||
|
"1.0.0" => Ok(Some(RPCRequest::BeaconBlockBodies(
|
||||||
|
BeaconBlockBodiesRequest::from_ssz_bytes(&packet)?,
|
||||||
|
))),
|
||||||
|
_ => Err(RPCError::InvalidProtocol(
|
||||||
|
"Unknown BEACON_BLOCK_BODIES version.",
|
||||||
|
)),
|
||||||
|
},
|
||||||
|
"beacon_chain_state" => match self.protocol.version.as_str() {
|
||||||
|
"1.0.0" => Ok(Some(RPCRequest::BeaconChainState(
|
||||||
|
BeaconChainStateRequest::from_ssz_bytes(&packet)?,
|
||||||
|
))),
|
||||||
|
_ => Err(RPCError::InvalidProtocol(
|
||||||
|
"Unknown BEACON_CHAIN_STATE version.",
|
||||||
|
)),
|
||||||
|
},
|
||||||
|
_ => Err(RPCError::InvalidProtocol("Unknown message name.")),
|
||||||
|
},
|
||||||
|
Ok(None) => Ok(None),
|
||||||
|
Err(e) => Err(e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Outbound Codec */
|
||||||
|
|
||||||
|
pub struct SSZOutboundCodec {
|
||||||
|
inner: UviBytes,
|
||||||
|
protocol: ProtocolId,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SSZOutboundCodec {
|
||||||
|
pub fn new(protocol: ProtocolId, max_packet_size: usize) -> Self {
|
||||||
|
let mut uvi_codec = UviBytes::default();
|
||||||
|
uvi_codec.set_max_len(max_packet_size);
|
||||||
|
|
||||||
|
// this encoding only applies to ssz.
|
||||||
|
debug_assert!(protocol.encoding.as_str() == "ssz");
|
||||||
|
|
||||||
|
SSZOutboundCodec {
|
||||||
|
inner: uvi_codec,
|
||||||
|
protocol,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Encoder for outbound
|
||||||
|
impl Encoder for SSZOutboundCodec {
|
||||||
|
type Item = RPCRequest;
|
||||||
|
type Error = RPCError;
|
||||||
|
|
||||||
|
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
|
||||||
|
let bytes = match item {
|
||||||
|
RPCRequest::Hello(req) => req.as_ssz_bytes(),
|
||||||
|
RPCRequest::Goodbye(req) => req.as_ssz_bytes(),
|
||||||
|
RPCRequest::BeaconBlockRoots(req) => req.as_ssz_bytes(),
|
||||||
|
RPCRequest::BeaconBlockHeaders(req) => req.as_ssz_bytes(),
|
||||||
|
RPCRequest::BeaconBlockBodies(req) => req.as_ssz_bytes(),
|
||||||
|
RPCRequest::BeaconChainState(req) => req.as_ssz_bytes(),
|
||||||
|
};
|
||||||
|
// length-prefix
|
||||||
|
self.inner
|
||||||
|
.encode(bytes::Bytes::from(bytes), dst)
|
||||||
|
.map_err(RPCError::from)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Decoder for outbound
|
||||||
|
impl Decoder for SSZOutboundCodec {
|
||||||
|
type Item = RPCResponse;
|
||||||
|
type Error = RPCError;
|
||||||
|
|
||||||
|
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
|
||||||
|
match self.inner.decode(src).map_err(RPCError::from) {
|
||||||
|
Ok(Some(packet)) => match self.protocol.message_name.as_str() {
|
||||||
|
"hello" => match self.protocol.version.as_str() {
|
||||||
|
"1.0.0" => Ok(Some(RPCResponse::Hello(HelloMessage::from_ssz_bytes(
|
||||||
|
&packet,
|
||||||
|
)?))),
|
||||||
|
_ => Err(RPCError::InvalidProtocol("Unknown HELLO version.")),
|
||||||
|
},
|
||||||
|
"goodbye" => Err(RPCError::InvalidProtocol("GOODBYE doesn't have a response")),
|
||||||
|
"beacon_block_roots" => match self.protocol.version.as_str() {
|
||||||
|
"1.0.0" => Ok(Some(RPCResponse::BeaconBlockRoots(
|
||||||
|
BeaconBlockRootsResponse::from_ssz_bytes(&packet)?,
|
||||||
|
))),
|
||||||
|
_ => Err(RPCError::InvalidProtocol(
|
||||||
|
"Unknown BEACON_BLOCK_ROOTS version.",
|
||||||
|
)),
|
||||||
|
},
|
||||||
|
"beacon_block_headers" => match self.protocol.version.as_str() {
|
||||||
|
"1.0.0" => Ok(Some(RPCResponse::BeaconBlockHeaders(
|
||||||
|
BeaconBlockHeadersResponse {
|
||||||
|
headers: packet.to_vec(),
|
||||||
|
},
|
||||||
|
))),
|
||||||
|
_ => Err(RPCError::InvalidProtocol(
|
||||||
|
"Unknown BEACON_BLOCK_HEADERS version.",
|
||||||
|
)),
|
||||||
|
},
|
||||||
|
"beacon_block_bodies" => match self.protocol.version.as_str() {
|
||||||
|
"1.0.0" => Ok(Some(RPCResponse::BeaconBlockBodies(
|
||||||
|
BeaconBlockBodiesResponse {
|
||||||
|
block_bodies: packet.to_vec(),
|
||||||
|
// this gets filled in the protocol handler
|
||||||
|
block_roots: None,
|
||||||
|
},
|
||||||
|
))),
|
||||||
|
_ => Err(RPCError::InvalidProtocol(
|
||||||
|
"Unknown BEACON_BLOCK_BODIES version.",
|
||||||
|
)),
|
||||||
|
},
|
||||||
|
"beacon_chain_state" => match self.protocol.version.as_str() {
|
||||||
|
"1.0.0" => Ok(Some(RPCResponse::BeaconChainState(
|
||||||
|
BeaconChainStateResponse::from_ssz_bytes(&packet)?,
|
||||||
|
))),
|
||||||
|
_ => Err(RPCError::InvalidProtocol(
|
||||||
|
"Unknown BEACON_CHAIN_STATE version.",
|
||||||
|
)),
|
||||||
|
},
|
||||||
|
_ => Err(RPCError::InvalidProtocol("Unknown method")),
|
||||||
|
},
|
||||||
|
Ok(None) => Ok(None),
|
||||||
|
Err(e) => Err(e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl OutboundCodec for SSZOutboundCodec {
|
||||||
|
type ErrorType = ErrorMessage;
|
||||||
|
|
||||||
|
fn decode_error(&mut self, src: &mut BytesMut) -> Result<Option<Self::ErrorType>, RPCError> {
|
||||||
|
match self.inner.decode(src).map_err(RPCError::from) {
|
||||||
|
Ok(Some(packet)) => Ok(Some(ErrorMessage::from_ssz_bytes(&packet)?)),
|
||||||
|
Ok(None) => Ok(None),
|
||||||
|
Err(e) => Err(e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
391
beacon_node/eth2-libp2p/src/rpc/handler.rs
Normal file
391
beacon_node/eth2-libp2p/src/rpc/handler.rs
Normal file
@ -0,0 +1,391 @@
|
|||||||
|
use super::methods::{RPCErrorResponse, RPCResponse, RequestId};
|
||||||
|
use super::protocol::{RPCError, RPCProtocol, RPCRequest};
|
||||||
|
use super::RPCEvent;
|
||||||
|
use crate::rpc::protocol::{InboundFramed, OutboundFramed};
|
||||||
|
use fnv::FnvHashMap;
|
||||||
|
use futures::prelude::*;
|
||||||
|
use libp2p::core::protocols_handler::{
|
||||||
|
KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol,
|
||||||
|
};
|
||||||
|
use libp2p::core::upgrade::{InboundUpgrade, OutboundUpgrade};
|
||||||
|
use smallvec::SmallVec;
|
||||||
|
use std::time::{Duration, Instant};
|
||||||
|
use tokio_io::{AsyncRead, AsyncWrite};
|
||||||
|
|
||||||
|
/// The time (in seconds) before a substream that is awaiting a response times out.
|
||||||
|
pub const RESPONSE_TIMEOUT: u64 = 9;
|
||||||
|
|
||||||
|
/// Implementation of `ProtocolsHandler` for the RPC protocol.
|
||||||
|
pub struct RPCHandler<TSubstream>
|
||||||
|
where
|
||||||
|
TSubstream: AsyncRead + AsyncWrite,
|
||||||
|
{
|
||||||
|
/// The upgrade for inbound substreams.
|
||||||
|
listen_protocol: SubstreamProtocol<RPCProtocol>,
|
||||||
|
|
||||||
|
/// If `Some`, something bad happened and we should shut down the handler with an error.
|
||||||
|
pending_error: Option<ProtocolsHandlerUpgrErr<RPCError>>,
|
||||||
|
|
||||||
|
/// Queue of events to produce in `poll()`.
|
||||||
|
events_out: SmallVec<[RPCEvent; 4]>,
|
||||||
|
|
||||||
|
/// Queue of outbound substreams to open.
|
||||||
|
dial_queue: SmallVec<[RPCEvent; 4]>,
|
||||||
|
|
||||||
|
/// Current number of concurrent outbound substreams being opened.
|
||||||
|
dial_negotiated: u32,
|
||||||
|
|
||||||
|
/// Map of current substreams awaiting a response to an RPC request.
|
||||||
|
waiting_substreams: FnvHashMap<RequestId, WaitingResponse<TSubstream>>,
|
||||||
|
|
||||||
|
/// List of outbound substreams that need to be driven to completion.
|
||||||
|
substreams: Vec<SubstreamState<TSubstream>>,
|
||||||
|
|
||||||
|
/// Sequential Id for waiting substreams.
|
||||||
|
current_substream_id: RequestId,
|
||||||
|
|
||||||
|
/// Maximum number of concurrent outbound substreams being opened. Value is never modified.
|
||||||
|
max_dial_negotiated: u32,
|
||||||
|
|
||||||
|
/// Value to return from `connection_keep_alive`.
|
||||||
|
keep_alive: KeepAlive,
|
||||||
|
|
||||||
|
/// After the given duration has elapsed, an inactive connection will shutdown.
|
||||||
|
inactive_timeout: Duration,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An outbound substream is waiting a response from the user.
|
||||||
|
struct WaitingResponse<TSubstream> {
|
||||||
|
/// The framed negotiated substream.
|
||||||
|
substream: InboundFramed<TSubstream>,
|
||||||
|
/// The time when the substream is closed.
|
||||||
|
timeout: Instant,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// State of an outbound substream. Either waiting for a response, or in the process of sending.
|
||||||
|
pub enum SubstreamState<TSubstream>
|
||||||
|
where
|
||||||
|
TSubstream: AsyncRead + AsyncWrite,
|
||||||
|
{
|
||||||
|
/// A response has been sent, pending writing and flush.
|
||||||
|
ResponsePendingSend {
|
||||||
|
substream: futures::sink::Send<InboundFramed<TSubstream>>,
|
||||||
|
},
|
||||||
|
/// A request has been sent, and we are awaiting a response. This future is driven in the
|
||||||
|
/// handler because GOODBYE requests can be handled and responses dropped instantly.
|
||||||
|
RequestPendingResponse {
|
||||||
|
/// The framed negotiated substream.
|
||||||
|
substream: OutboundFramed<TSubstream>,
|
||||||
|
/// Keeps track of the request id and the request to permit forming advanced responses which require
|
||||||
|
/// data from the request.
|
||||||
|
rpc_event: RPCEvent,
|
||||||
|
/// The time when the substream is closed.
|
||||||
|
timeout: Instant,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<TSubstream> RPCHandler<TSubstream>
|
||||||
|
where
|
||||||
|
TSubstream: AsyncRead + AsyncWrite,
|
||||||
|
{
|
||||||
|
pub fn new(
|
||||||
|
listen_protocol: SubstreamProtocol<RPCProtocol>,
|
||||||
|
inactive_timeout: Duration,
|
||||||
|
) -> Self {
|
||||||
|
RPCHandler {
|
||||||
|
listen_protocol,
|
||||||
|
pending_error: None,
|
||||||
|
events_out: SmallVec::new(),
|
||||||
|
dial_queue: SmallVec::new(),
|
||||||
|
dial_negotiated: 0,
|
||||||
|
waiting_substreams: FnvHashMap::default(),
|
||||||
|
substreams: Vec::new(),
|
||||||
|
current_substream_id: 1,
|
||||||
|
max_dial_negotiated: 8,
|
||||||
|
keep_alive: KeepAlive::Yes,
|
||||||
|
inactive_timeout,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the number of pending requests.
|
||||||
|
pub fn pending_requests(&self) -> u32 {
|
||||||
|
self.dial_negotiated + self.dial_queue.len() as u32
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns a reference to the listen protocol configuration.
|
||||||
|
///
|
||||||
|
/// > **Note**: If you modify the protocol, modifications will only applies to future inbound
|
||||||
|
/// > substreams, not the ones already being negotiated.
|
||||||
|
pub fn listen_protocol_ref(&self) -> &SubstreamProtocol<RPCProtocol> {
|
||||||
|
&self.listen_protocol
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns a mutable reference to the listen protocol configuration.
|
||||||
|
///
|
||||||
|
/// > **Note**: If you modify the protocol, modifications will only applies to future inbound
|
||||||
|
/// > substreams, not the ones already being negotiated.
|
||||||
|
pub fn listen_protocol_mut(&mut self) -> &mut SubstreamProtocol<RPCProtocol> {
|
||||||
|
&mut self.listen_protocol
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Opens an outbound substream with a request.
|
||||||
|
#[inline]
|
||||||
|
pub fn send_request(&mut self, rpc_event: RPCEvent) {
|
||||||
|
self.keep_alive = KeepAlive::Yes;
|
||||||
|
|
||||||
|
self.dial_queue.push(rpc_event);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<TSubstream> Default for RPCHandler<TSubstream>
|
||||||
|
where
|
||||||
|
TSubstream: AsyncRead + AsyncWrite,
|
||||||
|
{
|
||||||
|
fn default() -> Self {
|
||||||
|
RPCHandler::new(SubstreamProtocol::new(RPCProtocol), Duration::from_secs(30))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<TSubstream> ProtocolsHandler for RPCHandler<TSubstream>
|
||||||
|
where
|
||||||
|
TSubstream: AsyncRead + AsyncWrite,
|
||||||
|
{
|
||||||
|
type InEvent = RPCEvent;
|
||||||
|
type OutEvent = RPCEvent;
|
||||||
|
type Error = ProtocolsHandlerUpgrErr<RPCError>;
|
||||||
|
type Substream = TSubstream;
|
||||||
|
type InboundProtocol = RPCProtocol;
|
||||||
|
type OutboundProtocol = RPCRequest;
|
||||||
|
type OutboundOpenInfo = RPCEvent; // Keep track of the id and the request
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
|
||||||
|
self.listen_protocol.clone()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn inject_fully_negotiated_inbound(
|
||||||
|
&mut self,
|
||||||
|
out: <RPCProtocol as InboundUpgrade<TSubstream>>::Output,
|
||||||
|
) {
|
||||||
|
let (req, substream) = out;
|
||||||
|
// drop the stream and return a 0 id for goodbye "requests"
|
||||||
|
if let r @ RPCRequest::Goodbye(_) = req {
|
||||||
|
self.events_out.push(RPCEvent::Request(0, r));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// New inbound request. Store the stream and tag the output.
|
||||||
|
let awaiting_stream = WaitingResponse {
|
||||||
|
substream,
|
||||||
|
timeout: Instant::now() + Duration::from_secs(RESPONSE_TIMEOUT),
|
||||||
|
};
|
||||||
|
self.waiting_substreams
|
||||||
|
.insert(self.current_substream_id, awaiting_stream);
|
||||||
|
|
||||||
|
self.events_out
|
||||||
|
.push(RPCEvent::Request(self.current_substream_id, req));
|
||||||
|
self.current_substream_id += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn inject_fully_negotiated_outbound(
|
||||||
|
&mut self,
|
||||||
|
out: <RPCRequest as OutboundUpgrade<TSubstream>>::Output,
|
||||||
|
rpc_event: Self::OutboundOpenInfo,
|
||||||
|
) {
|
||||||
|
self.dial_negotiated -= 1;
|
||||||
|
|
||||||
|
if self.dial_negotiated == 0
|
||||||
|
&& self.dial_queue.is_empty()
|
||||||
|
&& self.waiting_substreams.is_empty()
|
||||||
|
{
|
||||||
|
self.keep_alive = KeepAlive::Until(Instant::now() + self.inactive_timeout);
|
||||||
|
} else {
|
||||||
|
self.keep_alive = KeepAlive::Yes;
|
||||||
|
}
|
||||||
|
|
||||||
|
// add the stream to substreams if we expect a response, otherwise drop the stream.
|
||||||
|
if let RPCEvent::Request(id, req) = rpc_event {
|
||||||
|
if req.expect_response() {
|
||||||
|
let awaiting_stream = SubstreamState::RequestPendingResponse {
|
||||||
|
substream: out,
|
||||||
|
rpc_event: RPCEvent::Request(id, req),
|
||||||
|
timeout: Instant::now() + Duration::from_secs(RESPONSE_TIMEOUT),
|
||||||
|
};
|
||||||
|
|
||||||
|
self.substreams.push(awaiting_stream);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Note: If the substream has closed due to inactivity, or the substream is in the
|
||||||
|
// wrong state a response will fail silently.
|
||||||
|
#[inline]
|
||||||
|
fn inject_event(&mut self, rpc_event: Self::InEvent) {
|
||||||
|
match rpc_event {
|
||||||
|
RPCEvent::Request(_, _) => self.send_request(rpc_event),
|
||||||
|
RPCEvent::Response(rpc_id, res) => {
|
||||||
|
// check if the stream matching the response still exists
|
||||||
|
if let Some(waiting_stream) = self.waiting_substreams.remove(&rpc_id) {
|
||||||
|
// only send one response per stream. This must be in the waiting state.
|
||||||
|
self.substreams.push(SubstreamState::ResponsePendingSend {
|
||||||
|
substream: waiting_stream.substream.send(res),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
RPCEvent::Error(_, _) => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn inject_dial_upgrade_error(
|
||||||
|
&mut self,
|
||||||
|
_: Self::OutboundOpenInfo,
|
||||||
|
error: ProtocolsHandlerUpgrErr<
|
||||||
|
<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error,
|
||||||
|
>,
|
||||||
|
) {
|
||||||
|
if self.pending_error.is_none() {
|
||||||
|
self.pending_error = Some(error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn connection_keep_alive(&self) -> KeepAlive {
|
||||||
|
self.keep_alive
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll(
|
||||||
|
&mut self,
|
||||||
|
) -> Poll<
|
||||||
|
ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>,
|
||||||
|
Self::Error,
|
||||||
|
> {
|
||||||
|
if let Some(err) = self.pending_error.take() {
|
||||||
|
return Err(err);
|
||||||
|
}
|
||||||
|
|
||||||
|
// return any events that need to be reported
|
||||||
|
if !self.events_out.is_empty() {
|
||||||
|
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
|
||||||
|
self.events_out.remove(0),
|
||||||
|
)));
|
||||||
|
} else {
|
||||||
|
self.events_out.shrink_to_fit();
|
||||||
|
}
|
||||||
|
|
||||||
|
// remove any streams that have expired
|
||||||
|
self.waiting_substreams.retain(|_k, waiting_stream| {
|
||||||
|
if Instant::now() > waiting_stream.timeout {
|
||||||
|
false
|
||||||
|
} else {
|
||||||
|
true
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// drive streams that need to be processed
|
||||||
|
for n in (0..self.substreams.len()).rev() {
|
||||||
|
let stream = self.substreams.swap_remove(n);
|
||||||
|
match stream {
|
||||||
|
SubstreamState::ResponsePendingSend { mut substream } => {
|
||||||
|
match substream.poll() {
|
||||||
|
Ok(Async::Ready(_substream)) => {} // sent and flushed
|
||||||
|
Ok(Async::NotReady) => {
|
||||||
|
self.substreams
|
||||||
|
.push(SubstreamState::ResponsePendingSend { substream });
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
|
||||||
|
RPCEvent::Error(0, e),
|
||||||
|
)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
SubstreamState::RequestPendingResponse {
|
||||||
|
mut substream,
|
||||||
|
rpc_event,
|
||||||
|
timeout,
|
||||||
|
} => match substream.poll() {
|
||||||
|
Ok(Async::Ready(response)) => {
|
||||||
|
if let Some(response) = response {
|
||||||
|
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
|
||||||
|
build_response(rpc_event, response),
|
||||||
|
)));
|
||||||
|
} else {
|
||||||
|
// stream closed early
|
||||||
|
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
|
||||||
|
RPCEvent::Error(
|
||||||
|
rpc_event.id(),
|
||||||
|
RPCError::Custom("Stream Closed Early".into()),
|
||||||
|
),
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(Async::NotReady) => {
|
||||||
|
if Instant::now() < timeout {
|
||||||
|
self.substreams
|
||||||
|
.push(SubstreamState::RequestPendingResponse {
|
||||||
|
substream,
|
||||||
|
rpc_event,
|
||||||
|
timeout,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
|
||||||
|
RPCEvent::Error(rpc_event.id(), e.into()),
|
||||||
|
)))
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// establish outbound substreams
|
||||||
|
if !self.dial_queue.is_empty() {
|
||||||
|
if self.dial_negotiated < self.max_dial_negotiated {
|
||||||
|
self.dial_negotiated += 1;
|
||||||
|
let rpc_event = self.dial_queue.remove(0);
|
||||||
|
if let RPCEvent::Request(id, req) = rpc_event {
|
||||||
|
return Ok(Async::Ready(
|
||||||
|
ProtocolsHandlerEvent::OutboundSubstreamRequest {
|
||||||
|
protocol: SubstreamProtocol::new(req.clone()),
|
||||||
|
info: RPCEvent::Request(id, req),
|
||||||
|
},
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
self.dial_queue.shrink_to_fit();
|
||||||
|
}
|
||||||
|
Ok(Async::NotReady)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Given a response back from a peer and the request that sent it, construct a response to send
|
||||||
|
/// back to the user. This allows for some data manipulation of responses given requests.
|
||||||
|
fn build_response(rpc_event: RPCEvent, rpc_response: RPCErrorResponse) -> RPCEvent {
|
||||||
|
let id = rpc_event.id();
|
||||||
|
|
||||||
|
// handle the types of responses
|
||||||
|
match rpc_response {
|
||||||
|
RPCErrorResponse::Success(response) => {
|
||||||
|
match response {
|
||||||
|
// if the response is block roots, tag on the extra request data
|
||||||
|
RPCResponse::BeaconBlockBodies(mut resp) => {
|
||||||
|
if let RPCEvent::Request(_id, RPCRequest::BeaconBlockBodies(bodies_req)) =
|
||||||
|
rpc_event
|
||||||
|
{
|
||||||
|
resp.block_roots = Some(bodies_req.block_roots);
|
||||||
|
}
|
||||||
|
RPCEvent::Response(
|
||||||
|
id,
|
||||||
|
RPCErrorResponse::Success(RPCResponse::BeaconBlockBodies(resp)),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
_ => RPCEvent::Response(id, RPCErrorResponse::Success(response)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => RPCEvent::Response(id, rpc_response),
|
||||||
|
}
|
||||||
|
}
|
@ -2,130 +2,54 @@
|
|||||||
|
|
||||||
use ssz::{impl_decode_via_from, impl_encode_via_from};
|
use ssz::{impl_decode_via_from, impl_encode_via_from};
|
||||||
use ssz_derive::{Decode, Encode};
|
use ssz_derive::{Decode, Encode};
|
||||||
use types::{BeaconBlockBody, BeaconBlockHeader, Epoch, Hash256, Slot};
|
use types::{BeaconBlockBody, Epoch, Hash256, Slot};
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
/// Available Serenity Libp2p RPC methods
|
|
||||||
pub enum RPCMethod {
|
|
||||||
/// Initialise handshake between connecting peers.
|
|
||||||
Hello,
|
|
||||||
/// Terminate a connection providing a reason.
|
|
||||||
Goodbye,
|
|
||||||
/// Requests a number of beacon block roots.
|
|
||||||
BeaconBlockRoots,
|
|
||||||
/// Requests a number of beacon block headers.
|
|
||||||
BeaconBlockHeaders,
|
|
||||||
/// Requests a number of beacon block bodies.
|
|
||||||
BeaconBlockBodies,
|
|
||||||
/// Requests values for a merkle proof for the current blocks state root.
|
|
||||||
BeaconChainState, // Note: experimental, not complete.
|
|
||||||
/// Unknown method received.
|
|
||||||
Unknown,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<u16> for RPCMethod {
|
|
||||||
fn from(method_id: u16) -> Self {
|
|
||||||
match method_id {
|
|
||||||
0 => RPCMethod::Hello,
|
|
||||||
1 => RPCMethod::Goodbye,
|
|
||||||
10 => RPCMethod::BeaconBlockRoots,
|
|
||||||
11 => RPCMethod::BeaconBlockHeaders,
|
|
||||||
12 => RPCMethod::BeaconBlockBodies,
|
|
||||||
13 => RPCMethod::BeaconChainState,
|
|
||||||
|
|
||||||
_ => RPCMethod::Unknown,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Into<u16> for RPCMethod {
|
|
||||||
fn into(self) -> u16 {
|
|
||||||
match self {
|
|
||||||
RPCMethod::Hello => 0,
|
|
||||||
RPCMethod::Goodbye => 1,
|
|
||||||
RPCMethod::BeaconBlockRoots => 10,
|
|
||||||
RPCMethod::BeaconBlockHeaders => 11,
|
|
||||||
RPCMethod::BeaconBlockBodies => 12,
|
|
||||||
RPCMethod::BeaconChainState => 13,
|
|
||||||
_ => 0,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub enum RPCRequest {
|
|
||||||
Hello(HelloMessage),
|
|
||||||
Goodbye(GoodbyeReason),
|
|
||||||
BeaconBlockRoots(BeaconBlockRootsRequest),
|
|
||||||
BeaconBlockHeaders(BeaconBlockHeadersRequest),
|
|
||||||
BeaconBlockBodies(BeaconBlockBodiesRequest),
|
|
||||||
BeaconChainState(BeaconChainStateRequest),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl RPCRequest {
|
|
||||||
pub fn method_id(&self) -> u16 {
|
|
||||||
let method = match self {
|
|
||||||
RPCRequest::Hello(_) => RPCMethod::Hello,
|
|
||||||
RPCRequest::Goodbye(_) => RPCMethod::Goodbye,
|
|
||||||
RPCRequest::BeaconBlockRoots(_) => RPCMethod::BeaconBlockRoots,
|
|
||||||
RPCRequest::BeaconBlockHeaders(_) => RPCMethod::BeaconBlockHeaders,
|
|
||||||
RPCRequest::BeaconBlockBodies(_) => RPCMethod::BeaconBlockBodies,
|
|
||||||
RPCRequest::BeaconChainState(_) => RPCMethod::BeaconChainState,
|
|
||||||
};
|
|
||||||
method.into()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub enum RPCResponse {
|
|
||||||
Hello(HelloMessage),
|
|
||||||
BeaconBlockRoots(BeaconBlockRootsResponse),
|
|
||||||
BeaconBlockHeaders(BeaconBlockHeadersResponse),
|
|
||||||
BeaconBlockBodies(BeaconBlockBodiesResponse),
|
|
||||||
BeaconChainState(BeaconChainStateResponse),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl RPCResponse {
|
|
||||||
pub fn method_id(&self) -> u16 {
|
|
||||||
let method = match self {
|
|
||||||
RPCResponse::Hello(_) => RPCMethod::Hello,
|
|
||||||
RPCResponse::BeaconBlockRoots(_) => RPCMethod::BeaconBlockRoots,
|
|
||||||
RPCResponse::BeaconBlockHeaders(_) => RPCMethod::BeaconBlockHeaders,
|
|
||||||
RPCResponse::BeaconBlockBodies(_) => RPCMethod::BeaconBlockBodies,
|
|
||||||
RPCResponse::BeaconChainState(_) => RPCMethod::BeaconChainState,
|
|
||||||
};
|
|
||||||
method.into()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Request/Response data structures for RPC methods */
|
/* Request/Response data structures for RPC methods */
|
||||||
|
|
||||||
|
/* Requests */
|
||||||
|
|
||||||
|
pub type RequestId = usize;
|
||||||
|
|
||||||
/// The HELLO request/response handshake message.
|
/// The HELLO request/response handshake message.
|
||||||
#[derive(Encode, Decode, Clone, Debug)]
|
#[derive(Encode, Decode, Clone, Debug)]
|
||||||
pub struct HelloMessage {
|
pub struct HelloMessage {
|
||||||
/// The network ID of the peer.
|
/// The network ID of the peer.
|
||||||
pub network_id: u8,
|
pub network_id: u8,
|
||||||
|
|
||||||
|
/// The chain id for the HELLO request.
|
||||||
|
pub chain_id: u64,
|
||||||
|
|
||||||
/// The peers last finalized root.
|
/// The peers last finalized root.
|
||||||
pub latest_finalized_root: Hash256,
|
pub latest_finalized_root: Hash256,
|
||||||
|
|
||||||
/// The peers last finalized epoch.
|
/// The peers last finalized epoch.
|
||||||
pub latest_finalized_epoch: Epoch,
|
pub latest_finalized_epoch: Epoch,
|
||||||
|
|
||||||
/// The peers last block root.
|
/// The peers last block root.
|
||||||
pub best_root: Hash256,
|
pub best_root: Hash256,
|
||||||
|
|
||||||
/// The peers last slot.
|
/// The peers last slot.
|
||||||
pub best_slot: Slot,
|
pub best_slot: Slot,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The reason given for a `Goodbye` message.
|
/// The reason given for a `Goodbye` message.
|
||||||
///
|
///
|
||||||
/// Note: any unknown `u64::into(n)` will resolve to `GoodbyeReason::Unknown` for any unknown `n`,
|
/// Note: any unknown `u64::into(n)` will resolve to `Goodbye::Unknown` for any unknown `n`,
|
||||||
/// however `GoodbyeReason::Unknown.into()` will go into `0_u64`. Therefore de-serializing then
|
/// however `GoodbyeReason::Unknown.into()` will go into `0_u64`. Therefore de-serializing then
|
||||||
/// re-serializing may not return the same bytes.
|
/// re-serializing may not return the same bytes.
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub enum GoodbyeReason {
|
pub enum GoodbyeReason {
|
||||||
ClientShutdown,
|
/// This node has shutdown.
|
||||||
IrreleventNetwork,
|
ClientShutdown = 1,
|
||||||
Fault,
|
|
||||||
Unknown,
|
/// Incompatible networks.
|
||||||
|
IrreleventNetwork = 2,
|
||||||
|
|
||||||
|
/// Error/fault in the RPC.
|
||||||
|
Fault = 3,
|
||||||
|
|
||||||
|
/// Unknown reason.
|
||||||
|
Unknown = 0,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<u64> for GoodbyeReason {
|
impl From<u64> for GoodbyeReason {
|
||||||
@ -141,12 +65,7 @@ impl From<u64> for GoodbyeReason {
|
|||||||
|
|
||||||
impl Into<u64> for GoodbyeReason {
|
impl Into<u64> for GoodbyeReason {
|
||||||
fn into(self) -> u64 {
|
fn into(self) -> u64 {
|
||||||
match self {
|
self as u64
|
||||||
GoodbyeReason::Unknown => 0,
|
|
||||||
GoodbyeReason::ClientShutdown => 1,
|
|
||||||
GoodbyeReason::IrreleventNetwork => 2,
|
|
||||||
GoodbyeReason::Fault => 3,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -158,6 +77,7 @@ impl_decode_via_from!(GoodbyeReason, u64);
|
|||||||
pub struct BeaconBlockRootsRequest {
|
pub struct BeaconBlockRootsRequest {
|
||||||
/// The starting slot of the requested blocks.
|
/// The starting slot of the requested blocks.
|
||||||
pub start_slot: Slot,
|
pub start_slot: Slot,
|
||||||
|
|
||||||
/// The number of blocks from the start slot.
|
/// The number of blocks from the start slot.
|
||||||
pub count: u64, // this must be less than 32768. //TODO: Enforce this in the lower layers
|
pub count: u64, // this must be less than 32768. //TODO: Enforce this in the lower layers
|
||||||
}
|
}
|
||||||
@ -169,8 +89,19 @@ pub struct BeaconBlockRootsResponse {
|
|||||||
pub roots: Vec<BlockRootSlot>,
|
pub roots: Vec<BlockRootSlot>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Contains a block root and associated slot.
|
||||||
|
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
|
||||||
|
pub struct BlockRootSlot {
|
||||||
|
/// The block root.
|
||||||
|
pub block_root: Hash256,
|
||||||
|
|
||||||
|
/// The block slot.
|
||||||
|
pub slot: Slot,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The response of a beacon block roots request.
|
||||||
impl BeaconBlockRootsResponse {
|
impl BeaconBlockRootsResponse {
|
||||||
/// Returns `true` if each `self.roots.slot[i]` is higher than the preceeding `i`.
|
/// Returns `true` if each `self.roots.slot[i]` is higher than the preceding `i`.
|
||||||
pub fn slots_are_ascending(&self) -> bool {
|
pub fn slots_are_ascending(&self) -> bool {
|
||||||
for window in self.roots.windows(2) {
|
for window in self.roots.windows(2) {
|
||||||
if window[0].slot >= window[1].slot {
|
if window[0].slot >= window[1].slot {
|
||||||
@ -182,33 +113,27 @@ impl BeaconBlockRootsResponse {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Contains a block root and associated slot.
|
|
||||||
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
|
|
||||||
pub struct BlockRootSlot {
|
|
||||||
/// The block root.
|
|
||||||
pub block_root: Hash256,
|
|
||||||
/// The block slot.
|
|
||||||
pub slot: Slot,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Request a number of beacon block headers from a peer.
|
/// Request a number of beacon block headers from a peer.
|
||||||
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
|
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
|
||||||
pub struct BeaconBlockHeadersRequest {
|
pub struct BeaconBlockHeadersRequest {
|
||||||
/// The starting header hash of the requested headers.
|
/// The starting header hash of the requested headers.
|
||||||
pub start_root: Hash256,
|
pub start_root: Hash256,
|
||||||
|
|
||||||
/// The starting slot of the requested headers.
|
/// The starting slot of the requested headers.
|
||||||
pub start_slot: Slot,
|
pub start_slot: Slot,
|
||||||
|
|
||||||
/// The maximum number of headers than can be returned.
|
/// The maximum number of headers than can be returned.
|
||||||
pub max_headers: u64,
|
pub max_headers: u64,
|
||||||
|
|
||||||
/// The maximum number of slots to skip between blocks.
|
/// The maximum number of slots to skip between blocks.
|
||||||
pub skip_slots: u64,
|
pub skip_slots: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Response containing requested block headers.
|
/// Response containing requested block headers.
|
||||||
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
|
#[derive(Clone, Debug, PartialEq)]
|
||||||
pub struct BeaconBlockHeadersResponse {
|
pub struct BeaconBlockHeadersResponse {
|
||||||
/// The list of requested beacon block headers.
|
/// The list of ssz-encoded requested beacon block headers.
|
||||||
pub headers: Vec<BeaconBlockHeader>,
|
pub headers: Vec<u8>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Request a number of beacon block bodies from a peer.
|
/// Request a number of beacon block bodies from a peer.
|
||||||
@ -219,9 +144,20 @@ pub struct BeaconBlockBodiesRequest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Response containing the list of requested beacon block bodies.
|
/// Response containing the list of requested beacon block bodies.
|
||||||
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
|
#[derive(Clone, Debug, PartialEq)]
|
||||||
pub struct BeaconBlockBodiesResponse {
|
pub struct BeaconBlockBodiesResponse {
|
||||||
/// The list of beacon block bodies being requested.
|
/// The list of hashes that were sent in the request and match these roots response. None when
|
||||||
|
/// sending outbound.
|
||||||
|
pub block_roots: Option<Vec<Hash256>>,
|
||||||
|
/// The list of ssz-encoded beacon block bodies being requested.
|
||||||
|
pub block_bodies: Vec<u8>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The decoded version of `BeaconBlockBodiesResponse` which is expected in `SimpleSync`.
|
||||||
|
pub struct DecodedBeaconBlockBodiesResponse {
|
||||||
|
/// The list of hashes sent in the request to get this response.
|
||||||
|
pub block_roots: Vec<Hash256>,
|
||||||
|
/// The valid decoded block bodies.
|
||||||
pub block_bodies: Vec<BeaconBlockBody>,
|
pub block_bodies: Vec<BeaconBlockBody>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -237,5 +173,71 @@ pub struct BeaconChainStateRequest {
|
|||||||
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
|
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
|
||||||
pub struct BeaconChainStateResponse {
|
pub struct BeaconChainStateResponse {
|
||||||
/// The values corresponding the to the requested tree hashes.
|
/// The values corresponding the to the requested tree hashes.
|
||||||
pub values: bool, //TBD - stubbed with encodeable bool
|
pub values: bool, //TBD - stubbed with encodable bool
|
||||||
|
}
|
||||||
|
|
||||||
|
/* RPC Handling and Grouping */
|
||||||
|
// Collection of enums and structs used by the Codecs to encode/decode RPC messages
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub enum RPCResponse {
|
||||||
|
/// A HELLO message.
|
||||||
|
Hello(HelloMessage),
|
||||||
|
/// A response to a get BEACON_BLOCK_ROOTS request.
|
||||||
|
BeaconBlockRoots(BeaconBlockRootsResponse),
|
||||||
|
/// A response to a get BEACON_BLOCK_HEADERS request.
|
||||||
|
BeaconBlockHeaders(BeaconBlockHeadersResponse),
|
||||||
|
/// A response to a get BEACON_BLOCK_BODIES request.
|
||||||
|
BeaconBlockBodies(BeaconBlockBodiesResponse),
|
||||||
|
/// A response to a get BEACON_CHAIN_STATE request.
|
||||||
|
BeaconChainState(BeaconChainStateResponse),
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum RPCErrorResponse {
|
||||||
|
Success(RPCResponse),
|
||||||
|
InvalidRequest(ErrorMessage),
|
||||||
|
ServerError(ErrorMessage),
|
||||||
|
Unknown(ErrorMessage),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RPCErrorResponse {
|
||||||
|
/// Used to encode the response.
|
||||||
|
pub fn as_u8(&self) -> u8 {
|
||||||
|
match self {
|
||||||
|
RPCErrorResponse::Success(_) => 0,
|
||||||
|
RPCErrorResponse::InvalidRequest(_) => 2,
|
||||||
|
RPCErrorResponse::ServerError(_) => 3,
|
||||||
|
RPCErrorResponse::Unknown(_) => 255,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Tells the codec whether to decode as an RPCResponse or an error.
|
||||||
|
pub fn is_response(response_code: u8) -> bool {
|
||||||
|
match response_code {
|
||||||
|
0 => true,
|
||||||
|
_ => false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Builds an RPCErrorResponse from a response code and an ErrorMessage
|
||||||
|
pub fn from_error(response_code: u8, err: ErrorMessage) -> Self {
|
||||||
|
match response_code {
|
||||||
|
2 => RPCErrorResponse::InvalidRequest(err),
|
||||||
|
3 => RPCErrorResponse::ServerError(err),
|
||||||
|
_ => RPCErrorResponse::Unknown(err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Encode, Decode, Debug)]
|
||||||
|
pub struct ErrorMessage {
|
||||||
|
/// The UTF-8 encoded Error message string.
|
||||||
|
pub error_message: Vec<u8>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ErrorMessage {
|
||||||
|
pub fn as_string(&self) -> String {
|
||||||
|
String::from_utf8(self.error_message.clone()).unwrap_or_else(|_| "".into())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,26 +1,55 @@
|
|||||||
/// RPC Protocol over libp2p.
|
//! The Ethereum 2.0 Wire Protocol
|
||||||
///
|
//!
|
||||||
/// This is purpose built for Ethereum 2.0 serenity and the protocol listens on
|
//! This protocol is a purpose built Ethereum 2.0 libp2p protocol. It's role is to facilitate
|
||||||
/// `/eth/serenity/rpc/1.0.0`
|
//! direct peer-to-peer communication primarily for sending/receiving chain information for
|
||||||
pub mod methods;
|
//! syncing.
|
||||||
mod protocol;
|
|
||||||
|
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
use libp2p::core::protocols_handler::{OneShotHandler, ProtocolsHandler};
|
use handler::RPCHandler;
|
||||||
|
use libp2p::core::protocols_handler::ProtocolsHandler;
|
||||||
use libp2p::core::swarm::{
|
use libp2p::core::swarm::{
|
||||||
ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
|
ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
|
||||||
};
|
};
|
||||||
use libp2p::{Multiaddr, PeerId};
|
use libp2p::{Multiaddr, PeerId};
|
||||||
pub use methods::{HelloMessage, RPCMethod, RPCRequest, RPCResponse};
|
pub use methods::{ErrorMessage, HelloMessage, RPCErrorResponse, RPCResponse, RequestId};
|
||||||
pub use protocol::{RPCEvent, RPCProtocol, RequestId};
|
pub use protocol::{RPCError, RPCProtocol, RPCRequest};
|
||||||
use slog::o;
|
use slog::o;
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
use tokio::io::{AsyncRead, AsyncWrite};
|
use tokio::io::{AsyncRead, AsyncWrite};
|
||||||
|
|
||||||
/// The network behaviour handles RPC requests/responses as specified in the Eth 2.0 phase 0
|
pub(crate) mod codec;
|
||||||
/// specification.
|
mod handler;
|
||||||
|
pub mod methods;
|
||||||
|
mod protocol;
|
||||||
|
// mod request_response;
|
||||||
|
|
||||||
pub struct Rpc<TSubstream> {
|
/// The return type used in the behaviour and the resultant event from the protocols handler.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum RPCEvent {
|
||||||
|
/// A request that was received from the RPC protocol. The first parameter is a sequential
|
||||||
|
/// id which tracks an awaiting substream for the response.
|
||||||
|
Request(RequestId, RPCRequest),
|
||||||
|
|
||||||
|
/// A response that has been received from the RPC protocol. The first parameter returns
|
||||||
|
/// that which was sent with the corresponding request.
|
||||||
|
Response(RequestId, RPCErrorResponse),
|
||||||
|
/// An Error occurred.
|
||||||
|
Error(RequestId, RPCError),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RPCEvent {
|
||||||
|
pub fn id(&self) -> usize {
|
||||||
|
match *self {
|
||||||
|
RPCEvent::Request(id, _) => id,
|
||||||
|
RPCEvent::Response(id, _) => id,
|
||||||
|
RPCEvent::Error(id, _) => id,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Implements the libp2p `NetworkBehaviour` trait and therefore manages network-level
|
||||||
|
/// logic.
|
||||||
|
pub struct RPC<TSubstream> {
|
||||||
/// Queue of events to processed.
|
/// Queue of events to processed.
|
||||||
events: Vec<NetworkBehaviourAction<RPCEvent, RPCMessage>>,
|
events: Vec<NetworkBehaviourAction<RPCEvent, RPCMessage>>,
|
||||||
/// Pins the generic substream.
|
/// Pins the generic substream.
|
||||||
@ -29,17 +58,19 @@ pub struct Rpc<TSubstream> {
|
|||||||
_log: slog::Logger,
|
_log: slog::Logger,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<TSubstream> Rpc<TSubstream> {
|
impl<TSubstream> RPC<TSubstream> {
|
||||||
pub fn new(log: &slog::Logger) -> Self {
|
pub fn new(log: &slog::Logger) -> Self {
|
||||||
let log = log.new(o!("Service" => "Libp2p-RPC"));
|
let log = log.new(o!("Service" => "Libp2p-RPC"));
|
||||||
Rpc {
|
RPC {
|
||||||
events: Vec::new(),
|
events: Vec::new(),
|
||||||
marker: PhantomData,
|
marker: PhantomData,
|
||||||
_log: log,
|
_log: log,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Submits and RPC request.
|
/// Submits an RPC request.
|
||||||
|
///
|
||||||
|
/// The peer must be connected for this to succeed.
|
||||||
pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) {
|
pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) {
|
||||||
self.events.push(NetworkBehaviourAction::SendEvent {
|
self.events.push(NetworkBehaviourAction::SendEvent {
|
||||||
peer_id,
|
peer_id,
|
||||||
@ -48,17 +79,18 @@ impl<TSubstream> Rpc<TSubstream> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<TSubstream> NetworkBehaviour for Rpc<TSubstream>
|
impl<TSubstream> NetworkBehaviour for RPC<TSubstream>
|
||||||
where
|
where
|
||||||
TSubstream: AsyncRead + AsyncWrite,
|
TSubstream: AsyncRead + AsyncWrite,
|
||||||
{
|
{
|
||||||
type ProtocolsHandler = OneShotHandler<TSubstream, RPCProtocol, RPCEvent, OneShotEvent>;
|
type ProtocolsHandler = RPCHandler<TSubstream>;
|
||||||
type OutEvent = RPCMessage;
|
type OutEvent = RPCMessage;
|
||||||
|
|
||||||
fn new_handler(&mut self) -> Self::ProtocolsHandler {
|
fn new_handler(&mut self) -> Self::ProtocolsHandler {
|
||||||
Default::default()
|
Default::default()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// handled by discovery
|
||||||
fn addresses_of_peer(&mut self, _peer_id: &PeerId) -> Vec<Multiaddr> {
|
fn addresses_of_peer(&mut self, _peer_id: &PeerId) -> Vec<Multiaddr> {
|
||||||
Vec::new()
|
Vec::new()
|
||||||
}
|
}
|
||||||
@ -72,19 +104,18 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn inject_disconnected(&mut self, _: &PeerId, _: ConnectedPoint) {}
|
fn inject_disconnected(&mut self, peer_id: &PeerId, _: ConnectedPoint) {
|
||||||
|
// inform the rpc handler that the peer has disconnected
|
||||||
|
self.events.push(NetworkBehaviourAction::GenerateEvent(
|
||||||
|
RPCMessage::PeerDisconnected(peer_id.clone()),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
fn inject_node_event(
|
fn inject_node_event(
|
||||||
&mut self,
|
&mut self,
|
||||||
source: PeerId,
|
source: PeerId,
|
||||||
event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
|
event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
|
||||||
) {
|
) {
|
||||||
// ignore successful send events
|
|
||||||
let event = match event {
|
|
||||||
OneShotEvent::Rx(event) => event,
|
|
||||||
OneShotEvent::Sent => return,
|
|
||||||
};
|
|
||||||
|
|
||||||
// send the event to the user
|
// send the event to the user
|
||||||
self.events
|
self.events
|
||||||
.push(NetworkBehaviourAction::GenerateEvent(RPCMessage::RPC(
|
.push(NetworkBehaviourAction::GenerateEvent(RPCMessage::RPC(
|
||||||
@ -112,27 +143,5 @@ where
|
|||||||
pub enum RPCMessage {
|
pub enum RPCMessage {
|
||||||
RPC(PeerId, RPCEvent),
|
RPC(PeerId, RPCEvent),
|
||||||
PeerDialed(PeerId),
|
PeerDialed(PeerId),
|
||||||
}
|
PeerDisconnected(PeerId),
|
||||||
|
|
||||||
/// Transmission between the `OneShotHandler` and the `RPCEvent`.
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub enum OneShotEvent {
|
|
||||||
/// We received an RPC from a remote.
|
|
||||||
Rx(RPCEvent),
|
|
||||||
/// We successfully sent an RPC request.
|
|
||||||
Sent,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<RPCEvent> for OneShotEvent {
|
|
||||||
#[inline]
|
|
||||||
fn from(rpc: RPCEvent) -> OneShotEvent {
|
|
||||||
OneShotEvent::Rx(rpc)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<()> for OneShotEvent {
|
|
||||||
#[inline]
|
|
||||||
fn from(_: ()) -> OneShotEvent {
|
|
||||||
OneShotEvent::Sent
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -1,269 +1,317 @@
|
|||||||
use super::methods::*;
|
use super::methods::*;
|
||||||
|
use crate::rpc::codec::{
|
||||||
|
base::{BaseInboundCodec, BaseOutboundCodec},
|
||||||
|
ssz::{SSZInboundCodec, SSZOutboundCodec},
|
||||||
|
InboundCodec, OutboundCodec,
|
||||||
|
};
|
||||||
|
use futures::{
|
||||||
|
future::{self, FutureResult},
|
||||||
|
sink, stream, Sink, Stream,
|
||||||
|
};
|
||||||
use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo};
|
use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo};
|
||||||
use ssz::{impl_decode_via_from, impl_encode_via_from, ssz_encode, Decode, Encode};
|
|
||||||
use ssz_derive::{Decode, Encode};
|
|
||||||
use std::hash::{Hash, Hasher};
|
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::iter;
|
use std::time::Duration;
|
||||||
|
use tokio::codec::Framed;
|
||||||
use tokio::io::{AsyncRead, AsyncWrite};
|
use tokio::io::{AsyncRead, AsyncWrite};
|
||||||
|
use tokio::prelude::*;
|
||||||
|
use tokio::timer::timeout;
|
||||||
|
use tokio::util::FutureExt;
|
||||||
|
|
||||||
/// The maximum bytes that can be sent across the RPC.
|
/// The maximum bytes that can be sent across the RPC.
|
||||||
const MAX_READ_SIZE: usize = 4_194_304; // 4M
|
const MAX_RPC_SIZE: usize = 4_194_304; // 4M
|
||||||
|
/// The protocol prefix the RPC protocol id.
|
||||||
|
const PROTOCOL_PREFIX: &str = "/eth2/beacon_node/rpc";
|
||||||
|
/// The number of seconds to wait for a request once a protocol has been established before the stream is terminated.
|
||||||
|
const REQUEST_TIMEOUT: u64 = 3;
|
||||||
|
|
||||||
/// Implementation of the `ConnectionUpgrade` for the rpc protocol.
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct RPCProtocol;
|
pub struct RPCProtocol;
|
||||||
|
|
||||||
impl UpgradeInfo for RPCProtocol {
|
impl UpgradeInfo for RPCProtocol {
|
||||||
type Info = &'static [u8];
|
type Info = RawProtocolId;
|
||||||
type InfoIter = iter::Once<Self::Info>;
|
type InfoIter = Vec<Self::Info>;
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn protocol_info(&self) -> Self::InfoIter {
|
fn protocol_info(&self) -> Self::InfoIter {
|
||||||
iter::once(b"/eth/serenity/rpc/1.0.0")
|
vec![
|
||||||
|
ProtocolId::new("hello", "1.0.0", "ssz").into(),
|
||||||
|
ProtocolId::new("goodbye", "1.0.0", "ssz").into(),
|
||||||
|
ProtocolId::new("beacon_block_roots", "1.0.0", "ssz").into(),
|
||||||
|
ProtocolId::new("beacon_block_headers", "1.0.0", "ssz").into(),
|
||||||
|
ProtocolId::new("beacon_block_bodies", "1.0.0", "ssz").into(),
|
||||||
|
]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for RPCProtocol {
|
/// The raw protocol id sent over the wire.
|
||||||
fn default() -> Self {
|
type RawProtocolId = Vec<u8>;
|
||||||
RPCProtocol
|
|
||||||
|
/// Tracks the types in a protocol id.
|
||||||
|
pub struct ProtocolId {
|
||||||
|
/// The rpc message type/name.
|
||||||
|
pub message_name: String,
|
||||||
|
|
||||||
|
/// The version of the RPC.
|
||||||
|
pub version: String,
|
||||||
|
|
||||||
|
/// The encoding of the RPC.
|
||||||
|
pub encoding: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An RPC protocol ID.
|
||||||
|
impl ProtocolId {
|
||||||
|
pub fn new(message_name: &str, version: &str, encoding: &str) -> Self {
|
||||||
|
ProtocolId {
|
||||||
|
message_name: message_name.into(),
|
||||||
|
version: version.into(),
|
||||||
|
encoding: encoding.into(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Converts a raw RPC protocol id string into an `RPCProtocolId`
|
||||||
|
pub fn from_bytes(bytes: &[u8]) -> Result<Self, RPCError> {
|
||||||
|
let protocol_string = String::from_utf8(bytes.to_vec())
|
||||||
|
.map_err(|_| RPCError::InvalidProtocol("Invalid protocol Id"))?;
|
||||||
|
let protocol_list: Vec<&str> = protocol_string.as_str().split('/').take(7).collect();
|
||||||
|
|
||||||
|
if protocol_list.len() != 7 {
|
||||||
|
return Err(RPCError::InvalidProtocol("Not enough '/'"));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(ProtocolId {
|
||||||
|
message_name: protocol_list[4].into(),
|
||||||
|
version: protocol_list[5].into(),
|
||||||
|
encoding: protocol_list[6].into(),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A monotonic counter for ordering `RPCRequest`s.
|
impl Into<RawProtocolId> for ProtocolId {
|
||||||
#[derive(Debug, Clone, Copy, Default)]
|
fn into(self) -> RawProtocolId {
|
||||||
pub struct RequestId(u64);
|
format!(
|
||||||
|
"{}/{}/{}/{}",
|
||||||
impl RequestId {
|
PROTOCOL_PREFIX, self.message_name, self.version, self.encoding
|
||||||
/// Increment the request id.
|
)
|
||||||
pub fn increment(&mut self) {
|
.as_bytes()
|
||||||
self.0 += 1
|
.to_vec()
|
||||||
}
|
|
||||||
|
|
||||||
/// Return the previous id.
|
|
||||||
pub fn previous(self) -> Self {
|
|
||||||
Self(self.0 - 1)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Eq for RequestId {}
|
/* Inbound upgrade */
|
||||||
|
|
||||||
impl PartialEq for RequestId {
|
// The inbound protocol reads the request, decodes it and returns the stream to the protocol
|
||||||
fn eq(&self, other: &RequestId) -> bool {
|
// handler to respond to once ready.
|
||||||
self.0 == other.0
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Hash for RequestId {
|
pub type InboundOutput<TSocket> = (RPCRequest, InboundFramed<TSocket>);
|
||||||
fn hash<H: Hasher>(&self, state: &mut H) {
|
pub type InboundFramed<TSocket> = Framed<upgrade::Negotiated<TSocket>, InboundCodec>;
|
||||||
self.0.hash(state);
|
type FnAndThen<TSocket> = fn(
|
||||||
}
|
(Option<RPCRequest>, InboundFramed<TSocket>),
|
||||||
}
|
) -> FutureResult<InboundOutput<TSocket>, RPCError>;
|
||||||
|
type FnMapErr<TSocket> = fn(timeout::Error<(RPCError, InboundFramed<TSocket>)>) -> RPCError;
|
||||||
impl From<u64> for RequestId {
|
|
||||||
fn from(x: u64) -> RequestId {
|
|
||||||
RequestId(x)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Into<u64> for RequestId {
|
|
||||||
fn into(self) -> u64 {
|
|
||||||
self.0
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl_encode_via_from!(RequestId, u64);
|
|
||||||
impl_decode_via_from!(RequestId, u64);
|
|
||||||
|
|
||||||
/// The RPC types which are sent/received in this protocol.
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub enum RPCEvent {
|
|
||||||
Request {
|
|
||||||
id: RequestId,
|
|
||||||
method_id: u16,
|
|
||||||
body: RPCRequest,
|
|
||||||
},
|
|
||||||
Response {
|
|
||||||
id: RequestId,
|
|
||||||
method_id: u16, //TODO: Remove and process decoding upstream
|
|
||||||
result: RPCResponse,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
impl UpgradeInfo for RPCEvent {
|
|
||||||
type Info = &'static [u8];
|
|
||||||
type InfoIter = iter::Once<Self::Info>;
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn protocol_info(&self) -> Self::InfoIter {
|
|
||||||
iter::once(b"/eth/serenity/rpc/1.0.0")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type FnDecodeRPCEvent = fn(Vec<u8>, ()) -> Result<RPCEvent, DecodeError>;
|
|
||||||
|
|
||||||
impl<TSocket> InboundUpgrade<TSocket> for RPCProtocol
|
impl<TSocket> InboundUpgrade<TSocket> for RPCProtocol
|
||||||
where
|
where
|
||||||
TSocket: AsyncRead + AsyncWrite,
|
TSocket: AsyncRead + AsyncWrite,
|
||||||
{
|
{
|
||||||
type Output = RPCEvent;
|
type Output = InboundOutput<TSocket>;
|
||||||
type Error = DecodeError;
|
type Error = RPCError;
|
||||||
type Future = upgrade::ReadOneThen<upgrade::Negotiated<TSocket>, (), FnDecodeRPCEvent>;
|
|
||||||
|
|
||||||
fn upgrade_inbound(self, socket: upgrade::Negotiated<TSocket>, _: Self::Info) -> Self::Future {
|
type Future = future::AndThen<
|
||||||
upgrade::read_one_then(socket, MAX_READ_SIZE, (), |packet, ()| Ok(decode(packet)?))
|
future::MapErr<
|
||||||
|
timeout::Timeout<stream::StreamFuture<InboundFramed<TSocket>>>,
|
||||||
|
FnMapErr<TSocket>,
|
||||||
|
>,
|
||||||
|
FutureResult<InboundOutput<TSocket>, RPCError>,
|
||||||
|
FnAndThen<TSocket>,
|
||||||
|
>;
|
||||||
|
|
||||||
|
fn upgrade_inbound(
|
||||||
|
self,
|
||||||
|
socket: upgrade::Negotiated<TSocket>,
|
||||||
|
protocol: RawProtocolId,
|
||||||
|
) -> Self::Future {
|
||||||
|
// TODO: Verify this
|
||||||
|
let protocol_id =
|
||||||
|
ProtocolId::from_bytes(&protocol).expect("Can decode all supported protocols");
|
||||||
|
|
||||||
|
match protocol_id.encoding.as_str() {
|
||||||
|
"ssz" | _ => {
|
||||||
|
let ssz_codec =
|
||||||
|
BaseInboundCodec::new(SSZInboundCodec::new(protocol_id, MAX_RPC_SIZE));
|
||||||
|
let codec = InboundCodec::SSZ(ssz_codec);
|
||||||
|
Framed::new(socket, codec)
|
||||||
|
.into_future()
|
||||||
|
.timeout(Duration::from_secs(REQUEST_TIMEOUT))
|
||||||
|
.map_err(RPCError::from as FnMapErr<TSocket>)
|
||||||
|
.and_then({
|
||||||
|
|(req, stream)| match req {
|
||||||
|
Some(req) => futures::future::ok((req, stream)),
|
||||||
|
None => futures::future::err(RPCError::Custom(
|
||||||
|
"Stream terminated early".into(),
|
||||||
|
)),
|
||||||
|
}
|
||||||
|
} as FnAndThen<TSocket>)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A helper structed used to obtain SSZ serialization for RPC messages.
|
/* Outbound request */
|
||||||
#[derive(Encode, Decode, Default)]
|
|
||||||
struct SszContainer {
|
// Combines all the RPC requests into a single enum to implement `UpgradeInfo` and
|
||||||
/// Note: the `is_request` field is not included in the spec.
|
// `OutboundUpgrade`
|
||||||
///
|
|
||||||
/// We are unable to determine a request from a response unless we add some flag to the
|
#[derive(Debug, Clone)]
|
||||||
/// packet. Here we have added a bool (encoded as 1 byte) which is set to `1` if the
|
pub enum RPCRequest {
|
||||||
/// message is a request.
|
Hello(HelloMessage),
|
||||||
is_request: bool,
|
Goodbye(GoodbyeReason),
|
||||||
id: u64,
|
BeaconBlockRoots(BeaconBlockRootsRequest),
|
||||||
other: u16,
|
BeaconBlockHeaders(BeaconBlockHeadersRequest),
|
||||||
bytes: Vec<u8>,
|
BeaconBlockBodies(BeaconBlockBodiesRequest),
|
||||||
|
BeaconChainState(BeaconChainStateRequest),
|
||||||
}
|
}
|
||||||
|
|
||||||
fn decode(packet: Vec<u8>) -> Result<RPCEvent, DecodeError> {
|
impl UpgradeInfo for RPCRequest {
|
||||||
let msg = SszContainer::from_ssz_bytes(&packet)?;
|
type Info = RawProtocolId;
|
||||||
|
type InfoIter = Vec<Self::Info>;
|
||||||
|
|
||||||
if msg.is_request {
|
// add further protocols as we support more encodings/versions
|
||||||
let body = match RPCMethod::from(msg.other) {
|
fn protocol_info(&self) -> Self::InfoIter {
|
||||||
RPCMethod::Hello => RPCRequest::Hello(HelloMessage::from_ssz_bytes(&msg.bytes)?),
|
self.supported_protocols()
|
||||||
RPCMethod::Goodbye => RPCRequest::Goodbye(GoodbyeReason::from_ssz_bytes(&msg.bytes)?),
|
|
||||||
RPCMethod::BeaconBlockRoots => {
|
|
||||||
RPCRequest::BeaconBlockRoots(BeaconBlockRootsRequest::from_ssz_bytes(&msg.bytes)?)
|
|
||||||
}
|
|
||||||
RPCMethod::BeaconBlockHeaders => RPCRequest::BeaconBlockHeaders(
|
|
||||||
BeaconBlockHeadersRequest::from_ssz_bytes(&msg.bytes)?,
|
|
||||||
),
|
|
||||||
RPCMethod::BeaconBlockBodies => {
|
|
||||||
RPCRequest::BeaconBlockBodies(BeaconBlockBodiesRequest::from_ssz_bytes(&msg.bytes)?)
|
|
||||||
}
|
|
||||||
RPCMethod::BeaconChainState => {
|
|
||||||
RPCRequest::BeaconChainState(BeaconChainStateRequest::from_ssz_bytes(&msg.bytes)?)
|
|
||||||
}
|
|
||||||
RPCMethod::Unknown => return Err(DecodeError::UnknownRPCMethod),
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(RPCEvent::Request {
|
|
||||||
id: RequestId::from(msg.id),
|
|
||||||
method_id: msg.other,
|
|
||||||
body,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
// we have received a response
|
|
||||||
else {
|
|
||||||
let result = match RPCMethod::from(msg.other) {
|
|
||||||
RPCMethod::Hello => RPCResponse::Hello(HelloMessage::from_ssz_bytes(&msg.bytes)?),
|
|
||||||
RPCMethod::BeaconBlockRoots => {
|
|
||||||
RPCResponse::BeaconBlockRoots(BeaconBlockRootsResponse::from_ssz_bytes(&msg.bytes)?)
|
|
||||||
}
|
|
||||||
RPCMethod::BeaconBlockHeaders => RPCResponse::BeaconBlockHeaders(
|
|
||||||
BeaconBlockHeadersResponse::from_ssz_bytes(&msg.bytes)?,
|
|
||||||
),
|
|
||||||
RPCMethod::BeaconBlockBodies => RPCResponse::BeaconBlockBodies(
|
|
||||||
BeaconBlockBodiesResponse::from_ssz_bytes(&msg.bytes)?,
|
|
||||||
),
|
|
||||||
RPCMethod::BeaconChainState => {
|
|
||||||
RPCResponse::BeaconChainState(BeaconChainStateResponse::from_ssz_bytes(&msg.bytes)?)
|
|
||||||
}
|
|
||||||
// We should never receive a goodbye response; it is invalid.
|
|
||||||
RPCMethod::Goodbye => return Err(DecodeError::UnknownRPCMethod),
|
|
||||||
RPCMethod::Unknown => return Err(DecodeError::UnknownRPCMethod),
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(RPCEvent::Response {
|
|
||||||
id: RequestId::from(msg.id),
|
|
||||||
method_id: msg.other,
|
|
||||||
result,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<TSocket> OutboundUpgrade<TSocket> for RPCEvent
|
/// Implements the encoding per supported protocol for RPCRequest.
|
||||||
|
impl RPCRequest {
|
||||||
|
pub fn supported_protocols(&self) -> Vec<RawProtocolId> {
|
||||||
|
match self {
|
||||||
|
// add more protocols when versions/encodings are supported
|
||||||
|
RPCRequest::Hello(_) => vec![ProtocolId::new("hello", "1.0.0", "ssz").into()],
|
||||||
|
RPCRequest::Goodbye(_) => vec![ProtocolId::new("goodbye", "1.0.0", "ssz").into()],
|
||||||
|
RPCRequest::BeaconBlockRoots(_) => {
|
||||||
|
vec![ProtocolId::new("beacon_block_roots", "1.0.0", "ssz").into()]
|
||||||
|
}
|
||||||
|
RPCRequest::BeaconBlockHeaders(_) => {
|
||||||
|
vec![ProtocolId::new("beacon_block_headers", "1.0.0", "ssz").into()]
|
||||||
|
}
|
||||||
|
RPCRequest::BeaconBlockBodies(_) => {
|
||||||
|
vec![ProtocolId::new("beacon_block_bodies", "1.0.0", "ssz").into()]
|
||||||
|
}
|
||||||
|
RPCRequest::BeaconChainState(_) => {
|
||||||
|
vec![ProtocolId::new("beacon_block_state", "1.0.0", "ssz").into()]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// This specifies whether a stream should remain open and await a response, given a request.
|
||||||
|
/// A GOODBYE request has no response.
|
||||||
|
pub fn expect_response(&self) -> bool {
|
||||||
|
match self {
|
||||||
|
RPCRequest::Goodbye(_) => false,
|
||||||
|
_ => true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* RPC Response type - used for outbound upgrades */
|
||||||
|
|
||||||
|
/* Outbound upgrades */
|
||||||
|
|
||||||
|
pub type OutboundFramed<TSocket> = Framed<upgrade::Negotiated<TSocket>, OutboundCodec>;
|
||||||
|
|
||||||
|
impl<TSocket> OutboundUpgrade<TSocket> for RPCRequest
|
||||||
where
|
where
|
||||||
TSocket: AsyncWrite,
|
TSocket: AsyncRead + AsyncWrite,
|
||||||
{
|
{
|
||||||
type Output = ();
|
type Output = OutboundFramed<TSocket>;
|
||||||
type Error = io::Error;
|
type Error = RPCError;
|
||||||
type Future = upgrade::WriteOne<upgrade::Negotiated<TSocket>>;
|
type Future = sink::Send<OutboundFramed<TSocket>>;
|
||||||
|
fn upgrade_outbound(
|
||||||
|
self,
|
||||||
|
socket: upgrade::Negotiated<TSocket>,
|
||||||
|
protocol: Self::Info,
|
||||||
|
) -> Self::Future {
|
||||||
|
let protocol_id =
|
||||||
|
ProtocolId::from_bytes(&protocol).expect("Can decode all supported protocols");
|
||||||
|
|
||||||
#[inline]
|
match protocol_id.encoding.as_str() {
|
||||||
fn upgrade_outbound(self, socket: upgrade::Negotiated<TSocket>, _: Self::Info) -> Self::Future {
|
"ssz" | _ => {
|
||||||
let bytes = ssz_encode(&self);
|
let ssz_codec = BaseOutboundCodec::new(SSZOutboundCodec::new(protocol_id, 4096));
|
||||||
upgrade::write_one(socket, bytes)
|
let codec = OutboundCodec::SSZ(ssz_codec);
|
||||||
}
|
Framed::new(socket, codec).send(self)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
impl Encode for RPCEvent {
|
|
||||||
fn is_ssz_fixed_len() -> bool {
|
|
||||||
false
|
|
||||||
}
|
|
||||||
|
|
||||||
fn ssz_append(&self, buf: &mut Vec<u8>) {
|
|
||||||
let container = match self {
|
|
||||||
RPCEvent::Request {
|
|
||||||
id,
|
|
||||||
method_id,
|
|
||||||
body,
|
|
||||||
} => SszContainer {
|
|
||||||
is_request: true,
|
|
||||||
id: (*id).into(),
|
|
||||||
other: *method_id,
|
|
||||||
bytes: match body {
|
|
||||||
RPCRequest::Hello(body) => body.as_ssz_bytes(),
|
|
||||||
RPCRequest::Goodbye(body) => body.as_ssz_bytes(),
|
|
||||||
RPCRequest::BeaconBlockRoots(body) => body.as_ssz_bytes(),
|
|
||||||
RPCRequest::BeaconBlockHeaders(body) => body.as_ssz_bytes(),
|
|
||||||
RPCRequest::BeaconBlockBodies(body) => body.as_ssz_bytes(),
|
|
||||||
RPCRequest::BeaconChainState(body) => body.as_ssz_bytes(),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
RPCEvent::Response {
|
|
||||||
id,
|
|
||||||
method_id,
|
|
||||||
result,
|
|
||||||
} => SszContainer {
|
|
||||||
is_request: false,
|
|
||||||
id: (*id).into(),
|
|
||||||
other: *method_id,
|
|
||||||
bytes: match result {
|
|
||||||
RPCResponse::Hello(response) => response.as_ssz_bytes(),
|
|
||||||
RPCResponse::BeaconBlockRoots(response) => response.as_ssz_bytes(),
|
|
||||||
RPCResponse::BeaconBlockHeaders(response) => response.as_ssz_bytes(),
|
|
||||||
RPCResponse::BeaconBlockBodies(response) => response.as_ssz_bytes(),
|
|
||||||
RPCResponse::BeaconChainState(response) => response.as_ssz_bytes(),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
container.ssz_append(buf)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Error in RPC Encoding/Decoding.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum DecodeError {
|
pub enum RPCError {
|
||||||
|
/// Error when reading the packet from the socket.
|
||||||
ReadError(upgrade::ReadOneError),
|
ReadError(upgrade::ReadOneError),
|
||||||
|
/// Error when decoding the raw buffer from ssz.
|
||||||
SSZDecodeError(ssz::DecodeError),
|
SSZDecodeError(ssz::DecodeError),
|
||||||
UnknownRPCMethod,
|
/// Invalid Protocol ID.
|
||||||
|
InvalidProtocol(&'static str),
|
||||||
|
/// IO Error.
|
||||||
|
IoError(io::Error),
|
||||||
|
/// Waiting for a request/response timed out, or timer error'd.
|
||||||
|
StreamTimeout,
|
||||||
|
/// Custom message.
|
||||||
|
Custom(String),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<upgrade::ReadOneError> for DecodeError {
|
impl From<upgrade::ReadOneError> for RPCError {
|
||||||
#[inline]
|
#[inline]
|
||||||
fn from(err: upgrade::ReadOneError) -> Self {
|
fn from(err: upgrade::ReadOneError) -> Self {
|
||||||
DecodeError::ReadError(err)
|
RPCError::ReadError(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<ssz::DecodeError> for DecodeError {
|
impl From<ssz::DecodeError> for RPCError {
|
||||||
#[inline]
|
#[inline]
|
||||||
fn from(err: ssz::DecodeError) -> Self {
|
fn from(err: ssz::DecodeError) -> Self {
|
||||||
DecodeError::SSZDecodeError(err)
|
RPCError::SSZDecodeError(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl<T> From<tokio::timer::timeout::Error<T>> for RPCError {
|
||||||
|
fn from(err: tokio::timer::timeout::Error<T>) -> Self {
|
||||||
|
if err.is_elapsed() {
|
||||||
|
RPCError::StreamTimeout
|
||||||
|
} else {
|
||||||
|
RPCError::Custom("Stream timer failed".into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<io::Error> for RPCError {
|
||||||
|
fn from(err: io::Error) -> Self {
|
||||||
|
RPCError::IoError(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Error trait is required for `ProtocolsHandler`
|
||||||
|
impl std::fmt::Display for RPCError {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
match *self {
|
||||||
|
RPCError::ReadError(ref err) => write!(f, "Error while reading from socket: {}", err),
|
||||||
|
RPCError::SSZDecodeError(ref err) => write!(f, "Error while decoding ssz: {:?}", err),
|
||||||
|
RPCError::InvalidProtocol(ref err) => write!(f, "Invalid Protocol: {}", err),
|
||||||
|
RPCError::IoError(ref err) => write!(f, "IO Error: {}", err),
|
||||||
|
RPCError::StreamTimeout => write!(f, "Stream Timeout"),
|
||||||
|
RPCError::Custom(ref err) => write!(f, "{}", err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::error::Error for RPCError {
|
||||||
|
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
|
||||||
|
match *self {
|
||||||
|
RPCError::ReadError(ref err) => Some(err),
|
||||||
|
RPCError::SSZDecodeError(_) => None,
|
||||||
|
RPCError::InvalidProtocol(_) => None,
|
||||||
|
RPCError::IoError(ref err) => Some(err),
|
||||||
|
RPCError::StreamTimeout => None,
|
||||||
|
RPCError::Custom(_) => None,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -131,6 +131,9 @@ impl Stream for Service {
|
|||||||
BehaviourEvent::PeerDialed(peer_id) => {
|
BehaviourEvent::PeerDialed(peer_id) => {
|
||||||
return Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id))));
|
return Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id))));
|
||||||
}
|
}
|
||||||
|
BehaviourEvent::PeerDisconnected(peer_id) => {
|
||||||
|
return Ok(Async::Ready(Some(Libp2pEvent::PeerDisconnected(peer_id))));
|
||||||
|
}
|
||||||
},
|
},
|
||||||
Ok(Async::Ready(None)) => unreachable!("Swarm stream shouldn't end"),
|
Ok(Async::Ready(None)) => unreachable!("Swarm stream shouldn't end"),
|
||||||
Ok(Async::NotReady) => break,
|
Ok(Async::NotReady) => break,
|
||||||
@ -180,6 +183,8 @@ pub enum Libp2pEvent {
|
|||||||
RPC(PeerId, RPCEvent),
|
RPC(PeerId, RPCEvent),
|
||||||
/// Initiated the connection to a new peer.
|
/// Initiated the connection to a new peer.
|
||||||
PeerDialed(PeerId),
|
PeerDialed(PeerId),
|
||||||
|
/// A peer has disconnected.
|
||||||
|
PeerDisconnected(PeerId),
|
||||||
/// Received pubsub message.
|
/// Received pubsub message.
|
||||||
PubsubMessage {
|
PubsubMessage {
|
||||||
source: PeerId,
|
source: PeerId,
|
||||||
|
@ -19,3 +19,4 @@ tree_hash = { path = "../../eth2/utils/tree_hash" }
|
|||||||
futures = "0.1.25"
|
futures = "0.1.25"
|
||||||
error-chain = "0.12.0"
|
error-chain = "0.12.0"
|
||||||
tokio = "0.1.16"
|
tokio = "0.1.16"
|
||||||
|
parking_lot = "0.9.0"
|
||||||
|
@ -2,23 +2,19 @@ use crate::error;
|
|||||||
use crate::service::{NetworkMessage, OutgoingMessage};
|
use crate::service::{NetworkMessage, OutgoingMessage};
|
||||||
use crate::sync::SimpleSync;
|
use crate::sync::SimpleSync;
|
||||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||||
|
use eth2_libp2p::rpc::methods::*;
|
||||||
use eth2_libp2p::{
|
use eth2_libp2p::{
|
||||||
behaviour::PubsubMessage,
|
behaviour::PubsubMessage,
|
||||||
rpc::{methods::GoodbyeReason, RPCRequest, RPCResponse, RequestId},
|
rpc::{RPCError, RPCErrorResponse, RPCRequest, RPCResponse, RequestId},
|
||||||
PeerId, RPCEvent,
|
PeerId, RPCEvent,
|
||||||
};
|
};
|
||||||
use futures::future::Future;
|
use futures::future::Future;
|
||||||
use futures::stream::Stream;
|
use futures::stream::Stream;
|
||||||
use slog::{debug, warn};
|
use slog::{debug, warn};
|
||||||
use std::collections::HashMap;
|
use ssz::{Decode, DecodeError};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Instant;
|
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
|
use types::BeaconBlockHeader;
|
||||||
/// Timeout for RPC requests.
|
|
||||||
// const REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
|
|
||||||
/// Timeout before banning a peer for non-identification.
|
|
||||||
// const HELLO_TIMEOUT: Duration = Duration::from_secs(30);
|
|
||||||
|
|
||||||
/// Handles messages received from the network and client and organises syncing.
|
/// Handles messages received from the network and client and organises syncing.
|
||||||
pub struct MessageHandler<T: BeaconChainTypes> {
|
pub struct MessageHandler<T: BeaconChainTypes> {
|
||||||
@ -33,7 +29,7 @@ pub struct MessageHandler<T: BeaconChainTypes> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Types of messages the handler can receive.
|
/// Types of messages the handler can receive.
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug)]
|
||||||
pub enum HandlerMessage {
|
pub enum HandlerMessage {
|
||||||
/// We have initiated a connection to a new peer.
|
/// We have initiated a connection to a new peer.
|
||||||
PeerDialed(PeerId),
|
PeerDialed(PeerId),
|
||||||
@ -88,6 +84,10 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
|
|||||||
HandlerMessage::PeerDialed(peer_id) => {
|
HandlerMessage::PeerDialed(peer_id) => {
|
||||||
self.sync.on_connect(peer_id, &mut self.network_context);
|
self.sync.on_connect(peer_id, &mut self.network_context);
|
||||||
}
|
}
|
||||||
|
// A peer has disconnected
|
||||||
|
HandlerMessage::PeerDisconnected(peer_id) => {
|
||||||
|
self.sync.on_disconnect(peer_id);
|
||||||
|
}
|
||||||
// we have received an RPC message request/response
|
// we have received an RPC message request/response
|
||||||
HandlerMessage::RPC(peer_id, rpc_event) => {
|
HandlerMessage::RPC(peer_id, rpc_event) => {
|
||||||
self.handle_rpc_message(peer_id, rpc_event);
|
self.handle_rpc_message(peer_id, rpc_event);
|
||||||
@ -96,8 +96,6 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
|
|||||||
HandlerMessage::PubsubMessage(peer_id, gossip) => {
|
HandlerMessage::PubsubMessage(peer_id, gossip) => {
|
||||||
self.handle_gossip(peer_id, *gossip);
|
self.handle_gossip(peer_id, *gossip);
|
||||||
}
|
}
|
||||||
//TODO: Handle all messages
|
|
||||||
_ => {}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -106,15 +104,14 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
|
|||||||
/// Handle RPC messages
|
/// Handle RPC messages
|
||||||
fn handle_rpc_message(&mut self, peer_id: PeerId, rpc_message: RPCEvent) {
|
fn handle_rpc_message(&mut self, peer_id: PeerId, rpc_message: RPCEvent) {
|
||||||
match rpc_message {
|
match rpc_message {
|
||||||
RPCEvent::Request { id, body, .. // TODO: Clean up RPC Message types, have a cleaner type by this point.
|
RPCEvent::Request(id, req) => self.handle_rpc_request(peer_id, id, req),
|
||||||
} => self.handle_rpc_request(peer_id, id, body),
|
RPCEvent::Response(_id, resp) => self.handle_rpc_response(peer_id, resp),
|
||||||
RPCEvent::Response { id, result, .. } => self.handle_rpc_response(peer_id, id, result),
|
RPCEvent::Error(id, error) => self.handle_rpc_error(peer_id, id, error),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A new RPC request has been received from the network.
|
/// A new RPC request has been received from the network.
|
||||||
fn handle_rpc_request(&mut self, peer_id: PeerId, request_id: RequestId, request: RPCRequest) {
|
fn handle_rpc_request(&mut self, peer_id: PeerId, request_id: RequestId, request: RPCRequest) {
|
||||||
// TODO: process the `id`.
|
|
||||||
match request {
|
match request {
|
||||||
RPCRequest::Hello(hello_message) => self.sync.on_hello_request(
|
RPCRequest::Hello(hello_message) => self.sync.on_hello_request(
|
||||||
peer_id,
|
peer_id,
|
||||||
@ -151,27 +148,27 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
|
|||||||
|
|
||||||
/// An RPC response has been received from the network.
|
/// An RPC response has been received from the network.
|
||||||
// we match on id and ignore responses past the timeout.
|
// we match on id and ignore responses past the timeout.
|
||||||
fn handle_rpc_response(&mut self, peer_id: PeerId, id: RequestId, response: RPCResponse) {
|
fn handle_rpc_response(&mut self, peer_id: PeerId, error_response: RPCErrorResponse) {
|
||||||
// if response id is not related to a request, ignore (likely RPC timeout)
|
// an error could have occurred.
|
||||||
if self
|
// TODO: Handle Error gracefully
|
||||||
.network_context
|
match error_response {
|
||||||
.outstanding_outgoing_request_ids
|
RPCErrorResponse::InvalidRequest(error) => {
|
||||||
.remove(&(peer_id.clone(), id))
|
warn!(self.log, "";"peer" => format!("{:?}", peer_id), "Invalid Request" => error.as_string())
|
||||||
.is_none()
|
|
||||||
{
|
|
||||||
warn!(
|
|
||||||
self.log,
|
|
||||||
"Unknown ResponseId for incoming RPCRequest";
|
|
||||||
"peer" => format!("{:?}", peer_id),
|
|
||||||
"request_id" => format!("{:?}", id)
|
|
||||||
);
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
RPCErrorResponse::ServerError(error) => {
|
||||||
|
warn!(self.log, "";"peer" => format!("{:?}", peer_id), "Server Error" => error.as_string())
|
||||||
|
}
|
||||||
|
RPCErrorResponse::Unknown(error) => {
|
||||||
|
warn!(self.log, "";"peer" => format!("{:?}", peer_id), "Unknown Error" => error.as_string())
|
||||||
|
}
|
||||||
|
RPCErrorResponse::Success(response) => {
|
||||||
match response {
|
match response {
|
||||||
RPCResponse::Hello(hello_message) => {
|
RPCResponse::Hello(hello_message) => {
|
||||||
self.sync
|
self.sync.on_hello_response(
|
||||||
.on_hello_response(peer_id, hello_message, &mut self.network_context);
|
peer_id,
|
||||||
|
hello_message,
|
||||||
|
&mut self.network_context,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
RPCResponse::BeaconBlockRoots(response) => {
|
RPCResponse::BeaconBlockRoots(response) => {
|
||||||
self.sync.on_beacon_block_roots_response(
|
self.sync.on_beacon_block_roots_response(
|
||||||
@ -181,19 +178,33 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
RPCResponse::BeaconBlockHeaders(response) => {
|
RPCResponse::BeaconBlockHeaders(response) => {
|
||||||
|
match self.decode_block_headers(response) {
|
||||||
|
Ok(decoded_block_headers) => {
|
||||||
self.sync.on_beacon_block_headers_response(
|
self.sync.on_beacon_block_headers_response(
|
||||||
peer_id,
|
peer_id,
|
||||||
response,
|
decoded_block_headers,
|
||||||
&mut self.network_context,
|
&mut self.network_context,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
Err(_e) => {
|
||||||
|
warn!(self.log, "Peer sent invalid block headers";"peer" => format!("{:?}", peer_id))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
RPCResponse::BeaconBlockBodies(response) => {
|
RPCResponse::BeaconBlockBodies(response) => {
|
||||||
|
match self.decode_block_bodies(response) {
|
||||||
|
Ok(decoded_block_bodies) => {
|
||||||
self.sync.on_beacon_block_bodies_response(
|
self.sync.on_beacon_block_bodies_response(
|
||||||
peer_id,
|
peer_id,
|
||||||
response,
|
decoded_block_bodies,
|
||||||
&mut self.network_context,
|
&mut self.network_context,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
Err(_e) => {
|
||||||
|
warn!(self.log, "Peer sent invalid block bodies";"peer" => format!("{:?}", peer_id))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
RPCResponse::BeaconChainState(_) => {
|
RPCResponse::BeaconChainState(_) => {
|
||||||
// We do not implement this endpoint, it is not required and will only likely be
|
// We do not implement this endpoint, it is not required and will only likely be
|
||||||
// useful for light-client support in later phases.
|
// useful for light-client support in later phases.
|
||||||
@ -202,7 +213,39 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
|
|||||||
// beacon state RPC request.
|
// beacon state RPC request.
|
||||||
warn!(self.log, "BeaconChainState RPC call is not supported.");
|
warn!(self.log, "BeaconChainState RPC call is not supported.");
|
||||||
}
|
}
|
||||||
};
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Verifies and decodes the ssz-encoded block bodies received from peers.
|
||||||
|
fn decode_block_bodies(
|
||||||
|
&self,
|
||||||
|
bodies_response: BeaconBlockBodiesResponse,
|
||||||
|
) -> Result<DecodedBeaconBlockBodiesResponse, DecodeError> {
|
||||||
|
//TODO: Implement faster block verification before decoding entirely
|
||||||
|
let block_bodies = Vec::from_ssz_bytes(&bodies_response.block_bodies)?;
|
||||||
|
Ok(DecodedBeaconBlockBodiesResponse {
|
||||||
|
block_roots: bodies_response
|
||||||
|
.block_roots
|
||||||
|
.expect("Responses must have associated roots"),
|
||||||
|
block_bodies,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Verifies and decodes the ssz-encoded block headers received from peers.
|
||||||
|
fn decode_block_headers(
|
||||||
|
&self,
|
||||||
|
headers_response: BeaconBlockHeadersResponse,
|
||||||
|
) -> Result<Vec<BeaconBlockHeader>, DecodeError> {
|
||||||
|
//TODO: Implement faster header verification before decoding entirely
|
||||||
|
Vec::from_ssz_bytes(&headers_response.headers)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Handle various RPC errors
|
||||||
|
fn handle_rpc_error(&mut self, peer_id: PeerId, request_id: RequestId, error: RPCError) {
|
||||||
|
//TODO: Handle error correctly
|
||||||
|
warn!(self.log, "RPC Error"; "Peer" => format!("{:?}", peer_id), "Request Id" => format!("{}", request_id), "Error" => format!("{:?}", error));
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Handle RPC messages
|
/// Handle RPC messages
|
||||||
@ -221,25 +264,17 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: RPC Rewrite makes this struct fairly pointless
|
||||||
pub struct NetworkContext {
|
pub struct NetworkContext {
|
||||||
/// The network channel to relay messages to the Network service.
|
/// The network channel to relay messages to the Network service.
|
||||||
network_send: mpsc::UnboundedSender<NetworkMessage>,
|
network_send: mpsc::UnboundedSender<NetworkMessage>,
|
||||||
/// A mapping of peers and the RPC id we have sent an RPC request to.
|
|
||||||
outstanding_outgoing_request_ids: HashMap<(PeerId, RequestId), Instant>,
|
|
||||||
/// Stores the next `RequestId` we should include on an outgoing `RPCRequest` to a `PeerId`.
|
|
||||||
outgoing_request_ids: HashMap<PeerId, RequestId>,
|
|
||||||
/// The `MessageHandler` logger.
|
/// The `MessageHandler` logger.
|
||||||
log: slog::Logger,
|
log: slog::Logger,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl NetworkContext {
|
impl NetworkContext {
|
||||||
pub fn new(network_send: mpsc::UnboundedSender<NetworkMessage>, log: slog::Logger) -> Self {
|
pub fn new(network_send: mpsc::UnboundedSender<NetworkMessage>, log: slog::Logger) -> Self {
|
||||||
Self {
|
Self { network_send, log }
|
||||||
network_send,
|
|
||||||
outstanding_outgoing_request_ids: HashMap::new(),
|
|
||||||
outgoing_request_ids: HashMap::new(),
|
|
||||||
log,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn disconnect(&mut self, peer_id: PeerId, reason: GoodbyeReason) {
|
pub fn disconnect(&mut self, peer_id: PeerId, reason: GoodbyeReason) {
|
||||||
@ -248,21 +283,12 @@ impl NetworkContext {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn send_rpc_request(&mut self, peer_id: PeerId, rpc_request: RPCRequest) {
|
pub fn send_rpc_request(&mut self, peer_id: PeerId, rpc_request: RPCRequest) {
|
||||||
let id = self.generate_request_id(&peer_id);
|
// Note: There is currently no use of keeping track of requests. However the functionality
|
||||||
|
// is left here for future revisions.
|
||||||
self.outstanding_outgoing_request_ids
|
self.send_rpc_event(peer_id, RPCEvent::Request(0, rpc_request));
|
||||||
.insert((peer_id.clone(), id), Instant::now());
|
|
||||||
|
|
||||||
self.send_rpc_event(
|
|
||||||
peer_id,
|
|
||||||
RPCEvent::Request {
|
|
||||||
id,
|
|
||||||
method_id: rpc_request.method_id(),
|
|
||||||
body: rpc_request,
|
|
||||||
},
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//TODO: Handle Error responses
|
||||||
pub fn send_rpc_response(
|
pub fn send_rpc_response(
|
||||||
&mut self,
|
&mut self,
|
||||||
peer_id: PeerId,
|
peer_id: PeerId,
|
||||||
@ -271,11 +297,7 @@ impl NetworkContext {
|
|||||||
) {
|
) {
|
||||||
self.send_rpc_event(
|
self.send_rpc_event(
|
||||||
peer_id,
|
peer_id,
|
||||||
RPCEvent::Response {
|
RPCEvent::Response(request_id, RPCErrorResponse::Success(rpc_response)),
|
||||||
id: request_id,
|
|
||||||
method_id: rpc_response.method_id(),
|
|
||||||
result: rpc_response,
|
|
||||||
},
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -292,17 +314,5 @@ impl NetworkContext {
|
|||||||
"Could not send RPC message to the network service"
|
"Could not send RPC message to the network service"
|
||||||
)
|
)
|
||||||
});
|
});
|
||||||
//
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns the next `RequestId` for sending an `RPCRequest` to the `peer_id`.
|
|
||||||
fn generate_request_id(&mut self, peer_id: &PeerId) -> RequestId {
|
|
||||||
let next_id = self
|
|
||||||
.outgoing_request_ids
|
|
||||||
.entry(peer_id.clone())
|
|
||||||
.and_modify(RequestId::increment)
|
|
||||||
.or_insert_with(|| RequestId::from(1));
|
|
||||||
|
|
||||||
next_id.previous()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -8,6 +8,7 @@ use eth2_libp2p::{Libp2pEvent, PeerId};
|
|||||||
use eth2_libp2p::{PubsubMessage, RPCEvent};
|
use eth2_libp2p::{PubsubMessage, RPCEvent};
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
use futures::Stream;
|
use futures::Stream;
|
||||||
|
use parking_lot::Mutex;
|
||||||
use slog::{debug, info, o, trace};
|
use slog::{debug, info, o, trace};
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
@ -16,9 +17,9 @@ use tokio::sync::{mpsc, oneshot};
|
|||||||
|
|
||||||
/// Service that handles communication between internal services and the eth2_libp2p network service.
|
/// Service that handles communication between internal services and the eth2_libp2p network service.
|
||||||
pub struct Service<T: BeaconChainTypes> {
|
pub struct Service<T: BeaconChainTypes> {
|
||||||
//libp2p_service: Arc<Mutex<LibP2PService>>,
|
libp2p_service: Arc<Mutex<LibP2PService>>,
|
||||||
_libp2p_exit: oneshot::Sender<()>,
|
_libp2p_exit: oneshot::Sender<()>,
|
||||||
network_send: mpsc::UnboundedSender<NetworkMessage>,
|
_network_send: mpsc::UnboundedSender<NetworkMessage>,
|
||||||
_phantom: PhantomData<T>, //message_handler: MessageHandler,
|
_phantom: PhantomData<T>, //message_handler: MessageHandler,
|
||||||
//message_handler_send: Sender<HandlerMessage>
|
//message_handler_send: Sender<HandlerMessage>
|
||||||
}
|
}
|
||||||
@ -43,38 +44,33 @@ impl<T: BeaconChainTypes + 'static> Service<T> {
|
|||||||
|
|
||||||
// launch libp2p service
|
// launch libp2p service
|
||||||
let libp2p_log = log.new(o!("Service" => "Libp2p"));
|
let libp2p_log = log.new(o!("Service" => "Libp2p"));
|
||||||
let libp2p_service = LibP2PService::new(config.clone(), libp2p_log)?;
|
let libp2p_service = Arc::new(Mutex::new(LibP2PService::new(config.clone(), libp2p_log)?));
|
||||||
|
|
||||||
// TODO: Spawn thread to handle libp2p messages and pass to message handler thread.
|
// TODO: Spawn thread to handle libp2p messages and pass to message handler thread.
|
||||||
let libp2p_exit = spawn_service(
|
let libp2p_exit = spawn_service(
|
||||||
libp2p_service,
|
libp2p_service.clone(),
|
||||||
network_recv,
|
network_recv,
|
||||||
message_handler_send,
|
message_handler_send,
|
||||||
executor,
|
executor,
|
||||||
log,
|
log,
|
||||||
)?;
|
)?;
|
||||||
let network_service = Service {
|
let network_service = Service {
|
||||||
|
libp2p_service,
|
||||||
_libp2p_exit: libp2p_exit,
|
_libp2p_exit: libp2p_exit,
|
||||||
network_send: network_send.clone(),
|
_network_send: network_send.clone(),
|
||||||
_phantom: PhantomData,
|
_phantom: PhantomData,
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok((Arc::new(network_service), network_send))
|
Ok((Arc::new(network_service), network_send))
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Testing only
|
pub fn libp2p_service(&self) -> Arc<Mutex<LibP2PService>> {
|
||||||
pub fn send_message(&mut self) {
|
self.libp2p_service.clone()
|
||||||
self.network_send
|
|
||||||
.try_send(NetworkMessage::Send(
|
|
||||||
PeerId::random(),
|
|
||||||
OutgoingMessage::NotifierTest,
|
|
||||||
))
|
|
||||||
.unwrap();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn spawn_service(
|
fn spawn_service(
|
||||||
libp2p_service: LibP2PService,
|
libp2p_service: Arc<Mutex<LibP2PService>>,
|
||||||
network_recv: mpsc::UnboundedReceiver<NetworkMessage>,
|
network_recv: mpsc::UnboundedReceiver<NetworkMessage>,
|
||||||
message_handler_send: mpsc::UnboundedSender<HandlerMessage>,
|
message_handler_send: mpsc::UnboundedSender<HandlerMessage>,
|
||||||
executor: &TaskExecutor,
|
executor: &TaskExecutor,
|
||||||
@ -103,7 +99,7 @@ fn spawn_service(
|
|||||||
|
|
||||||
//TODO: Potentially handle channel errors
|
//TODO: Potentially handle channel errors
|
||||||
fn network_service(
|
fn network_service(
|
||||||
mut libp2p_service: LibP2PService,
|
libp2p_service: Arc<Mutex<LibP2PService>>,
|
||||||
mut network_recv: mpsc::UnboundedReceiver<NetworkMessage>,
|
mut network_recv: mpsc::UnboundedReceiver<NetworkMessage>,
|
||||||
mut message_handler_send: mpsc::UnboundedSender<HandlerMessage>,
|
mut message_handler_send: mpsc::UnboundedSender<HandlerMessage>,
|
||||||
log: slog::Logger,
|
log: slog::Logger,
|
||||||
@ -115,28 +111,18 @@ fn network_service(
|
|||||||
not_ready_count = 0;
|
not_ready_count = 0;
|
||||||
// poll the network channel
|
// poll the network channel
|
||||||
match network_recv.poll() {
|
match network_recv.poll() {
|
||||||
Ok(Async::Ready(Some(message))) => {
|
Ok(Async::Ready(Some(message))) => match message {
|
||||||
match message {
|
NetworkMessage::Send(peer_id, outgoing_message) => match outgoing_message {
|
||||||
// TODO: Testing message - remove
|
|
||||||
NetworkMessage::Send(peer_id, outgoing_message) => {
|
|
||||||
match outgoing_message {
|
|
||||||
OutgoingMessage::RPC(rpc_event) => {
|
OutgoingMessage::RPC(rpc_event) => {
|
||||||
trace!(log, "Sending RPC Event: {:?}", rpc_event);
|
trace!(log, "Sending RPC Event: {:?}", rpc_event);
|
||||||
//TODO: Make swarm private
|
libp2p_service.lock().swarm.send_rpc(peer_id, rpc_event);
|
||||||
//TODO: Implement correct peer id topic message handling
|
|
||||||
libp2p_service.swarm.send_rpc(peer_id, rpc_event);
|
|
||||||
}
|
|
||||||
OutgoingMessage::NotifierTest => {
|
|
||||||
// debug!(log, "Received message from notifier");
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
},
|
||||||
NetworkMessage::Publish { topics, message } => {
|
NetworkMessage::Publish { topics, message } => {
|
||||||
debug!(log, "Sending pubsub message"; "topics" => format!("{:?}",topics));
|
debug!(log, "Sending pubsub message"; "topics" => format!("{:?}",topics));
|
||||||
libp2p_service.swarm.publish(topics, *message);
|
libp2p_service.lock().swarm.publish(topics, *message);
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
},
|
||||||
Ok(Async::NotReady) => not_ready_count += 1,
|
Ok(Async::NotReady) => not_ready_count += 1,
|
||||||
Ok(Async::Ready(None)) => {
|
Ok(Async::Ready(None)) => {
|
||||||
return Err(eth2_libp2p::error::Error::from("Network channel closed"));
|
return Err(eth2_libp2p::error::Error::from("Network channel closed"));
|
||||||
@ -147,19 +133,25 @@ fn network_service(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// poll the swarm
|
// poll the swarm
|
||||||
match libp2p_service.poll() {
|
match libp2p_service.lock().poll() {
|
||||||
Ok(Async::Ready(Some(event))) => match event {
|
Ok(Async::Ready(Some(event))) => match event {
|
||||||
Libp2pEvent::RPC(peer_id, rpc_event) => {
|
Libp2pEvent::RPC(peer_id, rpc_event) => {
|
||||||
trace!(log, "RPC Event: RPC message received: {:?}", rpc_event);
|
trace!(log, "RPC Event: RPC message received: {:?}", rpc_event);
|
||||||
message_handler_send
|
message_handler_send
|
||||||
.try_send(HandlerMessage::RPC(peer_id, rpc_event))
|
.try_send(HandlerMessage::RPC(peer_id, rpc_event))
|
||||||
.map_err(|_| "failed to send rpc to handler")?;
|
.map_err(|_| "Failed to send RPC to handler")?;
|
||||||
}
|
}
|
||||||
Libp2pEvent::PeerDialed(peer_id) => {
|
Libp2pEvent::PeerDialed(peer_id) => {
|
||||||
debug!(log, "Peer Dialed: {:?}", peer_id);
|
debug!(log, "Peer Dialed: {:?}", peer_id);
|
||||||
message_handler_send
|
message_handler_send
|
||||||
.try_send(HandlerMessage::PeerDialed(peer_id))
|
.try_send(HandlerMessage::PeerDialed(peer_id))
|
||||||
.map_err(|_| "failed to send rpc to handler")?;
|
.map_err(|_| "Failed to send PeerDialed to handler")?;
|
||||||
|
}
|
||||||
|
Libp2pEvent::PeerDisconnected(peer_id) => {
|
||||||
|
debug!(log, "Peer Disconnected: {:?}", peer_id);
|
||||||
|
message_handler_send
|
||||||
|
.try_send(HandlerMessage::PeerDisconnected(peer_id))
|
||||||
|
.map_err(|_| "Failed to send PeerDisconnected to handler")?;
|
||||||
}
|
}
|
||||||
Libp2pEvent::PubsubMessage {
|
Libp2pEvent::PubsubMessage {
|
||||||
source, message, ..
|
source, message, ..
|
||||||
@ -176,12 +168,13 @@ fn network_service(
|
|||||||
Err(_) => not_ready_count += 1,
|
Err(_) => not_ready_count += 1,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(Async::NotReady)
|
Ok(Async::NotReady)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Types of messages that the network service can receive.
|
/// Types of messages that the network service can receive.
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug)]
|
||||||
pub enum NetworkMessage {
|
pub enum NetworkMessage {
|
||||||
/// Send a message to libp2p service.
|
/// Send a message to libp2p service.
|
||||||
//TODO: Define typing for messages across the wire
|
//TODO: Define typing for messages across the wire
|
||||||
@ -194,10 +187,8 @@ pub enum NetworkMessage {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Type of outgoing messages that can be sent through the network service.
|
/// Type of outgoing messages that can be sent through the network service.
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug)]
|
||||||
pub enum OutgoingMessage {
|
pub enum OutgoingMessage {
|
||||||
/// Send an RPC request/response.
|
/// Send an RPC request/response.
|
||||||
RPC(RPCEvent),
|
RPC(RPCEvent),
|
||||||
//TODO: Remove
|
|
||||||
NotifierTest,
|
|
||||||
}
|
}
|
||||||
|
@ -5,6 +5,7 @@ use eth2_libp2p::rpc::methods::*;
|
|||||||
use eth2_libp2p::rpc::{RPCRequest, RPCResponse, RequestId};
|
use eth2_libp2p::rpc::{RPCRequest, RPCResponse, RequestId};
|
||||||
use eth2_libp2p::PeerId;
|
use eth2_libp2p::PeerId;
|
||||||
use slog::{debug, error, info, o, trace, warn};
|
use slog::{debug, error, info, o, trace, warn};
|
||||||
|
use ssz::Encode;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
@ -30,6 +31,7 @@ const SHOULD_NOT_FORWARD_GOSSIP_BLOCK: bool = false;
|
|||||||
#[derive(Clone, Copy, Debug)]
|
#[derive(Clone, Copy, Debug)]
|
||||||
pub struct PeerSyncInfo {
|
pub struct PeerSyncInfo {
|
||||||
network_id: u8,
|
network_id: u8,
|
||||||
|
chain_id: u64,
|
||||||
latest_finalized_root: Hash256,
|
latest_finalized_root: Hash256,
|
||||||
latest_finalized_epoch: Epoch,
|
latest_finalized_epoch: Epoch,
|
||||||
best_root: Hash256,
|
best_root: Hash256,
|
||||||
@ -40,6 +42,7 @@ impl From<HelloMessage> for PeerSyncInfo {
|
|||||||
fn from(hello: HelloMessage) -> PeerSyncInfo {
|
fn from(hello: HelloMessage) -> PeerSyncInfo {
|
||||||
PeerSyncInfo {
|
PeerSyncInfo {
|
||||||
network_id: hello.network_id,
|
network_id: hello.network_id,
|
||||||
|
chain_id: hello.chain_id,
|
||||||
latest_finalized_root: hello.latest_finalized_root,
|
latest_finalized_root: hello.latest_finalized_root,
|
||||||
latest_finalized_epoch: hello.latest_finalized_epoch,
|
latest_finalized_epoch: hello.latest_finalized_epoch,
|
||||||
best_root: hello.best_root,
|
best_root: hello.best_root,
|
||||||
@ -106,6 +109,17 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
|
|||||||
self.known_peers.remove(&peer_id);
|
self.known_peers.remove(&peer_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Handle a peer disconnect.
|
||||||
|
///
|
||||||
|
/// Removes the peer from `known_peers`.
|
||||||
|
pub fn on_disconnect(&mut self, peer_id: PeerId) {
|
||||||
|
info!(
|
||||||
|
self.log, "Peer Disconnected";
|
||||||
|
"peer" => format!("{:?}", peer_id),
|
||||||
|
);
|
||||||
|
self.known_peers.remove(&peer_id);
|
||||||
|
}
|
||||||
|
|
||||||
/// Handle the connection of a new peer.
|
/// Handle the connection of a new peer.
|
||||||
///
|
///
|
||||||
/// Sends a `Hello` message to the peer.
|
/// Sends a `Hello` message to the peer.
|
||||||
@ -407,6 +421,9 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
|
|||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
|
// ssz-encode the headers
|
||||||
|
let headers = headers.as_ssz_bytes();
|
||||||
|
|
||||||
network.send_rpc_response(
|
network.send_rpc_response(
|
||||||
peer_id,
|
peer_id,
|
||||||
request_id,
|
request_id,
|
||||||
@ -418,17 +435,17 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
|
|||||||
pub fn on_beacon_block_headers_response(
|
pub fn on_beacon_block_headers_response(
|
||||||
&mut self,
|
&mut self,
|
||||||
peer_id: PeerId,
|
peer_id: PeerId,
|
||||||
res: BeaconBlockHeadersResponse,
|
headers: Vec<BeaconBlockHeader>,
|
||||||
network: &mut NetworkContext,
|
network: &mut NetworkContext,
|
||||||
) {
|
) {
|
||||||
debug!(
|
debug!(
|
||||||
self.log,
|
self.log,
|
||||||
"BlockHeadersResponse";
|
"BlockHeadersResponse";
|
||||||
"peer" => format!("{:?}", peer_id),
|
"peer" => format!("{:?}", peer_id),
|
||||||
"count" => res.headers.len(),
|
"count" => headers.len(),
|
||||||
);
|
);
|
||||||
|
|
||||||
if res.headers.is_empty() {
|
if headers.is_empty() {
|
||||||
warn!(
|
warn!(
|
||||||
self.log,
|
self.log,
|
||||||
"Peer returned empty block headers response. PeerId: {:?}", peer_id
|
"Peer returned empty block headers response. PeerId: {:?}", peer_id
|
||||||
@ -438,9 +455,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
|
|||||||
|
|
||||||
// Enqueue the headers, obtaining a list of the roots of the headers which were newly added
|
// Enqueue the headers, obtaining a list of the roots of the headers which were newly added
|
||||||
// to the queue.
|
// to the queue.
|
||||||
let block_roots = self
|
let block_roots = self.import_queue.enqueue_headers(headers, peer_id.clone());
|
||||||
.import_queue
|
|
||||||
.enqueue_headers(res.headers, peer_id.clone());
|
|
||||||
|
|
||||||
if !block_roots.is_empty() {
|
if !block_roots.is_empty() {
|
||||||
self.request_block_bodies(peer_id, BeaconBlockBodiesRequest { block_roots }, network);
|
self.request_block_bodies(peer_id, BeaconBlockBodiesRequest { block_roots }, network);
|
||||||
@ -482,10 +497,15 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
|
|||||||
"returned" => block_bodies.len(),
|
"returned" => block_bodies.len(),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let bytes = block_bodies.as_ssz_bytes();
|
||||||
|
|
||||||
network.send_rpc_response(
|
network.send_rpc_response(
|
||||||
peer_id,
|
peer_id,
|
||||||
request_id,
|
request_id,
|
||||||
RPCResponse::BeaconBlockBodies(BeaconBlockBodiesResponse { block_bodies }),
|
RPCResponse::BeaconBlockBodies(BeaconBlockBodiesResponse {
|
||||||
|
block_bodies: bytes,
|
||||||
|
block_roots: None,
|
||||||
|
}),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -493,7 +513,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
|
|||||||
pub fn on_beacon_block_bodies_response(
|
pub fn on_beacon_block_bodies_response(
|
||||||
&mut self,
|
&mut self,
|
||||||
peer_id: PeerId,
|
peer_id: PeerId,
|
||||||
res: BeaconBlockBodiesResponse,
|
res: DecodedBeaconBlockBodiesResponse,
|
||||||
network: &mut NetworkContext,
|
network: &mut NetworkContext,
|
||||||
) {
|
) {
|
||||||
debug!(
|
debug!(
|
||||||
@ -574,6 +594,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
|
|||||||
|
|
||||||
SHOULD_FORWARD_GOSSIP_BLOCK
|
SHOULD_FORWARD_GOSSIP_BLOCK
|
||||||
}
|
}
|
||||||
|
|
||||||
BlockProcessingOutcome::FutureSlot {
|
BlockProcessingOutcome::FutureSlot {
|
||||||
present_slot,
|
present_slot,
|
||||||
block_slot,
|
block_slot,
|
||||||
@ -890,7 +911,9 @@ fn hello_message<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>) -> HelloMes
|
|||||||
let state = &beacon_chain.head().beacon_state;
|
let state = &beacon_chain.head().beacon_state;
|
||||||
|
|
||||||
HelloMessage {
|
HelloMessage {
|
||||||
|
//TODO: Correctly define the chain/network id
|
||||||
network_id: spec.chain_id,
|
network_id: spec.chain_id,
|
||||||
|
chain_id: spec.chain_id as u64,
|
||||||
latest_finalized_root: state.finalized_root,
|
latest_finalized_root: state.finalized_root,
|
||||||
latest_finalized_epoch: state.finalized_epoch,
|
latest_finalized_epoch: state.finalized_epoch,
|
||||||
best_root: beacon_chain.head().beacon_block_root,
|
best_root: beacon_chain.head().beacon_block_root,
|
||||||
|
Loading…
Reference in New Issue
Block a user