2019-07-16 12:32:37 +00:00
|
|
|
use super::methods::{RPCErrorResponse, RPCResponse, RequestId};
|
|
|
|
use super::protocol::{RPCError, RPCProtocol, RPCRequest};
|
2019-07-09 05:44:23 +00:00
|
|
|
use super::RPCEvent;
|
2019-07-16 12:32:37 +00:00
|
|
|
use crate::rpc::protocol::{InboundFramed, OutboundFramed};
|
Update to frozen spec ❄️ (v0.8.1) (#444)
* types: first updates for v0.8
* state_processing: epoch processing v0.8.0
* state_processing: block processing v0.8.0
* tree_hash_derive: support generics in SignedRoot
* types v0.8: update to use ssz_types
* state_processing v0.8: use ssz_types
* ssz_types: add bitwise methods and from_elem
* types: fix v0.8 FIXMEs
* ssz_types: add bitfield shift_up
* ssz_types: iterators and DerefMut for VariableList
* types,state_processing: use VariableList
* ssz_types: fix BitVector Decode impl
Fixed a typo in the implementation of ssz::Decode for BitVector, which caused it
to be considered variable length!
* types: fix test modules for v0.8 update
* types: remove slow type-level arithmetic
* state_processing: fix tests for v0.8
* op_pool: update for v0.8
* ssz_types: Bitfield difference length-independent
Allow computing the difference of two bitfields of different lengths.
* Implement compact committee support
* epoch_processing: committee & active index roots
* state_processing: genesis state builder v0.8
* state_processing: implement v0.8.1
* Further improve tree_hash
* Strip examples, tests from cached_tree_hash
* Update TreeHash, un-impl CachedTreeHash
* Update bitfield TreeHash, un-impl CachedTreeHash
* Update FixedLenVec TreeHash, unimpl CachedTreeHash
* Update update tree_hash_derive for new TreeHash
* Fix TreeHash, un-impl CachedTreeHash for ssz_types
* Remove fixed_len_vec, ssz benches
SSZ benches relied upon fixed_len_vec -- it is easier to just delete
them and rebuild them later (when necessary)
* Remove boolean_bitfield crate
* Fix fake_crypto BLS compile errors
* Update ef_tests for new v.8 type params
* Update ef_tests submodule to v0.8.1 tag
* Make fixes to support parsing ssz ef_tests
* `compact_committee...` to `compact_committees...`
* Derive more traits for `CompactCommittee`
* Flip bitfield byte-endianness
* Fix tree_hash for bitfields
* Modify CLI output for ef_tests
* Bump ssz crate version
* Update ssz_types doc comment
* Del cached tree hash tests from ssz_static tests
* Tidy SSZ dependencies
* Rename ssz_types crate to eth2_ssz_types
* validator_client: update for v0.8
* ssz_types: update union/difference for bit order swap
* beacon_node: update for v0.8, EthSpec
* types: disable cached tree hash, update min spec
* state_processing: fix slot bug in committee update
* tests: temporarily disable fork choice harness test
See #447
* committee cache: prevent out-of-bounds access
In the case where we tried to access the committee of a shard that didn't have a committee in the
current epoch, we were accessing elements beyond the end of the shuffling vector and panicking! This
commit adds a check to make the failure safe and explicit.
* fix bug in get_indexed_attestation and simplify
There was a bug in our implementation of get_indexed_attestation whereby
incorrect "committee indices" were used to index into the custody bitfield. The
bug was only observable in the case where some bits of the custody bitfield were
set to 1. The implementation has been simplified to remove the bug, and a test
added.
* state_proc: workaround for compact committees bug
https://github.com/ethereum/eth2.0-specs/issues/1315
* v0.8: updates to make the EF tests pass
* Remove redundant max operation checks.
* Always supply both messages when checking attestation signatures -- allowing
verification of an attestation with no signatures.
* Swap the order of the fork and domain constant in `get_domain`, to match
the spec.
* rustfmt
* ef_tests: add new epoch processing tests
* Integrate v0.8 into master (compiles)
* Remove unused crates, fix clippy lints
* Replace v0.6.3 tags w/ v0.8.1
* Remove old comment
* Ensure lmd ghost tests only run in release
* Update readme
2019-07-30 02:44:51 +00:00
|
|
|
use core::marker::PhantomData;
|
2019-07-09 05:44:23 +00:00
|
|
|
use fnv::FnvHashMap;
|
|
|
|
use futures::prelude::*;
|
2019-08-10 01:44:17 +00:00
|
|
|
use libp2p::core::upgrade::{InboundUpgrade, OutboundUpgrade};
|
|
|
|
use libp2p::swarm::protocols_handler::{
|
2019-07-09 05:44:23 +00:00
|
|
|
KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol,
|
2019-07-06 11:32:32 +00:00
|
|
|
};
|
|
|
|
use smallvec::SmallVec;
|
2019-07-09 05:44:23 +00:00
|
|
|
use std::time::{Duration, Instant};
|
2019-07-06 11:32:32 +00:00
|
|
|
use tokio_io::{AsyncRead, AsyncWrite};
|
|
|
|
|
|
|
|
/// Number of seconds a substream may wait for its response before it is
/// considered expired and dropped by the handler.
pub const RESPONSE_TIMEOUT: u64 = 9;
|
|
|
|
|
|
|
|
/// Implementation of `ProtocolsHandler` for the RPC protocol.
|
2019-08-10 01:44:17 +00:00
|
|
|
pub struct RPCHandler<TSubstream>
|
2019-07-16 12:32:37 +00:00
|
|
|
where
|
|
|
|
TSubstream: AsyncRead + AsyncWrite,
|
|
|
|
{
|
2019-07-06 11:32:32 +00:00
|
|
|
/// The upgrade for inbound substreams.
|
|
|
|
listen_protocol: SubstreamProtocol<RPCProtocol>,
|
|
|
|
|
|
|
|
/// If `Some`, something bad happened and we should shut down the handler with an error.
|
2019-07-09 05:44:23 +00:00
|
|
|
pending_error: Option<ProtocolsHandlerUpgrErr<RPCError>>,
|
2019-07-06 11:32:32 +00:00
|
|
|
|
|
|
|
/// Queue of events to produce in `poll()`.
|
2019-07-09 05:44:23 +00:00
|
|
|
events_out: SmallVec<[RPCEvent; 4]>,
|
2019-07-06 11:32:32 +00:00
|
|
|
|
|
|
|
/// Queue of outbound substreams to open.
|
2019-07-16 12:32:37 +00:00
|
|
|
dial_queue: SmallVec<[RPCEvent; 4]>,
|
2019-07-06 11:32:32 +00:00
|
|
|
|
|
|
|
/// Current number of concurrent outbound substreams being opened.
|
|
|
|
dial_negotiated: u32,
|
|
|
|
|
|
|
|
/// Map of current substreams awaiting a response to an RPC request.
|
2019-07-16 12:32:37 +00:00
|
|
|
waiting_substreams: FnvHashMap<RequestId, WaitingResponse<TSubstream>>,
|
|
|
|
|
|
|
|
/// List of outbound substreams that need to be driven to completion.
|
|
|
|
substreams: Vec<SubstreamState<TSubstream>>,
|
2019-07-06 11:32:32 +00:00
|
|
|
|
|
|
|
/// Sequential Id for waiting substreams.
|
2019-07-16 12:32:37 +00:00
|
|
|
current_substream_id: RequestId,
|
2019-07-06 11:32:32 +00:00
|
|
|
|
|
|
|
/// Maximum number of concurrent outbound substreams being opened. Value is never modified.
|
|
|
|
max_dial_negotiated: u32,
|
|
|
|
|
|
|
|
/// Value to return from `connection_keep_alive`.
|
|
|
|
keep_alive: KeepAlive,
|
|
|
|
|
|
|
|
/// After the given duration has elapsed, an inactive connection will shutdown.
|
|
|
|
inactive_timeout: Duration,
|
Update to frozen spec ❄️ (v0.8.1) (#444)
* types: first updates for v0.8
* state_processing: epoch processing v0.8.0
* state_processing: block processing v0.8.0
* tree_hash_derive: support generics in SignedRoot
* types v0.8: update to use ssz_types
* state_processing v0.8: use ssz_types
* ssz_types: add bitwise methods and from_elem
* types: fix v0.8 FIXMEs
* ssz_types: add bitfield shift_up
* ssz_types: iterators and DerefMut for VariableList
* types,state_processing: use VariableList
* ssz_types: fix BitVector Decode impl
Fixed a typo in the implementation of ssz::Decode for BitVector, which caused it
to be considered variable length!
* types: fix test modules for v0.8 update
* types: remove slow type-level arithmetic
* state_processing: fix tests for v0.8
* op_pool: update for v0.8
* ssz_types: Bitfield difference length-independent
Allow computing the difference of two bitfields of different lengths.
* Implement compact committee support
* epoch_processing: committee & active index roots
* state_processing: genesis state builder v0.8
* state_processing: implement v0.8.1
* Further improve tree_hash
* Strip examples, tests from cached_tree_hash
* Update TreeHash, un-impl CachedTreeHash
* Update bitfield TreeHash, un-impl CachedTreeHash
* Update FixedLenVec TreeHash, unimpl CachedTreeHash
* Update update tree_hash_derive for new TreeHash
* Fix TreeHash, un-impl CachedTreeHash for ssz_types
* Remove fixed_len_vec, ssz benches
SSZ benches relied upon fixed_len_vec -- it is easier to just delete
them and rebuild them later (when necessary)
* Remove boolean_bitfield crate
* Fix fake_crypto BLS compile errors
* Update ef_tests for new v.8 type params
* Update ef_tests submodule to v0.8.1 tag
* Make fixes to support parsing ssz ef_tests
* `compact_committee...` to `compact_committees...`
* Derive more traits for `CompactCommittee`
* Flip bitfield byte-endianness
* Fix tree_hash for bitfields
* Modify CLI output for ef_tests
* Bump ssz crate version
* Update ssz_types doc comment
* Del cached tree hash tests from ssz_static tests
* Tidy SSZ dependencies
* Rename ssz_types crate to eth2_ssz_types
* validator_client: update for v0.8
* ssz_types: update union/difference for bit order swap
* beacon_node: update for v0.8, EthSpec
* types: disable cached tree hash, update min spec
* state_processing: fix slot bug in committee update
* tests: temporarily disable fork choice harness test
See #447
* committee cache: prevent out-of-bounds access
In the case where we tried to access the committee of a shard that didn't have a committee in the
current epoch, we were accessing elements beyond the end of the shuffling vector and panicking! This
commit adds a check to make the failure safe and explicit.
* fix bug in get_indexed_attestation and simplify
There was a bug in our implementation of get_indexed_attestation whereby
incorrect "committee indices" were used to index into the custody bitfield. The
bug was only observable in the case where some bits of the custody bitfield were
set to 1. The implementation has been simplified to remove the bug, and a test
added.
* state_proc: workaround for compact committees bug
https://github.com/ethereum/eth2.0-specs/issues/1315
* v0.8: updates to make the EF tests pass
* Remove redundant max operation checks.
* Always supply both messages when checking attestation signatures -- allowing
verification of an attestation with no signatures.
* Swap the order of the fork and domain constant in `get_domain`, to match
the spec.
* rustfmt
* ef_tests: add new epoch processing tests
* Integrate v0.8 into master (compiles)
* Remove unused crates, fix clippy lints
* Replace v0.6.3 tags w/ v0.8.1
* Remove old comment
* Ensure lmd ghost tests only run in release
* Update readme
2019-07-30 02:44:51 +00:00
|
|
|
|
2019-08-10 01:44:17 +00:00
|
|
|
/// Marker to pin the generic stream.
|
|
|
|
_phantom: PhantomData<TSubstream>,
|
2019-07-06 11:32:32 +00:00
|
|
|
}
|
|
|
|
|
2019-07-16 12:32:37 +00:00
|
|
|
/// An outbound substream is waiting a response from the user.
|
|
|
|
struct WaitingResponse<TSubstream> {
|
|
|
|
/// The framed negotiated substream.
|
|
|
|
substream: InboundFramed<TSubstream>,
|
|
|
|
/// The time when the substream is closed.
|
|
|
|
timeout: Instant,
|
|
|
|
}
|
|
|
|
|
2019-07-06 13:43:44 +00:00
|
|
|
/// State of an outbound substream. Either waiting for a response, or in the process of sending.
|
2019-07-16 12:32:37 +00:00
|
|
|
pub enum SubstreamState<TSubstream>
|
|
|
|
where
|
|
|
|
TSubstream: AsyncRead + AsyncWrite,
|
|
|
|
{
|
|
|
|
/// A response has been sent, pending writing and flush.
|
|
|
|
ResponsePendingSend {
|
|
|
|
substream: futures::sink::Send<InboundFramed<TSubstream>>,
|
|
|
|
},
|
|
|
|
/// A request has been sent, and we are awaiting a response. This future is driven in the
|
|
|
|
/// handler because GOODBYE requests can be handled and responses dropped instantly.
|
|
|
|
RequestPendingResponse {
|
|
|
|
/// The framed negotiated substream.
|
|
|
|
substream: OutboundFramed<TSubstream>,
|
|
|
|
/// Keeps track of the request id and the request to permit forming advanced responses which require
|
|
|
|
/// data from the request.
|
|
|
|
rpc_event: RPCEvent,
|
|
|
|
/// The time when the substream is closed.
|
2019-07-09 05:44:23 +00:00
|
|
|
timeout: Instant,
|
|
|
|
},
|
2019-07-06 11:32:32 +00:00
|
|
|
}
|
|
|
|
|
2019-08-10 01:44:17 +00:00
|
|
|
impl<TSubstream> RPCHandler<TSubstream>
|
2019-07-16 12:32:37 +00:00
|
|
|
where
|
|
|
|
TSubstream: AsyncRead + AsyncWrite,
|
|
|
|
{
|
2019-07-06 11:32:32 +00:00
|
|
|
pub fn new(
|
|
|
|
listen_protocol: SubstreamProtocol<RPCProtocol>,
|
2019-07-09 05:44:23 +00:00
|
|
|
inactive_timeout: Duration,
|
2019-07-06 11:32:32 +00:00
|
|
|
) -> Self {
|
|
|
|
RPCHandler {
|
|
|
|
listen_protocol,
|
|
|
|
pending_error: None,
|
|
|
|
events_out: SmallVec::new(),
|
|
|
|
dial_queue: SmallVec::new(),
|
|
|
|
dial_negotiated: 0,
|
|
|
|
waiting_substreams: FnvHashMap::default(),
|
2019-07-16 12:32:37 +00:00
|
|
|
substreams: Vec::new(),
|
|
|
|
current_substream_id: 1,
|
2019-07-06 11:32:32 +00:00
|
|
|
max_dial_negotiated: 8,
|
|
|
|
keep_alive: KeepAlive::Yes,
|
|
|
|
inactive_timeout,
|
Update to frozen spec ❄️ (v0.8.1) (#444)
* types: first updates for v0.8
* state_processing: epoch processing v0.8.0
* state_processing: block processing v0.8.0
* tree_hash_derive: support generics in SignedRoot
* types v0.8: update to use ssz_types
* state_processing v0.8: use ssz_types
* ssz_types: add bitwise methods and from_elem
* types: fix v0.8 FIXMEs
* ssz_types: add bitfield shift_up
* ssz_types: iterators and DerefMut for VariableList
* types,state_processing: use VariableList
* ssz_types: fix BitVector Decode impl
Fixed a typo in the implementation of ssz::Decode for BitVector, which caused it
to be considered variable length!
* types: fix test modules for v0.8 update
* types: remove slow type-level arithmetic
* state_processing: fix tests for v0.8
* op_pool: update for v0.8
* ssz_types: Bitfield difference length-independent
Allow computing the difference of two bitfields of different lengths.
* Implement compact committee support
* epoch_processing: committee & active index roots
* state_processing: genesis state builder v0.8
* state_processing: implement v0.8.1
* Further improve tree_hash
* Strip examples, tests from cached_tree_hash
* Update TreeHash, un-impl CachedTreeHash
* Update bitfield TreeHash, un-impl CachedTreeHash
* Update FixedLenVec TreeHash, unimpl CachedTreeHash
* Update update tree_hash_derive for new TreeHash
* Fix TreeHash, un-impl CachedTreeHash for ssz_types
* Remove fixed_len_vec, ssz benches
SSZ benches relied upon fixed_len_vec -- it is easier to just delete
them and rebuild them later (when necessary)
* Remove boolean_bitfield crate
* Fix fake_crypto BLS compile errors
* Update ef_tests for new v.8 type params
* Update ef_tests submodule to v0.8.1 tag
* Make fixes to support parsing ssz ef_tests
* `compact_committee...` to `compact_committees...`
* Derive more traits for `CompactCommittee`
* Flip bitfield byte-endianness
* Fix tree_hash for bitfields
* Modify CLI output for ef_tests
* Bump ssz crate version
* Update ssz_types doc comment
* Del cached tree hash tests from ssz_static tests
* Tidy SSZ dependencies
* Rename ssz_types crate to eth2_ssz_types
* validator_client: update for v0.8
* ssz_types: update union/difference for bit order swap
* beacon_node: update for v0.8, EthSpec
* types: disable cached tree hash, update min spec
* state_processing: fix slot bug in committee update
* tests: temporarily disable fork choice harness test
See #447
* committee cache: prevent out-of-bounds access
In the case where we tried to access the committee of a shard that didn't have a committee in the
current epoch, we were accessing elements beyond the end of the shuffling vector and panicking! This
commit adds a check to make the failure safe and explicit.
* fix bug in get_indexed_attestation and simplify
There was a bug in our implementation of get_indexed_attestation whereby
incorrect "committee indices" were used to index into the custody bitfield. The
bug was only observable in the case where some bits of the custody bitfield were
set to 1. The implementation has been simplified to remove the bug, and a test
added.
* state_proc: workaround for compact committees bug
https://github.com/ethereum/eth2.0-specs/issues/1315
* v0.8: updates to make the EF tests pass
* Remove redundant max operation checks.
* Always supply both messages when checking attestation signatures -- allowing
verification of an attestation with no signatures.
* Swap the order of the fork and domain constant in `get_domain`, to match
the spec.
* rustfmt
* ef_tests: add new epoch processing tests
* Integrate v0.8 into master (compiles)
* Remove unused crates, fix clippy lints
* Replace v0.6.3 tags w/ v0.8.1
* Remove old comment
* Ensure lmd ghost tests only run in release
* Update readme
2019-07-30 02:44:51 +00:00
|
|
|
_phantom: PhantomData,
|
2019-07-06 11:32:32 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Returns the number of pending requests.
|
|
|
|
pub fn pending_requests(&self) -> u32 {
|
|
|
|
self.dial_negotiated + self.dial_queue.len() as u32
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Returns a reference to the listen protocol configuration.
|
|
|
|
///
|
|
|
|
/// > **Note**: If you modify the protocol, modifications will only applies to future inbound
|
|
|
|
/// > substreams, not the ones already being negotiated.
|
2019-07-09 05:44:23 +00:00
|
|
|
pub fn listen_protocol_ref(&self) -> &SubstreamProtocol<RPCProtocol> {
|
2019-07-06 11:32:32 +00:00
|
|
|
&self.listen_protocol
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Returns a mutable reference to the listen protocol configuration.
|
|
|
|
///
|
|
|
|
/// > **Note**: If you modify the protocol, modifications will only applies to future inbound
|
|
|
|
/// > substreams, not the ones already being negotiated.
|
2019-07-09 05:44:23 +00:00
|
|
|
pub fn listen_protocol_mut(&mut self) -> &mut SubstreamProtocol<RPCProtocol> {
|
2019-07-06 11:32:32 +00:00
|
|
|
&mut self.listen_protocol
|
|
|
|
}
|
|
|
|
|
2019-07-16 12:32:37 +00:00
|
|
|
/// Opens an outbound substream with a request.
|
2019-07-06 11:32:32 +00:00
|
|
|
#[inline]
|
2019-07-16 12:32:37 +00:00
|
|
|
pub fn send_request(&mut self, rpc_event: RPCEvent) {
|
2019-07-06 11:32:32 +00:00
|
|
|
self.keep_alive = KeepAlive::Yes;
|
2019-07-16 12:32:37 +00:00
|
|
|
|
|
|
|
self.dial_queue.push(rpc_event);
|
2019-07-06 11:32:32 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-08-10 01:44:17 +00:00
|
|
|
impl<TSubstream> Default for RPCHandler<TSubstream>
|
2019-07-16 12:32:37 +00:00
|
|
|
where
|
|
|
|
TSubstream: AsyncRead + AsyncWrite,
|
|
|
|
{
|
2019-07-06 11:32:32 +00:00
|
|
|
fn default() -> Self {
|
2019-07-06 13:43:44 +00:00
|
|
|
RPCHandler::new(SubstreamProtocol::new(RPCProtocol), Duration::from_secs(30))
|
2019-07-06 11:32:32 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-08-10 01:44:17 +00:00
|
|
|
impl<TSubstream> ProtocolsHandler for RPCHandler<TSubstream>
|
2019-07-09 05:44:23 +00:00
|
|
|
where
|
|
|
|
TSubstream: AsyncRead + AsyncWrite,
|
2019-07-06 11:32:32 +00:00
|
|
|
{
|
2019-07-06 13:43:44 +00:00
|
|
|
type InEvent = RPCEvent;
|
2019-07-06 11:32:32 +00:00
|
|
|
type OutEvent = RPCEvent;
|
2019-07-09 05:44:23 +00:00
|
|
|
type Error = ProtocolsHandlerUpgrErr<RPCError>;
|
2019-07-06 11:32:32 +00:00
|
|
|
type Substream = TSubstream;
|
|
|
|
type InboundProtocol = RPCProtocol;
|
|
|
|
type OutboundProtocol = RPCRequest;
|
2019-07-16 12:32:37 +00:00
|
|
|
type OutboundOpenInfo = RPCEvent; // Keep track of the id and the request
|
2019-07-06 11:32:32 +00:00
|
|
|
|
|
|
|
#[inline]
|
|
|
|
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
|
|
|
|
self.listen_protocol.clone()
|
|
|
|
}
|
|
|
|
|
|
|
|
#[inline]
|
|
|
|
fn inject_fully_negotiated_inbound(
|
|
|
|
&mut self,
|
2019-07-09 05:44:23 +00:00
|
|
|
out: <RPCProtocol as InboundUpgrade<TSubstream>>::Output,
|
2019-07-06 11:32:32 +00:00
|
|
|
) {
|
2019-07-16 12:32:37 +00:00
|
|
|
let (req, substream) = out;
|
2019-07-09 05:44:23 +00:00
|
|
|
// drop the stream and return a 0 id for goodbye "requests"
|
|
|
|
if let r @ RPCRequest::Goodbye(_) = req {
|
|
|
|
self.events_out.push(RPCEvent::Request(0, r));
|
|
|
|
return;
|
|
|
|
}
|
2019-07-06 11:32:32 +00:00
|
|
|
|
|
|
|
// New inbound request. Store the stream and tag the output.
|
2019-07-16 12:32:37 +00:00
|
|
|
let awaiting_stream = WaitingResponse {
|
2019-07-09 05:44:23 +00:00
|
|
|
substream,
|
|
|
|
timeout: Instant::now() + Duration::from_secs(RESPONSE_TIMEOUT),
|
|
|
|
};
|
|
|
|
self.waiting_substreams
|
|
|
|
.insert(self.current_substream_id, awaiting_stream);
|
|
|
|
|
|
|
|
self.events_out
|
|
|
|
.push(RPCEvent::Request(self.current_substream_id, req));
|
2019-07-06 11:32:32 +00:00
|
|
|
self.current_substream_id += 1;
|
|
|
|
}
|
|
|
|
|
|
|
|
#[inline]
|
|
|
|
fn inject_fully_negotiated_outbound(
|
|
|
|
&mut self,
|
2019-07-09 05:44:23 +00:00
|
|
|
out: <RPCRequest as OutboundUpgrade<TSubstream>>::Output,
|
2019-07-16 12:32:37 +00:00
|
|
|
rpc_event: Self::OutboundOpenInfo,
|
2019-07-06 11:32:32 +00:00
|
|
|
) {
|
|
|
|
self.dial_negotiated -= 1;
|
|
|
|
|
2019-07-09 05:44:23 +00:00
|
|
|
if self.dial_negotiated == 0
|
|
|
|
&& self.dial_queue.is_empty()
|
|
|
|
&& self.waiting_substreams.is_empty()
|
|
|
|
{
|
2019-07-06 11:32:32 +00:00
|
|
|
self.keep_alive = KeepAlive::Until(Instant::now() + self.inactive_timeout);
|
2019-07-09 05:44:23 +00:00
|
|
|
} else {
|
2019-07-06 13:43:44 +00:00
|
|
|
self.keep_alive = KeepAlive::Yes;
|
|
|
|
}
|
2019-07-06 11:32:32 +00:00
|
|
|
|
2019-07-16 12:32:37 +00:00
|
|
|
// add the stream to substreams if we expect a response, otherwise drop the stream.
|
|
|
|
if let RPCEvent::Request(id, req) = rpc_event {
|
|
|
|
if req.expect_response() {
|
|
|
|
let awaiting_stream = SubstreamState::RequestPendingResponse {
|
|
|
|
substream: out,
|
|
|
|
rpc_event: RPCEvent::Request(id, req),
|
|
|
|
timeout: Instant::now() + Duration::from_secs(RESPONSE_TIMEOUT),
|
|
|
|
};
|
|
|
|
|
|
|
|
self.substreams.push(awaiting_stream);
|
|
|
|
}
|
|
|
|
}
|
2019-07-06 11:32:32 +00:00
|
|
|
}
|
|
|
|
|
2019-07-06 13:43:44 +00:00
|
|
|
// Note: If the substream has closed due to inactivity, or the substream is in the
|
|
|
|
// wrong state a response will fail silently.
|
2019-07-06 11:32:32 +00:00
|
|
|
#[inline]
|
2019-07-06 13:43:44 +00:00
|
|
|
fn inject_event(&mut self, rpc_event: Self::InEvent) {
|
|
|
|
match rpc_event {
|
2019-07-16 12:32:37 +00:00
|
|
|
RPCEvent::Request(_, _) => self.send_request(rpc_event),
|
2019-07-06 13:43:44 +00:00
|
|
|
RPCEvent::Response(rpc_id, res) => {
|
|
|
|
// check if the stream matching the response still exists
|
2019-07-16 12:32:37 +00:00
|
|
|
if let Some(waiting_stream) = self.waiting_substreams.remove(&rpc_id) {
|
2019-07-09 05:44:23 +00:00
|
|
|
// only send one response per stream. This must be in the waiting state.
|
2019-07-16 12:32:37 +00:00
|
|
|
self.substreams.push(SubstreamState::ResponsePendingSend {
|
|
|
|
substream: waiting_stream.substream.send(res),
|
|
|
|
});
|
2019-07-06 13:43:44 +00:00
|
|
|
}
|
|
|
|
}
|
2019-07-16 12:32:37 +00:00
|
|
|
RPCEvent::Error(_, _) => {}
|
2019-07-06 13:43:44 +00:00
|
|
|
}
|
2019-07-06 11:32:32 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
#[inline]
|
|
|
|
fn inject_dial_upgrade_error(
|
|
|
|
&mut self,
|
|
|
|
_: Self::OutboundOpenInfo,
|
|
|
|
error: ProtocolsHandlerUpgrErr<
|
|
|
|
<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error,
|
|
|
|
>,
|
|
|
|
) {
|
|
|
|
if self.pending_error.is_none() {
|
|
|
|
self.pending_error = Some(error);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[inline]
|
|
|
|
fn connection_keep_alive(&self) -> KeepAlive {
|
|
|
|
self.keep_alive
|
|
|
|
}
|
|
|
|
|
|
|
|
fn poll(
|
|
|
|
&mut self,
|
|
|
|
) -> Poll<
|
|
|
|
ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>,
|
|
|
|
Self::Error,
|
|
|
|
> {
|
|
|
|
if let Some(err) = self.pending_error.take() {
|
2019-08-10 01:44:17 +00:00
|
|
|
// Returning an error here will result in dropping any peer that doesn't support any of
|
|
|
|
// the RPC protocols. For our immediate purposes we permit this and simply log that an
|
|
|
|
// upgrade was not supported.
|
|
|
|
// TODO: Add a logger to the handler for trace output.
|
|
|
|
dbg!(&err);
|
2019-07-06 11:32:32 +00:00
|
|
|
}
|
|
|
|
|
2019-07-16 12:32:37 +00:00
|
|
|
// return any events that need to be reported
|
2019-07-06 11:32:32 +00:00
|
|
|
if !self.events_out.is_empty() {
|
|
|
|
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
|
|
|
|
self.events_out.remove(0),
|
|
|
|
)));
|
|
|
|
} else {
|
|
|
|
self.events_out.shrink_to_fit();
|
|
|
|
}
|
|
|
|
|
2019-07-16 12:32:37 +00:00
|
|
|
// remove any streams that have expired
|
Update to frozen spec ❄️ (v0.8.1) (#444)
* types: first updates for v0.8
* state_processing: epoch processing v0.8.0
* state_processing: block processing v0.8.0
* tree_hash_derive: support generics in SignedRoot
* types v0.8: update to use ssz_types
* state_processing v0.8: use ssz_types
* ssz_types: add bitwise methods and from_elem
* types: fix v0.8 FIXMEs
* ssz_types: add bitfield shift_up
* ssz_types: iterators and DerefMut for VariableList
* types,state_processing: use VariableList
* ssz_types: fix BitVector Decode impl
Fixed a typo in the implementation of ssz::Decode for BitVector, which caused it
to be considered variable length!
* types: fix test modules for v0.8 update
* types: remove slow type-level arithmetic
* state_processing: fix tests for v0.8
* op_pool: update for v0.8
* ssz_types: Bitfield difference length-independent
Allow computing the difference of two bitfields of different lengths.
* Implement compact committee support
* epoch_processing: committee & active index roots
* state_processing: genesis state builder v0.8
* state_processing: implement v0.8.1
* Further improve tree_hash
* Strip examples, tests from cached_tree_hash
* Update TreeHash, un-impl CachedTreeHash
* Update bitfield TreeHash, un-impl CachedTreeHash
* Update FixedLenVec TreeHash, unimpl CachedTreeHash
* Update update tree_hash_derive for new TreeHash
* Fix TreeHash, un-impl CachedTreeHash for ssz_types
* Remove fixed_len_vec, ssz benches
SSZ benches relied upon fixed_len_vec -- it is easier to just delete
them and rebuild them later (when necessary)
* Remove boolean_bitfield crate
* Fix fake_crypto BLS compile errors
* Update ef_tests for new v.8 type params
* Update ef_tests submodule to v0.8.1 tag
* Make fixes to support parsing ssz ef_tests
* `compact_committee...` to `compact_committees...`
* Derive more traits for `CompactCommittee`
* Flip bitfield byte-endianness
* Fix tree_hash for bitfields
* Modify CLI output for ef_tests
* Bump ssz crate version
* Update ssz_types doc comment
* Del cached tree hash tests from ssz_static tests
* Tidy SSZ dependencies
* Rename ssz_types crate to eth2_ssz_types
* validator_client: update for v0.8
* ssz_types: update union/difference for bit order swap
* beacon_node: update for v0.8, EthSpec
* types: disable cached tree hash, update min spec
* state_processing: fix slot bug in committee update
* tests: temporarily disable fork choice harness test
See #447
* committee cache: prevent out-of-bounds access
In the case where we tried to access the committee of a shard that didn't have a committee in the
current epoch, we were accessing elements beyond the end of the shuffling vector and panicking! This
commit adds a check to make the failure safe and explicit.
* fix bug in get_indexed_attestation and simplify
There was a bug in our implementation of get_indexed_attestation whereby
incorrect "committee indices" were used to index into the custody bitfield. The
bug was only observable in the case where some bits of the custody bitfield were
set to 1. The implementation has been simplified to remove the bug, and a test
added.
* state_proc: workaround for compact committees bug
https://github.com/ethereum/eth2.0-specs/issues/1315
* v0.8: updates to make the EF tests pass
* Remove redundant max operation checks.
* Always supply both messages when checking attestation signatures -- allowing
verification of an attestation with no signatures.
* Swap the order of the fork and domain constant in `get_domain`, to match
the spec.
* rustfmt
* ef_tests: add new epoch processing tests
* Integrate v0.8 into master (compiles)
* Remove unused crates, fix clippy lints
* Replace v0.6.3 tags w/ v0.8.1
* Remove old comment
* Ensure lmd ghost tests only run in release
* Update readme
2019-07-30 02:44:51 +00:00
|
|
|
self.waiting_substreams
|
|
|
|
.retain(|_k, waiting_stream| Instant::now() <= waiting_stream.timeout);
|
2019-07-16 12:32:37 +00:00
|
|
|
|
|
|
|
// drive streams that need to be processed
|
|
|
|
for n in (0..self.substreams.len()).rev() {
|
|
|
|
let stream = self.substreams.swap_remove(n);
|
|
|
|
match stream {
|
|
|
|
SubstreamState::ResponsePendingSend { mut substream } => {
|
|
|
|
match substream.poll() {
|
|
|
|
Ok(Async::Ready(_substream)) => {} // sent and flushed
|
|
|
|
Ok(Async::NotReady) => {
|
|
|
|
self.substreams
|
|
|
|
.push(SubstreamState::ResponsePendingSend { substream });
|
|
|
|
}
|
|
|
|
Err(e) => {
|
|
|
|
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
|
|
|
|
RPCEvent::Error(0, e),
|
|
|
|
)))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
SubstreamState::RequestPendingResponse {
|
|
|
|
mut substream,
|
|
|
|
rpc_event,
|
|
|
|
timeout,
|
|
|
|
} => match substream.poll() {
|
|
|
|
Ok(Async::Ready(response)) => {
|
|
|
|
if let Some(response) = response {
|
|
|
|
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
|
|
|
|
build_response(rpc_event, response),
|
|
|
|
)));
|
|
|
|
} else {
|
|
|
|
// stream closed early
|
|
|
|
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
|
|
|
|
RPCEvent::Error(
|
|
|
|
rpc_event.id(),
|
|
|
|
RPCError::Custom("Stream Closed Early".into()),
|
|
|
|
),
|
|
|
|
)));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Ok(Async::NotReady) => {
|
|
|
|
if Instant::now() < timeout {
|
|
|
|
self.substreams
|
|
|
|
.push(SubstreamState::RequestPendingResponse {
|
|
|
|
substream,
|
|
|
|
rpc_event,
|
|
|
|
timeout,
|
|
|
|
});
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Err(e) => {
|
|
|
|
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
|
Update to frozen spec ❄️ (v0.8.1) (#444)
* types: first updates for v0.8
* state_processing: epoch processing v0.8.0
* state_processing: block processing v0.8.0
* tree_hash_derive: support generics in SignedRoot
* types v0.8: update to use ssz_types
* state_processing v0.8: use ssz_types
* ssz_types: add bitwise methods and from_elem
* types: fix v0.8 FIXMEs
* ssz_types: add bitfield shift_up
* ssz_types: iterators and DerefMut for VariableList
* types,state_processing: use VariableList
* ssz_types: fix BitVector Decode impl
Fixed a typo in the implementation of ssz::Decode for BitVector, which caused it
to be considered variable length!
* types: fix test modules for v0.8 update
* types: remove slow type-level arithmetic
* state_processing: fix tests for v0.8
* op_pool: update for v0.8
* ssz_types: Bitfield difference length-independent
Allow computing the difference of two bitfields of different lengths.
* Implement compact committee support
* epoch_processing: committee & active index roots
* state_processing: genesis state builder v0.8
* state_processing: implement v0.8.1
* Further improve tree_hash
* Strip examples, tests from cached_tree_hash
* Update TreeHash, un-impl CachedTreeHash
* Update bitfield TreeHash, un-impl CachedTreeHash
* Update FixedLenVec TreeHash, unimpl CachedTreeHash
* Update update tree_hash_derive for new TreeHash
* Fix TreeHash, un-impl CachedTreeHash for ssz_types
* Remove fixed_len_vec, ssz benches
SSZ benches relied upon fixed_len_vec -- it is easier to just delete
them and rebuild them later (when necessary)
* Remove boolean_bitfield crate
* Fix fake_crypto BLS compile errors
* Update ef_tests for new v.8 type params
* Update ef_tests submodule to v0.8.1 tag
* Make fixes to support parsing ssz ef_tests
* `compact_committee...` to `compact_committees...`
* Derive more traits for `CompactCommittee`
* Flip bitfield byte-endianness
* Fix tree_hash for bitfields
* Modify CLI output for ef_tests
* Bump ssz crate version
* Update ssz_types doc comment
* Del cached tree hash tests from ssz_static tests
* Tidy SSZ dependencies
* Rename ssz_types crate to eth2_ssz_types
* validator_client: update for v0.8
* ssz_types: update union/difference for bit order swap
* beacon_node: update for v0.8, EthSpec
* types: disable cached tree hash, update min spec
* state_processing: fix slot bug in committee update
* tests: temporarily disable fork choice harness test
See #447
* committee cache: prevent out-of-bounds access
In the case where we tried to access the committee of a shard that didn't have a committee in the
current epoch, we were accessing elements beyond the end of the shuffling vector and panicking! This
commit adds a check to make the failure safe and explicit.
* fix bug in get_indexed_attestation and simplify
There was a bug in our implementation of get_indexed_attestation whereby
incorrect "committee indices" were used to index into the custody bitfield. The
bug was only observable in the case where some bits of the custody bitfield were
set to 1. The implementation has been simplified to remove the bug, and a test
added.
* state_proc: workaround for compact committees bug
https://github.com/ethereum/eth2.0-specs/issues/1315
* v0.8: updates to make the EF tests pass
* Remove redundant max operation checks.
* Always supply both messages when checking attestation signatures -- allowing
verification of an attestation with no signatures.
* Swap the order of the fork and domain constant in `get_domain`, to match
the spec.
* rustfmt
* ef_tests: add new epoch processing tests
* Integrate v0.8 into master (compiles)
* Remove unused crates, fix clippy lints
* Replace v0.6.3 tags w/ v0.8.1
* Remove old comment
* Ensure lmd ghost tests only run in release
* Update readme
2019-07-30 02:44:51 +00:00
|
|
|
RPCEvent::Error(rpc_event.id(), e),
|
2019-07-16 12:32:37 +00:00
|
|
|
)))
|
|
|
|
}
|
|
|
|
},
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-07-06 13:43:44 +00:00
|
|
|
// establish outbound substreams
|
2019-07-06 11:32:32 +00:00
|
|
|
if !self.dial_queue.is_empty() {
|
|
|
|
if self.dial_negotiated < self.max_dial_negotiated {
|
|
|
|
self.dial_negotiated += 1;
|
2019-07-16 12:32:37 +00:00
|
|
|
let rpc_event = self.dial_queue.remove(0);
|
|
|
|
if let RPCEvent::Request(id, req) = rpc_event {
|
|
|
|
return Ok(Async::Ready(
|
|
|
|
ProtocolsHandlerEvent::OutboundSubstreamRequest {
|
|
|
|
protocol: SubstreamProtocol::new(req.clone()),
|
|
|
|
info: RPCEvent::Request(id, req),
|
|
|
|
},
|
|
|
|
));
|
|
|
|
}
|
2019-07-06 11:32:32 +00:00
|
|
|
}
|
|
|
|
} else {
|
|
|
|
self.dial_queue.shrink_to_fit();
|
|
|
|
}
|
|
|
|
Ok(Async::NotReady)
|
|
|
|
}
|
|
|
|
}
|
2019-07-16 12:32:37 +00:00
|
|
|
|
|
|
|
/// Given a response back from a peer and the request that sent it, construct a response to send
|
|
|
|
/// back to the user. This allows for some data manipulation of responses given requests.
|
|
|
|
fn build_response(rpc_event: RPCEvent, rpc_response: RPCErrorResponse) -> RPCEvent {
|
|
|
|
let id = rpc_event.id();
|
|
|
|
|
|
|
|
// handle the types of responses
|
|
|
|
match rpc_response {
|
|
|
|
RPCErrorResponse::Success(response) => {
|
|
|
|
match response {
|
|
|
|
// if the response is block roots, tag on the extra request data
|
|
|
|
RPCResponse::BeaconBlockBodies(mut resp) => {
|
|
|
|
if let RPCEvent::Request(_id, RPCRequest::BeaconBlockBodies(bodies_req)) =
|
|
|
|
rpc_event
|
|
|
|
{
|
|
|
|
resp.block_roots = Some(bodies_req.block_roots);
|
|
|
|
}
|
|
|
|
RPCEvent::Response(
|
|
|
|
id,
|
|
|
|
RPCErrorResponse::Success(RPCResponse::BeaconBlockBodies(resp)),
|
|
|
|
)
|
|
|
|
}
|
|
|
|
_ => RPCEvent::Response(id, RPCErrorResponse::Success(response)),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
_ => RPCEvent::Response(id, rpc_response),
|
|
|
|
}
|
|
|
|
}
|