2019-11-29 11:25:36 +00:00
|
|
|
#![allow(clippy::type_complexity)]
|
|
|
|
#![allow(clippy::cognitive_complexity)]
|
|
|
|
|
2019-11-29 02:04:44 +00:00
|
|
|
use super::methods::{RPCErrorResponse, RequestId};
|
2019-07-16 12:32:37 +00:00
|
|
|
use super::protocol::{RPCError, RPCProtocol, RPCRequest};
|
2019-07-09 05:44:23 +00:00
|
|
|
use super::RPCEvent;
|
2019-07-16 12:32:37 +00:00
|
|
|
use crate::rpc::protocol::{InboundFramed, OutboundFramed};
|
Update to frozen spec ❄️ (v0.8.1) (#444)
* types: first updates for v0.8
* state_processing: epoch processing v0.8.0
* state_processing: block processing v0.8.0
* tree_hash_derive: support generics in SignedRoot
* types v0.8: update to use ssz_types
* state_processing v0.8: use ssz_types
* ssz_types: add bitwise methods and from_elem
* types: fix v0.8 FIXMEs
* ssz_types: add bitfield shift_up
* ssz_types: iterators and DerefMut for VariableList
* types,state_processing: use VariableList
* ssz_types: fix BitVector Decode impl
Fixed a typo in the implementation of ssz::Decode for BitVector, which caused it
to be considered variable length!
* types: fix test modules for v0.8 update
* types: remove slow type-level arithmetic
* state_processing: fix tests for v0.8
* op_pool: update for v0.8
* ssz_types: Bitfield difference length-independent
Allow computing the difference of two bitfields of different lengths.
* Implement compact committee support
* epoch_processing: committee & active index roots
* state_processing: genesis state builder v0.8
* state_processing: implement v0.8.1
* Further improve tree_hash
* Strip examples, tests from cached_tree_hash
* Update TreeHash, un-impl CachedTreeHash
* Update bitfield TreeHash, un-impl CachedTreeHash
* Update FixedLenVec TreeHash, unimpl CachedTreeHash
* Update update tree_hash_derive for new TreeHash
* Fix TreeHash, un-impl CachedTreeHash for ssz_types
* Remove fixed_len_vec, ssz benches
SSZ benches relied upon fixed_len_vec -- it is easier to just delete
them and rebuild them later (when necessary)
* Remove boolean_bitfield crate
* Fix fake_crypto BLS compile errors
* Update ef_tests for new v.8 type params
* Update ef_tests submodule to v0.8.1 tag
* Make fixes to support parsing ssz ef_tests
* `compact_committee...` to `compact_committees...`
* Derive more traits for `CompactCommittee`
* Flip bitfield byte-endianness
* Fix tree_hash for bitfields
* Modify CLI output for ef_tests
* Bump ssz crate version
* Update ssz_types doc comment
* Del cached tree hash tests from ssz_static tests
* Tidy SSZ dependencies
* Rename ssz_types crate to eth2_ssz_types
* validator_client: update for v0.8
* ssz_types: update union/difference for bit order swap
* beacon_node: update for v0.8, EthSpec
* types: disable cached tree hash, update min spec
* state_processing: fix slot bug in committee update
* tests: temporarily disable fork choice harness test
See #447
* committee cache: prevent out-of-bounds access
In the case where we tried to access the committee of a shard that didn't have a committee in the
current epoch, we were accessing elements beyond the end of the shuffling vector and panicking! This
commit adds a check to make the failure safe and explicit.
* fix bug in get_indexed_attestation and simplify
There was a bug in our implementation of get_indexed_attestation whereby
incorrect "committee indices" were used to index into the custody bitfield. The
bug was only observable in the case where some bits of the custody bitfield were
set to 1. The implementation has been simplified to remove the bug, and a test
added.
* state_proc: workaround for compact committees bug
https://github.com/ethereum/eth2.0-specs/issues/1315
* v0.8: updates to make the EF tests pass
* Remove redundant max operation checks.
* Always supply both messages when checking attestation signatures -- allowing
verification of an attestation with no signatures.
* Swap the order of the fork and domain constant in `get_domain`, to match
the spec.
* rustfmt
* ef_tests: add new epoch processing tests
* Integrate v0.8 into master (compiles)
* Remove unused crates, fix clippy lints
* Replace v0.6.3 tags w/ v0.8.1
* Remove old comment
* Ensure lmd ghost tests only run in release
* Update readme
2019-07-30 02:44:51 +00:00
|
|
|
use core::marker::PhantomData;
|
2019-07-09 05:44:23 +00:00
|
|
|
use fnv::FnvHashMap;
|
|
|
|
use futures::prelude::*;
|
2019-12-09 07:50:21 +00:00
|
|
|
use libp2p::core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeError};
|
2019-08-10 01:44:17 +00:00
|
|
|
use libp2p::swarm::protocols_handler::{
|
2019-07-09 05:44:23 +00:00
|
|
|
KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol,
|
2019-07-06 11:32:32 +00:00
|
|
|
};
|
2019-12-09 07:50:21 +00:00
|
|
|
use slog::{crit, debug, error, trace};
|
2019-07-06 11:32:32 +00:00
|
|
|
use smallvec::SmallVec;
|
2019-11-27 01:47:46 +00:00
|
|
|
use std::collections::hash_map::Entry;
|
2019-07-09 05:44:23 +00:00
|
|
|
use std::time::{Duration, Instant};
|
2019-11-27 01:47:46 +00:00
|
|
|
use tokio::io::{AsyncRead, AsyncWrite};
|
|
|
|
use tokio::timer::{delay_queue, DelayQueue};
|
|
|
|
|
|
|
|
//TODO: Implement close() on the substream types to improve the poll code.
|
|
|
|
//TODO: Implement check_timeout() on the substream types
|
2019-07-06 11:32:32 +00:00
|
|
|
|
2019-08-29 11:23:28 +00:00
|
|
|
/// The time (in seconds) before a substream that is awaiting a response from the user times out.
|
|
|
|
pub const RESPONSE_TIMEOUT: u64 = 10;
|
2019-07-06 11:32:32 +00:00
|
|
|
|
2019-12-09 07:50:21 +00:00
|
|
|
/// The number of times to retry an outbound upgrade in the case of IO errors.
|
|
|
|
const IO_ERROR_RETRIES: u8 = 3;
|
|
|
|
|
2019-11-27 01:47:46 +00:00
|
|
|
/// Inbound requests are given a sequential `RequestId` to keep track of.
|
|
|
|
type InboundRequestId = RequestId;
|
|
|
|
/// Outbound requests are associated with an id that is given by the application that sent the
|
|
|
|
/// request.
|
|
|
|
type OutboundRequestId = RequestId;
|
|
|
|
|
2019-07-06 11:32:32 +00:00
|
|
|
/// Implementation of `ProtocolsHandler` for the RPC protocol.
|
2019-08-10 01:44:17 +00:00
|
|
|
pub struct RPCHandler<TSubstream>
|
2019-07-16 12:32:37 +00:00
|
|
|
where
|
|
|
|
TSubstream: AsyncRead + AsyncWrite,
|
|
|
|
{
|
2019-07-06 11:32:32 +00:00
|
|
|
/// The upgrade for inbound substreams.
|
|
|
|
listen_protocol: SubstreamProtocol<RPCProtocol>,
|
|
|
|
|
|
|
|
/// If `Some`, something bad happened and we should shut down the handler with an error.
|
2019-12-09 07:50:21 +00:00
|
|
|
pending_error: Option<(RequestId, ProtocolsHandlerUpgrErr<RPCError>)>,
|
2019-07-06 11:32:32 +00:00
|
|
|
|
|
|
|
/// Queue of events to produce in `poll()`.
|
2019-07-09 05:44:23 +00:00
|
|
|
events_out: SmallVec<[RPCEvent; 4]>,
|
2019-07-06 11:32:32 +00:00
|
|
|
|
|
|
|
/// Queue of outbound substreams to open.
|
2019-07-16 12:32:37 +00:00
|
|
|
dial_queue: SmallVec<[RPCEvent; 4]>,
|
2019-07-06 11:32:32 +00:00
|
|
|
|
|
|
|
/// Current number of concurrent outbound substreams being opened.
|
|
|
|
dial_negotiated: u32,
|
|
|
|
|
2019-11-27 01:47:46 +00:00
|
|
|
/// Current inbound substreams awaiting processing.
|
|
|
|
inbound_substreams:
|
|
|
|
FnvHashMap<InboundRequestId, (InboundSubstreamState<TSubstream>, delay_queue::Key)>,
|
|
|
|
|
|
|
|
/// Inbound substream `DelayQueue` which keeps track of when an inbound substream will timeout.
|
|
|
|
inbound_substreams_delay: DelayQueue<InboundRequestId>,
|
|
|
|
|
|
|
|
/// Map of outbound substreams that need to be driven to completion. The `RequestId` is
|
|
|
|
/// maintained by the application sending the request.
|
|
|
|
outbound_substreams:
|
|
|
|
FnvHashMap<OutboundRequestId, (OutboundSubstreamState<TSubstream>, delay_queue::Key)>,
|
2019-07-16 12:32:37 +00:00
|
|
|
|
2019-11-27 01:47:46 +00:00
|
|
|
/// Inbound substream `DelayQueue` which keeps track of when an inbound substream will timeout.
|
|
|
|
outbound_substreams_delay: DelayQueue<OutboundRequestId>,
|
|
|
|
|
|
|
|
/// Map of outbound items that are queued as the stream processes them.
|
|
|
|
queued_outbound_items: FnvHashMap<RequestId, Vec<RPCErrorResponse>>,
|
2019-07-06 11:32:32 +00:00
|
|
|
|
|
|
|
/// Sequential Id for waiting substreams.
|
2019-07-16 12:32:37 +00:00
|
|
|
current_substream_id: RequestId,
|
2019-07-06 11:32:32 +00:00
|
|
|
|
|
|
|
/// Maximum number of concurrent outbound substreams being opened. Value is never modified.
|
|
|
|
max_dial_negotiated: u32,
|
|
|
|
|
|
|
|
/// Value to return from `connection_keep_alive`.
|
|
|
|
keep_alive: KeepAlive,
|
|
|
|
|
|
|
|
/// After the given duration has elapsed, an inactive connection will shutdown.
|
|
|
|
inactive_timeout: Duration,
|
Update to frozen spec ❄️ (v0.8.1) (#444)
* types: first updates for v0.8
* state_processing: epoch processing v0.8.0
* state_processing: block processing v0.8.0
* tree_hash_derive: support generics in SignedRoot
* types v0.8: update to use ssz_types
* state_processing v0.8: use ssz_types
* ssz_types: add bitwise methods and from_elem
* types: fix v0.8 FIXMEs
* ssz_types: add bitfield shift_up
* ssz_types: iterators and DerefMut for VariableList
* types,state_processing: use VariableList
* ssz_types: fix BitVector Decode impl
Fixed a typo in the implementation of ssz::Decode for BitVector, which caused it
to be considered variable length!
* types: fix test modules for v0.8 update
* types: remove slow type-level arithmetic
* state_processing: fix tests for v0.8
* op_pool: update for v0.8
* ssz_types: Bitfield difference length-independent
Allow computing the difference of two bitfields of different lengths.
* Implement compact committee support
* epoch_processing: committee & active index roots
* state_processing: genesis state builder v0.8
* state_processing: implement v0.8.1
* Further improve tree_hash
* Strip examples, tests from cached_tree_hash
* Update TreeHash, un-impl CachedTreeHash
* Update bitfield TreeHash, un-impl CachedTreeHash
* Update FixedLenVec TreeHash, unimpl CachedTreeHash
* Update update tree_hash_derive for new TreeHash
* Fix TreeHash, un-impl CachedTreeHash for ssz_types
* Remove fixed_len_vec, ssz benches
SSZ benches relied upon fixed_len_vec -- it is easier to just delete
them and rebuild them later (when necessary)
* Remove boolean_bitfield crate
* Fix fake_crypto BLS compile errors
* Update ef_tests for new v.8 type params
* Update ef_tests submodule to v0.8.1 tag
* Make fixes to support parsing ssz ef_tests
* `compact_committee...` to `compact_committees...`
* Derive more traits for `CompactCommittee`
* Flip bitfield byte-endianness
* Fix tree_hash for bitfields
* Modify CLI output for ef_tests
* Bump ssz crate version
* Update ssz_types doc comment
* Del cached tree hash tests from ssz_static tests
* Tidy SSZ dependencies
* Rename ssz_types crate to eth2_ssz_types
* validator_client: update for v0.8
* ssz_types: update union/difference for bit order swap
* beacon_node: update for v0.8, EthSpec
* types: disable cached tree hash, update min spec
* state_processing: fix slot bug in committee update
* tests: temporarily disable fork choice harness test
See #447
* committee cache: prevent out-of-bounds access
In the case where we tried to access the committee of a shard that didn't have a committee in the
current epoch, we were accessing elements beyond the end of the shuffling vector and panicking! This
commit adds a check to make the failure safe and explicit.
* fix bug in get_indexed_attestation and simplify
There was a bug in our implementation of get_indexed_attestation whereby
incorrect "committee indices" were used to index into the custody bitfield. The
bug was only observable in the case where some bits of the custody bitfield were
set to 1. The implementation has been simplified to remove the bug, and a test
added.
* state_proc: workaround for compact committees bug
https://github.com/ethereum/eth2.0-specs/issues/1315
* v0.8: updates to make the EF tests pass
* Remove redundant max operation checks.
* Always supply both messages when checking attestation signatures -- allowing
verification of an attestation with no signatures.
* Swap the order of the fork and domain constant in `get_domain`, to match
the spec.
* rustfmt
* ef_tests: add new epoch processing tests
* Integrate v0.8 into master (compiles)
* Remove unused crates, fix clippy lints
* Replace v0.6.3 tags w/ v0.8.1
* Remove old comment
* Ensure lmd ghost tests only run in release
* Update readme
2019-07-30 02:44:51 +00:00
|
|
|
|
2019-12-09 07:50:21 +00:00
|
|
|
/// Try to negotiate the outbound upgrade a few times if there is an IO error before reporting the request as failed.
|
|
|
|
/// This keeps track of the number of attempts.
|
|
|
|
outbound_io_error_retries: u8,
|
|
|
|
|
2019-11-27 01:47:46 +00:00
|
|
|
/// Logger for handling RPC streams
|
|
|
|
log: slog::Logger,
|
|
|
|
|
2019-08-10 01:44:17 +00:00
|
|
|
/// Marker to pin the generic stream.
|
|
|
|
_phantom: PhantomData<TSubstream>,
|
2019-07-06 11:32:32 +00:00
|
|
|
}
|
|
|
|
|
2019-07-06 13:43:44 +00:00
|
|
|
/// State of an outbound substream. Either waiting for a response, or in the process of sending.
|
2019-11-27 01:47:46 +00:00
|
|
|
pub enum InboundSubstreamState<TSubstream>
|
2019-07-16 12:32:37 +00:00
|
|
|
where
|
|
|
|
TSubstream: AsyncRead + AsyncWrite,
|
|
|
|
{
|
|
|
|
/// A response has been sent, pending writing and flush.
|
|
|
|
ResponsePendingSend {
|
2019-11-27 01:47:46 +00:00
|
|
|
/// The substream used to send the response
|
2019-07-16 12:32:37 +00:00
|
|
|
substream: futures::sink::Send<InboundFramed<TSubstream>>,
|
2019-11-27 01:47:46 +00:00
|
|
|
/// Whether a stream termination is requested. If true the stream will be closed after
|
|
|
|
/// this send. Otherwise it will transition to an idle state until a stream termination is
|
|
|
|
/// requested or a timeout is reached.
|
|
|
|
closing: bool,
|
2019-07-16 12:32:37 +00:00
|
|
|
},
|
2019-11-27 01:47:46 +00:00
|
|
|
/// The response stream is idle and awaiting input from the application to send more chunked
|
|
|
|
/// responses.
|
|
|
|
ResponseIdle(InboundFramed<TSubstream>),
|
|
|
|
/// The substream is attempting to shutdown.
|
|
|
|
Closing(InboundFramed<TSubstream>),
|
|
|
|
/// Temporary state during processing
|
|
|
|
Poisoned,
|
|
|
|
}
|
|
|
|
|
|
|
|
pub enum OutboundSubstreamState<TSubstream> {
|
2019-07-16 12:32:37 +00:00
|
|
|
/// A request has been sent, and we are awaiting a response. This future is driven in the
|
|
|
|
/// handler because GOODBYE requests can be handled and responses dropped instantly.
|
|
|
|
RequestPendingResponse {
|
|
|
|
/// The framed negotiated substream.
|
|
|
|
substream: OutboundFramed<TSubstream>,
|
2019-11-27 01:47:46 +00:00
|
|
|
/// Keeps track of the actual request sent.
|
|
|
|
request: RPCRequest,
|
2019-07-09 05:44:23 +00:00
|
|
|
},
|
2019-11-27 01:47:46 +00:00
|
|
|
/// Closing an outbound substream>
|
|
|
|
Closing(OutboundFramed<TSubstream>),
|
|
|
|
/// Temporary state during processing
|
|
|
|
Poisoned,
|
2019-07-06 11:32:32 +00:00
|
|
|
}
|
|
|
|
|
2019-08-10 01:44:17 +00:00
|
|
|
impl<TSubstream> RPCHandler<TSubstream>
|
2019-07-16 12:32:37 +00:00
|
|
|
where
|
|
|
|
TSubstream: AsyncRead + AsyncWrite,
|
|
|
|
{
|
2019-07-06 11:32:32 +00:00
|
|
|
pub fn new(
|
|
|
|
listen_protocol: SubstreamProtocol<RPCProtocol>,
|
2019-07-09 05:44:23 +00:00
|
|
|
inactive_timeout: Duration,
|
2019-11-27 01:47:46 +00:00
|
|
|
log: &slog::Logger,
|
2019-07-06 11:32:32 +00:00
|
|
|
) -> Self {
|
|
|
|
RPCHandler {
|
|
|
|
listen_protocol,
|
|
|
|
pending_error: None,
|
|
|
|
events_out: SmallVec::new(),
|
|
|
|
dial_queue: SmallVec::new(),
|
|
|
|
dial_negotiated: 0,
|
2019-11-27 01:47:46 +00:00
|
|
|
queued_outbound_items: FnvHashMap::default(),
|
|
|
|
inbound_substreams: FnvHashMap::default(),
|
|
|
|
outbound_substreams: FnvHashMap::default(),
|
|
|
|
inbound_substreams_delay: DelayQueue::new(),
|
|
|
|
outbound_substreams_delay: DelayQueue::new(),
|
2019-07-16 12:32:37 +00:00
|
|
|
current_substream_id: 1,
|
2019-07-06 11:32:32 +00:00
|
|
|
max_dial_negotiated: 8,
|
|
|
|
keep_alive: KeepAlive::Yes,
|
|
|
|
inactive_timeout,
|
2019-12-09 07:50:21 +00:00
|
|
|
outbound_io_error_retries: 0,
|
2019-11-27 01:47:46 +00:00
|
|
|
log: log.clone(),
|
Update to frozen spec ❄️ (v0.8.1) (#444)
* types: first updates for v0.8
* state_processing: epoch processing v0.8.0
* state_processing: block processing v0.8.0
* tree_hash_derive: support generics in SignedRoot
* types v0.8: update to use ssz_types
* state_processing v0.8: use ssz_types
* ssz_types: add bitwise methods and from_elem
* types: fix v0.8 FIXMEs
* ssz_types: add bitfield shift_up
* ssz_types: iterators and DerefMut for VariableList
* types,state_processing: use VariableList
* ssz_types: fix BitVector Decode impl
Fixed a typo in the implementation of ssz::Decode for BitVector, which caused it
to be considered variable length!
* types: fix test modules for v0.8 update
* types: remove slow type-level arithmetic
* state_processing: fix tests for v0.8
* op_pool: update for v0.8
* ssz_types: Bitfield difference length-independent
Allow computing the difference of two bitfields of different lengths.
* Implement compact committee support
* epoch_processing: committee & active index roots
* state_processing: genesis state builder v0.8
* state_processing: implement v0.8.1
* Further improve tree_hash
* Strip examples, tests from cached_tree_hash
* Update TreeHash, un-impl CachedTreeHash
* Update bitfield TreeHash, un-impl CachedTreeHash
* Update FixedLenVec TreeHash, unimpl CachedTreeHash
* Update update tree_hash_derive for new TreeHash
* Fix TreeHash, un-impl CachedTreeHash for ssz_types
* Remove fixed_len_vec, ssz benches
SSZ benches relied upon fixed_len_vec -- it is easier to just delete
them and rebuild them later (when necessary)
* Remove boolean_bitfield crate
* Fix fake_crypto BLS compile errors
* Update ef_tests for new v.8 type params
* Update ef_tests submodule to v0.8.1 tag
* Make fixes to support parsing ssz ef_tests
* `compact_committee...` to `compact_committees...`
* Derive more traits for `CompactCommittee`
* Flip bitfield byte-endianness
* Fix tree_hash for bitfields
* Modify CLI output for ef_tests
* Bump ssz crate version
* Update ssz_types doc comment
* Del cached tree hash tests from ssz_static tests
* Tidy SSZ dependencies
* Rename ssz_types crate to eth2_ssz_types
* validator_client: update for v0.8
* ssz_types: update union/difference for bit order swap
* beacon_node: update for v0.8, EthSpec
* types: disable cached tree hash, update min spec
* state_processing: fix slot bug in committee update
* tests: temporarily disable fork choice harness test
See #447
* committee cache: prevent out-of-bounds access
In the case where we tried to access the committee of a shard that didn't have a committee in the
current epoch, we were accessing elements beyond the end of the shuffling vector and panicking! This
commit adds a check to make the failure safe and explicit.
* fix bug in get_indexed_attestation and simplify
There was a bug in our implementation of get_indexed_attestation whereby
incorrect "committee indices" were used to index into the custody bitfield. The
bug was only observable in the case where some bits of the custody bitfield were
set to 1. The implementation has been simplified to remove the bug, and a test
added.
* state_proc: workaround for compact committees bug
https://github.com/ethereum/eth2.0-specs/issues/1315
* v0.8: updates to make the EF tests pass
* Remove redundant max operation checks.
* Always supply both messages when checking attestation signatures -- allowing
verification of an attestation with no signatures.
* Swap the order of the fork and domain constant in `get_domain`, to match
the spec.
* rustfmt
* ef_tests: add new epoch processing tests
* Integrate v0.8 into master (compiles)
* Remove unused crates, fix clippy lints
* Replace v0.6.3 tags w/ v0.8.1
* Remove old comment
* Ensure lmd ghost tests only run in release
* Update readme
2019-07-30 02:44:51 +00:00
|
|
|
_phantom: PhantomData,
|
2019-07-06 11:32:32 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Returns the number of pending requests.
|
|
|
|
pub fn pending_requests(&self) -> u32 {
|
|
|
|
self.dial_negotiated + self.dial_queue.len() as u32
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Returns a reference to the listen protocol configuration.
|
|
|
|
///
|
|
|
|
/// > **Note**: If you modify the protocol, modifications will only applies to future inbound
|
|
|
|
/// > substreams, not the ones already being negotiated.
|
2019-07-09 05:44:23 +00:00
|
|
|
pub fn listen_protocol_ref(&self) -> &SubstreamProtocol<RPCProtocol> {
|
2019-07-06 11:32:32 +00:00
|
|
|
&self.listen_protocol
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Returns a mutable reference to the listen protocol configuration.
|
|
|
|
///
|
|
|
|
/// > **Note**: If you modify the protocol, modifications will only applies to future inbound
|
|
|
|
/// > substreams, not the ones already being negotiated.
|
2019-07-09 05:44:23 +00:00
|
|
|
pub fn listen_protocol_mut(&mut self) -> &mut SubstreamProtocol<RPCProtocol> {
|
2019-07-06 11:32:32 +00:00
|
|
|
&mut self.listen_protocol
|
|
|
|
}
|
|
|
|
|
2019-07-16 12:32:37 +00:00
|
|
|
/// Opens an outbound substream with a request.
|
|
|
|
pub fn send_request(&mut self, rpc_event: RPCEvent) {
|
2019-07-06 11:32:32 +00:00
|
|
|
self.keep_alive = KeepAlive::Yes;
|
2019-07-16 12:32:37 +00:00
|
|
|
|
|
|
|
self.dial_queue.push(rpc_event);
|
2019-07-06 11:32:32 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-08-10 01:44:17 +00:00
|
|
|
impl<TSubstream> ProtocolsHandler for RPCHandler<TSubstream>
where
    TSubstream: AsyncRead + AsyncWrite,
{
    type InEvent = RPCEvent;
    type OutEvent = RPCEvent;
    type Error = ProtocolsHandlerUpgrErr<RPCError>;
    type Substream = TSubstream;
    type InboundProtocol = RPCProtocol;
    type OutboundProtocol = RPCRequest;
    type OutboundOpenInfo = RPCEvent; // Keep track of the id and the request
|
2019-07-06 11:32:32 +00:00
|
|
|
|
|
|
|
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
|
|
|
|
self.listen_protocol.clone()
|
|
|
|
}
|
|
|
|
|
|
|
|
fn inject_fully_negotiated_inbound(
|
|
|
|
&mut self,
|
2019-07-09 05:44:23 +00:00
|
|
|
out: <RPCProtocol as InboundUpgrade<TSubstream>>::Output,
|
2019-07-06 11:32:32 +00:00
|
|
|
) {
|
2019-11-30 06:49:45 +00:00
|
|
|
// update the keep alive timeout if there are no more remaining outbound streams
|
|
|
|
if let KeepAlive::Until(_) = self.keep_alive {
|
|
|
|
self.keep_alive = KeepAlive::Until(Instant::now() + self.inactive_timeout);
|
|
|
|
}
|
|
|
|
|
2019-07-16 12:32:37 +00:00
|
|
|
let (req, substream) = out;
|
2019-07-09 05:44:23 +00:00
|
|
|
// drop the stream and return a 0 id for goodbye "requests"
|
|
|
|
if let r @ RPCRequest::Goodbye(_) = req {
|
|
|
|
self.events_out.push(RPCEvent::Request(0, r));
|
|
|
|
return;
|
|
|
|
}
|
2019-07-06 11:32:32 +00:00
|
|
|
|
|
|
|
// New inbound request. Store the stream and tag the output.
|
2019-11-27 01:47:46 +00:00
|
|
|
let delay_key = self.inbound_substreams_delay.insert(
|
|
|
|
self.current_substream_id,
|
|
|
|
Duration::from_secs(RESPONSE_TIMEOUT),
|
|
|
|
);
|
|
|
|
let awaiting_stream = InboundSubstreamState::ResponseIdle(substream);
|
|
|
|
self.inbound_substreams
|
|
|
|
.insert(self.current_substream_id, (awaiting_stream, delay_key));
|
2019-07-09 05:44:23 +00:00
|
|
|
|
|
|
|
self.events_out
|
|
|
|
.push(RPCEvent::Request(self.current_substream_id, req));
|
2019-07-06 11:32:32 +00:00
|
|
|
self.current_substream_id += 1;
|
|
|
|
}
|
|
|
|
|
|
|
|
fn inject_fully_negotiated_outbound(
|
|
|
|
&mut self,
|
2019-07-09 05:44:23 +00:00
|
|
|
out: <RPCRequest as OutboundUpgrade<TSubstream>>::Output,
|
2019-07-16 12:32:37 +00:00
|
|
|
rpc_event: Self::OutboundOpenInfo,
|
2019-07-06 11:32:32 +00:00
|
|
|
) {
|
|
|
|
self.dial_negotiated -= 1;
|
|
|
|
|
2019-07-09 05:44:23 +00:00
|
|
|
if self.dial_negotiated == 0
|
|
|
|
&& self.dial_queue.is_empty()
|
2019-11-27 01:47:46 +00:00
|
|
|
&& self.outbound_substreams.is_empty()
|
2019-07-09 05:44:23 +00:00
|
|
|
{
|
2019-07-06 11:32:32 +00:00
|
|
|
self.keep_alive = KeepAlive::Until(Instant::now() + self.inactive_timeout);
|
2019-07-09 05:44:23 +00:00
|
|
|
} else {
|
2019-07-06 13:43:44 +00:00
|
|
|
self.keep_alive = KeepAlive::Yes;
|
|
|
|
}
|
2019-07-06 11:32:32 +00:00
|
|
|
|
2019-07-16 12:32:37 +00:00
|
|
|
// add the stream to substreams if we expect a response, otherwise drop the stream.
|
2019-11-27 01:47:46 +00:00
|
|
|
match rpc_event {
|
|
|
|
RPCEvent::Request(id, request) if request.expect_response() => {
|
|
|
|
// new outbound request. Store the stream and tag the output.
|
|
|
|
let delay_key = self
|
|
|
|
.outbound_substreams_delay
|
|
|
|
.insert(id, Duration::from_secs(RESPONSE_TIMEOUT));
|
|
|
|
let awaiting_stream = OutboundSubstreamState::RequestPendingResponse {
|
2019-07-16 12:32:37 +00:00
|
|
|
substream: out,
|
2019-11-27 01:47:46 +00:00
|
|
|
request,
|
2019-07-16 12:32:37 +00:00
|
|
|
};
|
2019-11-27 01:47:46 +00:00
|
|
|
self.outbound_substreams
|
|
|
|
.insert(id, (awaiting_stream, delay_key));
|
|
|
|
}
|
|
|
|
_ => { // a response is not expected, drop the stream for all other requests
|
2019-07-16 12:32:37 +00:00
|
|
|
}
|
|
|
|
}
|
2019-07-06 11:32:32 +00:00
|
|
|
}
|
|
|
|
|
2019-07-06 13:43:44 +00:00
|
|
|
// Note: If the substream has closed due to inactivity, or the substream is in the
|
|
|
|
// wrong state a response will fail silently.
|
|
|
|
fn inject_event(&mut self, rpc_event: Self::InEvent) {
|
|
|
|
match rpc_event {
|
2019-07-16 12:32:37 +00:00
|
|
|
RPCEvent::Request(_, _) => self.send_request(rpc_event),
|
2019-11-27 01:47:46 +00:00
|
|
|
RPCEvent::Response(rpc_id, response) => {
|
2019-07-06 13:43:44 +00:00
|
|
|
// check if the stream matching the response still exists
|
2019-11-27 01:47:46 +00:00
|
|
|
// variables indicating if the response is an error response or a multi-part
|
|
|
|
// response
|
|
|
|
let res_is_error = response.is_error();
|
|
|
|
let res_is_multiple = response.multiple_responses();
|
|
|
|
|
|
|
|
match self.inbound_substreams.get_mut(&rpc_id) {
|
|
|
|
Some((substream_state, _)) => {
|
|
|
|
match std::mem::replace(substream_state, InboundSubstreamState::Poisoned) {
|
|
|
|
InboundSubstreamState::ResponseIdle(substream) => {
|
|
|
|
// close the stream if there is no response
|
|
|
|
if let RPCErrorResponse::StreamTermination(_) = response {
|
|
|
|
trace!(self.log, "Stream termination sent. Ending the stream");
|
|
|
|
*substream_state = InboundSubstreamState::Closing(substream);
|
|
|
|
} else {
|
|
|
|
// send the response
|
|
|
|
// if it's a single rpc request or an error, close the stream after
|
|
|
|
*substream_state = InboundSubstreamState::ResponsePendingSend {
|
|
|
|
substream: substream.send(response),
|
|
|
|
closing: !res_is_multiple | res_is_error, // close if an error or we are not expecting more responses
|
|
|
|
};
|
|
|
|
}
|
|
|
|
}
|
|
|
|
InboundSubstreamState::ResponsePendingSend { substream, closing }
|
|
|
|
if res_is_multiple =>
|
|
|
|
{
|
|
|
|
// the stream is in use, add the request to a pending queue
|
|
|
|
(*self
|
|
|
|
.queued_outbound_items
|
|
|
|
.entry(rpc_id)
|
|
|
|
.or_insert_with(Vec::new))
|
|
|
|
.push(response);
|
|
|
|
|
|
|
|
// return the state
|
|
|
|
*substream_state = InboundSubstreamState::ResponsePendingSend {
|
|
|
|
substream,
|
|
|
|
closing,
|
|
|
|
};
|
|
|
|
}
|
|
|
|
InboundSubstreamState::Closing(substream) => {
|
|
|
|
*substream_state = InboundSubstreamState::Closing(substream);
|
|
|
|
debug!(self.log, "Response not sent. Stream is closing"; "response" => format!("{}",response));
|
|
|
|
}
|
|
|
|
InboundSubstreamState::ResponsePendingSend { substream, .. } => {
|
|
|
|
*substream_state = InboundSubstreamState::ResponsePendingSend {
|
|
|
|
substream,
|
|
|
|
closing: true,
|
|
|
|
};
|
|
|
|
error!(self.log, "Attempted sending multiple responses to a single response request");
|
|
|
|
}
|
|
|
|
InboundSubstreamState::Poisoned => {
|
|
|
|
crit!(self.log, "Poisoned inbound substream");
|
|
|
|
unreachable!("Coding error: Poisoned substream");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
None => {
|
|
|
|
debug!(self.log, "Stream has expired. Response not sent"; "response" => format!("{}",response));
|
|
|
|
}
|
|
|
|
};
|
2019-07-06 13:43:44 +00:00
|
|
|
}
|
2019-11-27 01:47:46 +00:00
|
|
|
// We do not send errors as responses
|
2019-07-16 12:32:37 +00:00
|
|
|
RPCEvent::Error(_, _) => {}
|
2019-07-06 13:43:44 +00:00
|
|
|
}
|
2019-07-06 11:32:32 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
fn inject_dial_upgrade_error(
|
|
|
|
&mut self,
|
2019-12-09 07:50:21 +00:00
|
|
|
request: Self::OutboundOpenInfo,
|
2019-07-06 11:32:32 +00:00
|
|
|
error: ProtocolsHandlerUpgrErr<
|
|
|
|
<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error,
|
|
|
|
>,
|
|
|
|
) {
|
2019-12-09 07:50:21 +00:00
|
|
|
if let ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(RPCError::IoError(_))) = error {
|
|
|
|
self.outbound_io_error_retries += 1;
|
|
|
|
if self.outbound_io_error_retries < IO_ERROR_RETRIES {
|
|
|
|
self.send_request(request);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
self.outbound_io_error_retries = 0;
|
|
|
|
// add the error
|
|
|
|
let request_id = {
|
|
|
|
if let RPCEvent::Request(id, _) = request {
|
|
|
|
id
|
|
|
|
} else {
|
|
|
|
0
|
|
|
|
}
|
|
|
|
};
|
2019-07-06 11:32:32 +00:00
|
|
|
if self.pending_error.is_none() {
|
2019-12-09 07:50:21 +00:00
|
|
|
self.pending_error = Some((request_id, error));
|
2019-07-06 11:32:32 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn connection_keep_alive(&self) -> KeepAlive {
|
|
|
|
self.keep_alive
|
|
|
|
}
|
|
|
|
|
|
|
|
fn poll(
|
|
|
|
&mut self,
|
|
|
|
) -> Poll<
|
|
|
|
ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>,
|
|
|
|
Self::Error,
|
|
|
|
> {
|
2019-12-09 07:50:21 +00:00
|
|
|
if let Some((request_id, err)) = self.pending_error.take() {
|
|
|
|
// Returning an error here will result in dropping the peer.
|
|
|
|
match err {
|
|
|
|
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(
|
|
|
|
RPCError::InvalidProtocol(protocol_string),
|
|
|
|
)) => {
|
|
|
|
// Peer does not support the protocol.
|
|
|
|
// TODO: We currently will not drop the peer, for maximal compatibility with
|
|
|
|
// other clients testing their software. In the future, we will need to decide
|
|
|
|
// which protocols are a bare minimum to support before kicking the peer.
|
|
|
|
error!(self.log, "Peer doesn't support the RPC protocol"; "protocol" => protocol_string);
|
|
|
|
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
|
|
|
|
RPCEvent::Error(request_id, RPCError::InvalidProtocol(protocol_string)),
|
|
|
|
)));
|
|
|
|
}
|
|
|
|
ProtocolsHandlerUpgrErr::Timeout | ProtocolsHandlerUpgrErr::Timer => {
|
|
|
|
// negotiation timeout, mark the request as failed
|
|
|
|
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
|
|
|
|
RPCEvent::Error(
|
|
|
|
request_id,
|
|
|
|
RPCError::Custom("Protocol negotiation timeout".into()),
|
|
|
|
),
|
|
|
|
)));
|
|
|
|
}
|
|
|
|
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)) => {
|
|
|
|
// IO/Decode/Custom Error, report to the application
|
|
|
|
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
|
|
|
|
RPCEvent::Error(request_id, err),
|
|
|
|
)));
|
|
|
|
}
|
|
|
|
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)) => {
|
|
|
|
// Error during negotiation
|
|
|
|
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
|
|
|
|
RPCEvent::Error(request_id, RPCError::Custom(format!("{}", err))),
|
|
|
|
)));
|
|
|
|
}
|
|
|
|
}
|
2019-07-06 11:32:32 +00:00
|
|
|
}
|
|
|
|
|
2019-07-16 12:32:37 +00:00
|
|
|
// return any events that need to be reported
|
2019-07-06 11:32:32 +00:00
|
|
|
if !self.events_out.is_empty() {
|
|
|
|
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
|
|
|
|
self.events_out.remove(0),
|
|
|
|
)));
|
|
|
|
} else {
|
|
|
|
self.events_out.shrink_to_fit();
|
|
|
|
}
|
|
|
|
|
2019-11-27 01:47:46 +00:00
|
|
|
// purge expired inbound substreams
|
|
|
|
while let Async::Ready(Some(stream_id)) = self
|
|
|
|
.inbound_substreams_delay
|
|
|
|
.poll()
|
|
|
|
.map_err(|_| ProtocolsHandlerUpgrErr::Timer)?
|
|
|
|
{
|
|
|
|
self.inbound_substreams.remove(stream_id.get_ref());
|
|
|
|
}
|
|
|
|
|
|
|
|
// purge expired outbound substreams
|
2019-12-09 07:50:21 +00:00
|
|
|
if let Async::Ready(Some(stream_id)) = self
|
2019-11-27 01:47:46 +00:00
|
|
|
.outbound_substreams_delay
|
|
|
|
.poll()
|
|
|
|
.map_err(|_| ProtocolsHandlerUpgrErr::Timer)?
|
|
|
|
{
|
|
|
|
self.outbound_substreams.remove(stream_id.get_ref());
|
2019-12-09 07:50:21 +00:00
|
|
|
// notify the user
|
|
|
|
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
|
|
|
|
RPCEvent::Error(
|
|
|
|
stream_id.get_ref().clone(),
|
|
|
|
RPCError::Custom("Stream timed out".into()),
|
|
|
|
),
|
|
|
|
)));
|
2019-11-27 01:47:46 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// drive inbound streams that need to be processed
|
|
|
|
for request_id in self.inbound_substreams.keys().copied().collect::<Vec<_>>() {
|
|
|
|
// Drain all queued items until all messages have been processed for this stream
|
|
|
|
// TODO Improve this code logic
|
|
|
|
let mut new_items_to_send = true;
|
2019-11-29 11:25:36 +00:00
|
|
|
while new_items_to_send {
|
2019-11-27 01:47:46 +00:00
|
|
|
new_items_to_send = false;
|
|
|
|
match self.inbound_substreams.entry(request_id) {
|
|
|
|
Entry::Occupied(mut entry) => {
|
|
|
|
match std::mem::replace(
|
|
|
|
&mut entry.get_mut().0,
|
|
|
|
InboundSubstreamState::Poisoned,
|
|
|
|
) {
|
|
|
|
InboundSubstreamState::ResponsePendingSend {
|
|
|
|
mut substream,
|
|
|
|
closing,
|
|
|
|
} => {
|
|
|
|
match substream.poll() {
|
|
|
|
Ok(Async::Ready(raw_substream)) => {
|
|
|
|
// completed the send
|
|
|
|
|
|
|
|
// close the stream if required
|
|
|
|
if closing {
|
|
|
|
entry.get_mut().0 =
|
|
|
|
InboundSubstreamState::Closing(raw_substream)
|
|
|
|
} else {
|
|
|
|
// check for queued chunks and update the stream
|
|
|
|
entry.get_mut().0 = apply_queued_responses(
|
|
|
|
raw_substream,
|
|
|
|
&mut self
|
|
|
|
.queued_outbound_items
|
|
|
|
.get_mut(&request_id),
|
|
|
|
&mut new_items_to_send,
|
|
|
|
);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Ok(Async::NotReady) => {
|
|
|
|
entry.get_mut().0 =
|
|
|
|
InboundSubstreamState::ResponsePendingSend {
|
|
|
|
substream,
|
|
|
|
closing,
|
|
|
|
};
|
|
|
|
}
|
|
|
|
Err(e) => {
|
|
|
|
let delay_key = &entry.get().1;
|
|
|
|
self.inbound_substreams_delay.remove(delay_key);
|
|
|
|
entry.remove_entry();
|
|
|
|
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
|
|
|
|
RPCEvent::Error(0, e),
|
|
|
|
)));
|
|
|
|
}
|
|
|
|
};
|
|
|
|
}
|
|
|
|
InboundSubstreamState::ResponseIdle(substream) => {
|
|
|
|
entry.get_mut().0 = apply_queued_responses(
|
|
|
|
substream,
|
|
|
|
&mut self.queued_outbound_items.get_mut(&request_id),
|
|
|
|
&mut new_items_to_send,
|
|
|
|
);
|
|
|
|
}
|
|
|
|
InboundSubstreamState::Closing(mut substream) => {
|
|
|
|
match substream.close() {
|
|
|
|
Ok(Async::Ready(())) | Err(_) => {
|
|
|
|
trace!(self.log, "Inbound stream dropped");
|
|
|
|
let delay_key = &entry.get().1;
|
|
|
|
self.queued_outbound_items.remove(&request_id);
|
|
|
|
self.inbound_substreams_delay.remove(delay_key);
|
|
|
|
entry.remove();
|
|
|
|
} // drop the stream
|
|
|
|
Ok(Async::NotReady) => {
|
|
|
|
entry.get_mut().0 =
|
|
|
|
InboundSubstreamState::Closing(substream);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
InboundSubstreamState::Poisoned => {
|
|
|
|
crit!(self.log, "Poisoned outbound substream");
|
|
|
|
unreachable!("Coding Error: Inbound Substream is poisoned");
|
|
|
|
}
|
|
|
|
};
|
2019-07-16 12:32:37 +00:00
|
|
|
}
|
2019-11-27 01:47:46 +00:00
|
|
|
Entry::Vacant(_) => unreachable!(),
|
2019-07-16 12:32:37 +00:00
|
|
|
}
|
2019-11-27 01:47:46 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// drive outbound streams that need to be processed
|
|
|
|
for request_id in self.outbound_substreams.keys().copied().collect::<Vec<_>>() {
|
|
|
|
match self.outbound_substreams.entry(request_id) {
|
|
|
|
Entry::Occupied(mut entry) => {
|
|
|
|
match std::mem::replace(
|
|
|
|
&mut entry.get_mut().0,
|
|
|
|
OutboundSubstreamState::Poisoned,
|
|
|
|
) {
|
|
|
|
OutboundSubstreamState::RequestPendingResponse {
|
|
|
|
mut substream,
|
|
|
|
request,
|
|
|
|
} => match substream.poll() {
|
|
|
|
Ok(Async::Ready(Some(response))) => {
|
|
|
|
if request.multiple_responses() {
|
|
|
|
entry.get_mut().0 =
|
|
|
|
OutboundSubstreamState::RequestPendingResponse {
|
|
|
|
substream,
|
2019-11-29 11:25:36 +00:00
|
|
|
request,
|
2019-11-27 01:47:46 +00:00
|
|
|
};
|
|
|
|
let delay_key = &entry.get().1;
|
|
|
|
self.outbound_substreams_delay
|
|
|
|
.reset(delay_key, Duration::from_secs(RESPONSE_TIMEOUT));
|
|
|
|
} else {
|
|
|
|
trace!(self.log, "Closing single stream request");
|
|
|
|
// only expect a single response, close the stream
|
|
|
|
entry.get_mut().0 = OutboundSubstreamState::Closing(substream);
|
|
|
|
}
|
|
|
|
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
|
|
|
|
RPCEvent::Response(request_id, response),
|
|
|
|
)));
|
|
|
|
}
|
|
|
|
Ok(Async::Ready(None)) => {
|
|
|
|
// stream closed
|
|
|
|
// if we expected multiple streams send a stream termination,
|
|
|
|
// else report the stream terminating only.
|
|
|
|
trace!(self.log, "RPC Response - stream closed by remote");
|
|
|
|
// drop the stream
|
|
|
|
let delay_key = &entry.get().1;
|
|
|
|
self.outbound_substreams_delay.remove(delay_key);
|
|
|
|
entry.remove_entry();
|
|
|
|
// notify the application error
|
|
|
|
if request.multiple_responses() {
|
|
|
|
// return an end of stream result
|
|
|
|
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
|
|
|
|
RPCEvent::Response(
|
|
|
|
request_id,
|
|
|
|
RPCErrorResponse::StreamTermination(
|
|
|
|
request.stream_termination(),
|
|
|
|
),
|
|
|
|
),
|
|
|
|
)));
|
|
|
|
} // else we return an error, stream should not have closed early.
|
|
|
|
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
|
|
|
|
RPCEvent::Error(
|
|
|
|
request_id,
|
|
|
|
RPCError::Custom(
|
|
|
|
"Stream closed early. Empty response".into(),
|
|
|
|
),
|
|
|
|
),
|
|
|
|
)));
|
|
|
|
}
|
|
|
|
Ok(Async::NotReady) => {
|
|
|
|
entry.get_mut().0 = OutboundSubstreamState::RequestPendingResponse {
|
2019-07-16 12:32:37 +00:00
|
|
|
substream,
|
2019-11-27 01:47:46 +00:00
|
|
|
request,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Err(e) => {
|
|
|
|
// drop the stream
|
|
|
|
let delay_key = &entry.get().1;
|
|
|
|
self.outbound_substreams_delay.remove(delay_key);
|
|
|
|
entry.remove_entry();
|
|
|
|
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
|
|
|
|
RPCEvent::Error(request_id, e),
|
|
|
|
)));
|
|
|
|
}
|
|
|
|
},
|
|
|
|
OutboundSubstreamState::Closing(mut substream) => match substream.close() {
|
|
|
|
Ok(Async::Ready(())) | Err(_) => {
|
|
|
|
trace!(self.log, "Outbound stream dropped");
|
|
|
|
// drop the stream
|
|
|
|
let delay_key = &entry.get().1;
|
|
|
|
self.outbound_substreams_delay.remove(delay_key);
|
|
|
|
entry.remove_entry();
|
|
|
|
}
|
|
|
|
Ok(Async::NotReady) => {
|
|
|
|
entry.get_mut().0 = OutboundSubstreamState::Closing(substream);
|
|
|
|
}
|
|
|
|
},
|
|
|
|
OutboundSubstreamState::Poisoned => {
|
|
|
|
crit!(self.log, "Poisoned outbound substream");
|
|
|
|
unreachable!("Coding Error: Outbound substream is poisoned")
|
2019-07-16 12:32:37 +00:00
|
|
|
}
|
|
|
|
}
|
2019-11-27 01:47:46 +00:00
|
|
|
}
|
|
|
|
Entry::Vacant(_) => unreachable!(),
|
2019-07-16 12:32:37 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-07-06 13:43:44 +00:00
|
|
|
// establish outbound substreams
|
2019-07-06 11:32:32 +00:00
|
|
|
if !self.dial_queue.is_empty() {
|
|
|
|
if self.dial_negotiated < self.max_dial_negotiated {
|
|
|
|
self.dial_negotiated += 1;
|
2019-07-16 12:32:37 +00:00
|
|
|
let rpc_event = self.dial_queue.remove(0);
|
|
|
|
if let RPCEvent::Request(id, req) = rpc_event {
|
|
|
|
return Ok(Async::Ready(
|
|
|
|
ProtocolsHandlerEvent::OutboundSubstreamRequest {
|
|
|
|
protocol: SubstreamProtocol::new(req.clone()),
|
|
|
|
info: RPCEvent::Request(id, req),
|
|
|
|
},
|
|
|
|
));
|
|
|
|
}
|
2019-07-06 11:32:32 +00:00
|
|
|
}
|
|
|
|
} else {
|
|
|
|
self.dial_queue.shrink_to_fit();
|
|
|
|
}
|
|
|
|
Ok(Async::NotReady)
|
|
|
|
}
|
|
|
|
}
|
2019-11-27 01:47:46 +00:00
|
|
|
|
|
|
|
// Check for new items to send to the peer and update the underlying stream
|
|
|
|
fn apply_queued_responses<TSubstream: AsyncRead + AsyncWrite>(
|
|
|
|
raw_substream: InboundFramed<TSubstream>,
|
|
|
|
queued_outbound_items: &mut Option<&mut Vec<RPCErrorResponse>>,
|
|
|
|
new_items_to_send: &mut bool,
|
|
|
|
) -> InboundSubstreamState<TSubstream> {
|
|
|
|
match queued_outbound_items {
|
|
|
|
Some(ref mut queue) if !queue.is_empty() => {
|
|
|
|
*new_items_to_send = true;
|
|
|
|
// we have queued items
|
|
|
|
match queue.remove(0) {
|
|
|
|
RPCErrorResponse::StreamTermination(_) => {
|
|
|
|
// close the stream if this is a stream termination
|
|
|
|
InboundSubstreamState::Closing(raw_substream)
|
|
|
|
}
|
|
|
|
chunk => InboundSubstreamState::ResponsePendingSend {
|
|
|
|
substream: raw_substream.send(chunk),
|
|
|
|
closing: false,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
}
|
|
|
|
_ => {
|
|
|
|
// no items queued set to idle
|
|
|
|
InboundSubstreamState::ResponseIdle(raw_substream)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|