#![allow(clippy::type_complexity)]
#![allow(clippy::cognitive_complexity)]

use super::methods::{RPCCodedResponse, RPCResponseErrorCode, RequestId, ResponseTermination};
use super::protocol::{Protocol, RPCError, RPCProtocol};
use super::{RPCReceived, RPCSend};
use crate::rpc::outbound::{OutboundFramed, OutboundRequest};
use crate::rpc::protocol::InboundFramed;
use fnv::FnvHashMap;
use futures::prelude::*;
use futures::{Sink, SinkExt};
use libp2p::core::upgrade::{
    InboundUpgrade, NegotiationError, OutboundUpgrade, ProtocolError, UpgradeError,
};
use libp2p::swarm::protocols_handler::{
    KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol,
};
use libp2p::swarm::NegotiatedSubstream;
use slog::{crit, debug, trace, warn};
use smallvec::SmallVec;
use std::{
    collections::hash_map::Entry,
    pin::Pin,
    task::{Context, Poll},
    time::Duration,
};
use tokio::time::{sleep_until, Instant as TInstant, Sleep};
use tokio_util::time::{delay_queue, DelayQueue};
use types::EthSpec;

/// The time (in seconds) before a substream that is awaiting a response from the user times out.
pub const RESPONSE_TIMEOUT: u64 = 10;

/// The number of times to retry an outbound upgrade in the case of IO errors.
const IO_ERROR_RETRIES: u8 = 3;

/// Maximum time given to the handler to perform shutdown operations.
const SHUTDOWN_TIMEOUT_SECS: u8 = 15;

/// Identifier of inbound and outbound substreams from the handler's perspective.
#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq)]
pub struct SubstreamId(usize);
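/// An inbound substream on a negotiated connection, framed with the RPC codec.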
type InboundSubstream<TSpec> = InboundFramed<NegotiatedSubstream, TSpec>;

/// Output of the future handling the send of responses to a peer's request.
type InboundProcessingOutput<TSpec> = (
    InboundSubstream<TSpec>, /* substream */
    Vec<RPCError>,           /* Errors sending messages if any */
    bool,                    /* whether to remove the stream afterwards */
    u64,                     /* Chunks remaining to be sent after this processing finishes */
);

/// Events the handler emits to the behaviour.
type HandlerEvent<T> = Result<RPCReceived<T>, HandlerErr>;

/// An error encountered by the handler.
pub enum HandlerErr {
    /// An error occurred for this peer's request. This can occur during protocol negotiation,
    /// message passing, or if the handler identifies that we are sending an error response to the peer.
    Inbound {
        /// Id of the peer's request for which an error occurred.
        id: SubstreamId,
        /// Information of the negotiated protocol.
        proto: Protocol,
        /// The error that occurred.
        error: RPCError,
    },
    /// An error occurred for this request. Such an error can occur during protocol negotiation,
    /// message passing, or if we successfully received a response from the peer, but this response
    /// indicates an error.
    Outbound {
        /// Application-given Id of the request for which an error occurred.
        id: RequestId,
        /// Information of the protocol.
        proto: Protocol,
        /// The error that occurred.
        error: RPCError,
    },
}

/// Implementation of `ProtocolsHandler` for the RPC protocol.
pub struct RPCHandler<TSpec>
where
    TSpec: EthSpec,
{
    /// The upgrade for inbound substreams.
    listen_protocol: SubstreamProtocol<RPCProtocol<TSpec>, ()>,

    /// Queue of events to produce in `poll()`.
    events_out: SmallVec<[HandlerEvent<TSpec>; 4]>,

    /// Queue of outbound substreams to open.
    dial_queue: SmallVec<[(RequestId, OutboundRequest<TSpec>); 4]>,

    /// Current number of concurrent outbound substreams being opened.
    dial_negotiated: u32,

    /// Current inbound substreams awaiting processing.
    inbound_substreams: FnvHashMap<SubstreamId, InboundInfo<TSpec>>,

    /// Inbound substream `DelayQueue` which keeps track of when an inbound substream will timeout.
    inbound_substreams_delay: DelayQueue<SubstreamId>,

    /// Map of outbound substreams that need to be driven to completion.
    outbound_substreams: FnvHashMap<SubstreamId, OutboundInfo<TSpec>>,

    /// Outbound substream `DelayQueue` which keeps track of when an outbound substream will timeout.
    outbound_substreams_delay: DelayQueue<SubstreamId>,

    /// Sequential ID for waiting substreams. For inbound substreams, this is also the inbound request ID.
    current_inbound_substream_id: SubstreamId,

    /// Sequential ID for outbound substreams.
    current_outbound_substream_id: SubstreamId,

    /// Maximum number of concurrent outbound substreams being opened. Value is never modified.
    max_dial_negotiated: u32,

    /// State of the handler.
    state: HandlerState,

    /// Try to negotiate the outbound upgrade a few times if there is an IO error before reporting the request as failed.
    /// This keeps track of the number of attempts.
    outbound_io_error_retries: u8,

    /// Logger for handling RPC streams.
    log: slog::Logger,
}
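/// The internal state of the handler.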
enum HandlerState {
    /// The handler is active. All messages are sent and received.
    Active,
    /// The handler is shutting down.
    ///
    /// While in this state the handler rejects new requests but tries to finish existing ones.
    /// Once the timer expires, all messages are killed.
    ShuttingDown(Box<Sleep>),
    /// The handler is deactivated. A goodbye has been sent and no more messages are sent or
    /// received.
    Deactivated,
}

/// Contains the information the handler keeps on established inbound substreams.
struct InboundInfo<TSpec: EthSpec> {
    /// State of the substream.
    state: InboundState<TSpec>,
    /// Responses queued for sending.
    pending_items: Vec<RPCCodedResponse<TSpec>>,
    /// Protocol of the original request we received from the peer.
    protocol: Protocol,
    /// Responses that the peer is still expecting from us.
    remaining_chunks: u64,
    /// Key to keep track of the substream's timeout via `self.inbound_substreams_delay`.
    delay_key: Option<delay_queue::Key>,
}

/// Contains the information the handler keeps on established outbound substreams.
struct OutboundInfo<TSpec: EthSpec> {
    /// State of the substream.
    state: OutboundSubstreamState<TSpec>,
    /// Key to keep track of the substream's timeout via `self.outbound_substreams_delay`.
    delay_key: delay_queue::Key,
    /// Info over the protocol this substream is handling.
    proto: Protocol,
    /// Number of chunks to be seen from the peer's response.
    remaining_chunks: Option<u64>,
    /// `RequestId` as given by the application that sent the request.
    req_id: RequestId,
}

/// State of an inbound substream connection.
enum InboundState<TSpec: EthSpec> {
    /// The underlying substream is not being used.
    Idle(InboundSubstream<TSpec>),
    /// The underlying substream is processing responses.
    Busy(Pin<Box<dyn Future<Output = InboundProcessingOutput<TSpec>> + Send>>),
    /// Temporary state during processing
    Poisoned,
}

/// State of an outbound substream. Either waiting for a response, or in the process of sending.
pub enum OutboundSubstreamState<TSpec: EthSpec> {
    /// A request has been sent, and we are awaiting a response. This future is driven in the
    /// handler because GOODBYE requests can be handled and responses dropped instantly.
    RequestPendingResponse {
        /// The framed negotiated substream.
        substream: Box<OutboundFramed<NegotiatedSubstream, TSpec>>,
        /// Keeps track of the actual request sent.
        request: OutboundRequest<TSpec>,
    },
    /// Closing an outbound substream.
    Closing(Box<OutboundFramed<NegotiatedSubstream, TSpec>>),
    /// Temporary state during processing
    Poisoned,
}

impl<TSpec> RPCHandler<TSpec>
where
    TSpec: EthSpec,
{
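    /// Creates a new RPC handler with the given listen protocol and logger.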
    pub fn new(
        listen_protocol: SubstreamProtocol<RPCProtocol<TSpec>, ()>,
        log: &slog::Logger,
    ) -> Self {
        RPCHandler {
            listen_protocol,
            events_out: SmallVec::new(),
            dial_queue: SmallVec::new(),
            dial_negotiated: 0,
            inbound_substreams: FnvHashMap::default(),
            outbound_substreams: FnvHashMap::default(),
            inbound_substreams_delay: DelayQueue::new(),
            outbound_substreams_delay: DelayQueue::new(),
            current_inbound_substream_id: SubstreamId(0),
            current_outbound_substream_id: SubstreamId(0),
            state: HandlerState::Active,
            max_dial_negotiated: 8,
            outbound_io_error_retries: 0,
            log: log.clone(),
        }
    }

    /// Initiates the handler's shutdown process, sending an optional last message to the peer.
    pub fn shutdown(&mut self, final_msg: Option<(RequestId, OutboundRequest<TSpec>)>) {
        if matches!(self.state, HandlerState::Active) {
            if !self.dial_queue.is_empty() {
                debug!(self.log, "Starting handler shutdown"; "unsent_queued_requests" => self.dial_queue.len());
            }
            // we now drive to completion communications already dialed/established
            while let Some((id, req)) = self.dial_queue.pop() {
                self.events_out.push(Err(HandlerErr::Outbound {
                    error: RPCError::HandlerRejected,
                    proto: req.protocol(),
                    id,
                }));
            }

            // Queue our final message, if any
            if let Some((id, req)) = final_msg {
                self.dial_queue.push((id, req));
            }

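            // Start the shutdown timer, giving outstanding work at most `SHUTDOWN_TIMEOUT_SECS`
            // seconds to complete.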
            self.state = HandlerState::ShuttingDown(Box::new(sleep_until(
                TInstant::now() + Duration::from_secs(SHUTDOWN_TIMEOUT_SECS as u64),
            )));
        }
    }

    /// Opens an outbound substream with a request.
    fn send_request(&mut self, id: RequestId, req: OutboundRequest<TSpec>) {
        match self.state {
            HandlerState::Active => {
                self.dial_queue.push((id, req));
            }
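            // The handler is shutting down or deactivated: reject the request and report it back.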
            _ => self.events_out.push(Err(HandlerErr::Outbound {
                error: RPCError::HandlerRejected,
                proto: req.protocol(),
                id,
            })),
        }
    }

    /// Sends a response to a peer's request.
    // NOTE: If the substream has closed due to inactivity, or the substream is in the
    // wrong state, a response will fail silently.
    fn send_response(&mut self, inbound_id: SubstreamId, response: RPCCodedResponse<TSpec>) {
        // check if the stream matching the response still exists
        let inbound_info = if let Some(info) = self.inbound_substreams.get_mut(&inbound_id) {
            info
        } else {
            if !matches!(response, RPCCodedResponse::StreamTermination(..)) {
                // the stream is closed after sending the expected number of responses
                trace!(self.log, "Inbound stream has expired, response not sent";
                    "response" => %response, "id" => inbound_id);
            }
            return;
        };

        // If the response we are sending is an error, report back for handling
        if let RPCCodedResponse::Error(ref code, ref reason) = response {
            self.events_out.push(Err(HandlerErr::Inbound {
                error: RPCError::ErrorResponse(*code, reason.to_string()),
                proto: inbound_info.protocol,
                id: inbound_id,
            }));
        }

        if matches!(self.state, HandlerState::Deactivated) {
            // we no longer send responses after the handler is deactivated
            debug!(self.log, "Response not sent. Deactivated handler";
                "response" => %response, "id" => inbound_id);
            return;
        }
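        // Queue the response for sending on the inbound substream.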
        inbound_info.pending_items.push(response);
    }
}

impl<TSpec> ProtocolsHandler for RPCHandler<TSpec>
where
    TSpec: EthSpec,
{
    type InEvent = RPCSend<TSpec>;
    type OutEvent = HandlerEvent<TSpec>;
    type Error = RPCError;
    type InboundProtocol = RPCProtocol<TSpec>;
    type OutboundProtocol = OutboundRequest<TSpec>;
    type OutboundOpenInfo = (RequestId, OutboundRequest<TSpec>); // Keep track of the id and the request
    type InboundOpenInfo = ();

    fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {
        self.listen_protocol.clone()
    }

    fn inject_fully_negotiated_inbound(
        &mut self,
        substream: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
        _info: Self::InboundOpenInfo,
    ) {
        // only accept new peer requests when active
        if !matches!(self.state, HandlerState::Active) {
            return;
        }

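        // The inbound upgrade yields the decoded request together with the negotiated substream.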
        let (req, substream) = substream;
        let expected_responses = req.expected_responses();

        // store requests that expect responses
        if expected_responses > 0 {
            // Store the stream and tag the output.
            let delay_key = self.inbound_substreams_delay.insert(
                self.current_inbound_substream_id,
                Duration::from_secs(RESPONSE_TIMEOUT),
            );
            let awaiting_stream = InboundState::Idle(substream);
            self.inbound_substreams.insert(
                self.current_inbound_substream_id,
                InboundInfo {
                    state: awaiting_stream,
                    pending_items: vec![],
                    delay_key: Some(delay_key),
                    protocol: req.protocol(),
                    remaining_chunks: expected_responses,
                },
            );
        }

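        // Inform the behaviour of the new request, identified by the current inbound substream id.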
        self.events_out.push(Ok(RPCReceived::Request(
            self.current_inbound_substream_id,
            req,
        )));
        self.current_inbound_substream_id.0 += 1;
    }

    fn inject_fully_negotiated_outbound(
        &mut self,
        out: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
        request_info: Self::OutboundOpenInfo,
    ) {
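        // The substream has finished negotiating, so it no longer counts towards the dialing limit.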
        self.dial_negotiated -= 1;
        let (id, request) = request_info;
        let proto = request.protocol();

        // accept outbound connections only if the handler is not deactivated
        if matches!(self.state, HandlerState::Deactivated) {
            self.events_out.push(Err(HandlerErr::Outbound {
                error: RPCError::HandlerRejected,
                proto,
                id,
            }));
        }
|
2019-07-06 11:32:32 +00:00
|
|
|
|
2019-07-16 12:32:37 +00:00
|
|
|
// add the stream to substreams if we expect a response, otherwise drop the stream.
|
2020-06-05 03:07:59 +00:00
|
|
|
let expected_responses = request.expected_responses();
|
|
|
|
if expected_responses > 0 {
|
2020-05-03 13:17:12 +00:00
|
|
|
// new outbound request. Store the stream and tag the output.
|
2020-06-05 03:07:59 +00:00
|
|
|
let delay_key = self.outbound_substreams_delay.insert(
|
|
|
|
self.current_outbound_substream_id,
|
|
|
|
Duration::from_secs(RESPONSE_TIMEOUT),
|
|
|
|
);
|
2020-05-03 13:17:12 +00:00
|
|
|
let awaiting_stream = OutboundSubstreamState::RequestPendingResponse {
|
2020-06-25 14:04:08 +00:00
|
|
|
substream: Box::new(out),
|
2020-06-05 03:07:59 +00:00
|
|
|
request,
|
2020-05-03 13:17:12 +00:00
|
|
|
};
|
2020-06-05 03:07:59 +00:00
|
|
|
let expected_responses = if expected_responses > 1 {
|
|
|
|
// Currently enforced only for multiple responses
|
|
|
|
Some(expected_responses)
|
|
|
|
} else {
|
|
|
|
None
|
|
|
|
};
|
|
|
|
if self
|
|
|
|
.outbound_substreams
|
|
|
|
.insert(
|
|
|
|
self.current_outbound_substream_id,
|
|
|
|
OutboundInfo {
|
|
|
|
state: awaiting_stream,
|
|
|
|
delay_key,
|
|
|
|
proto,
|
|
|
|
remaining_chunks: expected_responses,
|
|
|
|
req_id: id,
|
|
|
|
},
|
|
|
|
)
|
|
|
|
.is_some()
|
|
|
|
{
|
2020-11-30 10:33:00 +00:00
|
|
|
crit!(self.log, "Duplicate outbound substream id"; "id" => self.current_outbound_substream_id);
|
2019-07-16 12:32:37 +00:00
|
|
|
}
|
2020-06-05 03:07:59 +00:00
|
|
|
self.current_outbound_substream_id.0 += 1;
|
2019-07-16 12:32:37 +00:00
|
|
|
}
|
2019-07-06 11:32:32 +00:00
|
|
|
}
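
    // Events from the behaviour layer are either new outbound requests or responses to
    // previously received inbound requests; both are queued here and driven in `poll`.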
    fn inject_event(&mut self, rpc_event: Self::InEvent) {
        match rpc_event {
            RPCSend::Request(id, req) => self.send_request(id, req),
            RPCSend::Response(inbound_id, response) => self.send_response(inbound_id, response),
        }
    }
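
    // Dial upgrade failures caused by transient IO errors are retried a bounded number
    // of times; any other failure is mapped onto an `RPCError` and reported against the
    // originating request id.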
    fn inject_dial_upgrade_error(
        &mut self,
        request_info: Self::OutboundOpenInfo,
        error: ProtocolsHandlerUpgrErr<
            <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Error,
        >,
    ) {
        let (id, req) = request_info;
        if let ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(RPCError::IoError(_))) = error {
            self.outbound_io_error_retries += 1;
            if self.outbound_io_error_retries < IO_ERROR_RETRIES {
                self.send_request(id, req);
                return;
            }
        }

        // This dialing is now considered failed
        self.dial_negotiated -= 1;

        self.outbound_io_error_retries = 0;
        // map the error
        let error = match error {
            ProtocolsHandlerUpgrErr::Timer => RPCError::InternalError("Timer failed"),
            ProtocolsHandlerUpgrErr::Timeout => RPCError::NegotiationTimeout,
            ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => e,
            ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => {
                RPCError::UnsupportedProtocol
            }
            ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(
                NegotiationError::ProtocolError(e),
            )) => match e {
                ProtocolError::IoError(io_err) => RPCError::IoError(io_err.to_string()),
                ProtocolError::InvalidProtocol => {
                    RPCError::InternalError("Protocol was deemed invalid")
                }
                ProtocolError::InvalidMessage | ProtocolError::TooManyProtocols => {
                    // Peer is sending invalid data during the negotiation phase, not
                    // participating in the protocol
                    RPCError::InvalidData
                }
            },
        };
        self.events_out.push(Err(HandlerErr::Outbound {
            error,
            proto: req.protocol(),
            id,
        }));
    }
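
    // Keep-alive reflects whether the handler still has useful work to do: while
    // shutting down we wait for queues and substreams to drain, and once deactivated
    // the connection is dropped unconditionally.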
    fn connection_keep_alive(&self) -> KeepAlive {
        // Check that we have no outbound items pending to be dialed, none currently being
        // dialed and no established outbound substreams. Also check that there are no
        // established inbound substreams.
        // Errors and events need to be reported back, so check those too.
        let should_shutdown = match self.state {
            HandlerState::ShuttingDown(_) => {
                self.dial_queue.is_empty()
                    && self.outbound_substreams.is_empty()
                    && self.inbound_substreams.is_empty()
                    && self.events_out.is_empty()
                    && self.dial_negotiated == 0
            }
            HandlerState::Deactivated => {
                // Regardless of events, the timeout has expired. Force the disconnect.
                true
            }
            _ => false,
        };
        if should_shutdown {
            KeepAlive::No
        } else {
            KeepAlive::Yes
        }
    }
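
    // The poll loop below, in order: flushes queued events, advances the shutdown timer,
    // expires timed-out substreams, drives the inbound and outbound substream state
    // machines, and finally opens new outbound substreams from the dial queue.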
    fn poll(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<
        ProtocolsHandlerEvent<
            Self::OutboundProtocol,
            Self::OutboundOpenInfo,
            Self::OutEvent,
            Self::Error,
        >,
    > {
        // return any events that need to be reported
        if !self.events_out.is_empty() {
            return Poll::Ready(ProtocolsHandlerEvent::Custom(self.events_out.remove(0)));
        } else {
            self.events_out.shrink_to_fit();
        }
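
        // Once the shutdown grace period (the `Sleep` held in `HandlerState::ShuttingDown`)
        // elapses, the handler is deactivated and `connection_keep_alive` forces the
        // disconnect.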
        // Check if we are shutting down, and if the timer ran out
        if let HandlerState::ShuttingDown(delay) = &self.state {
            if delay.is_elapsed() {
                self.state = HandlerState::Deactivated;
                debug!(self.log, "Handler deactivated");
            }
        }

        // purge expired inbound substreams and send an error
        loop {
            match self.inbound_substreams_delay.poll_expired(cx) {
                Poll::Ready(Some(Ok(inbound_id))) => {
                    // handle a stream timeout for various states
                    if let Some(info) = self.inbound_substreams.get_mut(inbound_id.get_ref()) {
                        // the delay has been removed
                        info.delay_key = None;
                        self.events_out.push(Err(HandlerErr::Inbound {
                            error: RPCError::StreamTimeout,
                            proto: info.protocol,
                            id: *inbound_id.get_ref(),
                        }));

                        if info.pending_items.last().map(|l| l.close_after()) == Some(false) {
                            // if the last chunk does not close the stream, append an error
                            info.pending_items.push(RPCCodedResponse::Error(
                                RPCResponseErrorCode::ServerError,
                                "Request timed out".into(),
                            ));
                        }
                    }
                }
                Poll::Ready(Some(Err(e))) => {
                    warn!(self.log, "Inbound substream poll failed"; "error" => ?e);
                    // drops the peer if we cannot read the delay queue
                    return Poll::Ready(ProtocolsHandlerEvent::Close(RPCError::InternalError(
                        "Could not poll inbound stream timer",
                    )));
                }
                Poll::Pending | Poll::Ready(None) => break,
            }
        }
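
        // Note the asymmetry with the inbound case above: an expired outbound substream is
        // removed immediately and surfaced as `RPCError::StreamTimeout`, whereas an inbound
        // timeout also queues a final error response for the peer before the stream closes.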
        // purge expired outbound substreams
        loop {
            match self.outbound_substreams_delay.poll_expired(cx) {
                Poll::Ready(Some(Ok(outbound_id))) => {
                    if let Some(OutboundInfo { proto, req_id, .. }) =
                        self.outbound_substreams.remove(outbound_id.get_ref())
                    {
                        let outbound_err = HandlerErr::Outbound {
                            id: req_id,
                            proto,
                            error: RPCError::StreamTimeout,
                        };
                        // notify the user
                        return Poll::Ready(ProtocolsHandlerEvent::Custom(Err(outbound_err)));
                    } else {
                        crit!(self.log, "timed out substream not in the books"; "stream_id" => outbound_id.get_ref());
                    }
                }
                Poll::Ready(Some(Err(e))) => {
                    warn!(self.log, "Outbound substream poll failed"; "error" => ?e);
                    return Poll::Ready(ProtocolsHandlerEvent::Close(RPCError::InternalError(
                        "Could not poll outbound stream timer",
                    )));
                }
                Poll::Pending | Poll::Ready(None) => break,
            }
        }

        // when deactivated, close all streams
        let deactivated = matches!(self.state, HandlerState::Deactivated);
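
        // Inbound substreams are driven through a small state machine: an `Idle` stream with
        // queued responses becomes `Busy` (a future that writes the pending chunks), `Busy`
        // futures are polled to completion, and `Poisoned` only exists transiently while the
        // state is swapped out below.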
        // drive inbound streams that need to be processed
        let mut substreams_to_remove = Vec::new(); // Closed substreams that need to be removed
        for (id, info) in self.inbound_substreams.iter_mut() {
            loop {
                match std::mem::replace(&mut info.state, InboundState::Poisoned) {
                    InboundState::Idle(substream) if !deactivated => {
                        if !info.pending_items.is_empty() {
                            let to_send = std::mem::take(&mut info.pending_items);
                            let fut = process_inbound_substream(
                                substream,
                                info.remaining_chunks,
                                to_send,
                            )
                            .boxed();
                            info.state = InboundState::Busy(Box::pin(fut));
                        } else {
                            info.state = InboundState::Idle(substream);
                            break;
                        }
                    }
                    InboundState::Idle(mut substream) => {
                        // handler is deactivated, close the stream and mark it for removal
                        match substream.close().poll_unpin(cx) {
                            // if we can't close right now, put the substream back and try again later
                            Poll::Pending => info.state = InboundState::Idle(substream),
                            Poll::Ready(res) => {
                                // The substream closed, we remove it
                                substreams_to_remove.push(*id);
                                if let Some(ref delay_key) = info.delay_key {
                                    self.inbound_substreams_delay.remove(delay_key);
                                }
                                if let Err(error) = res {
                                    self.events_out.push(Err(HandlerErr::Inbound {
                                        error,
                                        proto: info.protocol,
                                        id: *id,
                                    }));
                                }
                                if info.pending_items.last().map(|l| l.close_after()) == Some(false)
                                {
                                    // if the request was still active, report back to cancel it
                                    self.events_out.push(Err(HandlerErr::Inbound {
                                        error: RPCError::HandlerRejected,
                                        proto: info.protocol,
                                        id: *id,
                                    }));
                                }
                            }
                        }
                        break;
                    }
                    InboundState::Busy(mut fut) => {
                        // first check if sending finished
                        match fut.poll_unpin(cx) {
                            Poll::Ready((substream, errors, remove, new_remaining_chunks)) => {
                                info.remaining_chunks = new_remaining_chunks;
                                // report any error
                                for error in errors {
                                    self.events_out.push(Err(HandlerErr::Inbound {
                                        error,
                                        proto: info.protocol,
                                        id: *id,
                                    }))
                                }
                                if remove {
                                    substreams_to_remove.push(*id);
                                    if let Some(ref delay_key) = info.delay_key {
                                        self.inbound_substreams_delay.remove(delay_key);
                                    }
                                    break;
                                } else {
                                    // If we are not removing this substream, we reset the timer.
                                    // Each chunk is allowed RESPONSE_TIMEOUT to be sent.
                                    if let Some(ref delay_key) = info.delay_key {
                                        self.inbound_substreams_delay.reset(
                                            delay_key,
                                            Duration::from_secs(RESPONSE_TIMEOUT),
                                        );
                                    }
                                }

                                // The stream may be currently idle. Attempt to process more
                                // elements

                                if !deactivated && !info.pending_items.is_empty() {
                                    let to_send = std::mem::take(&mut info.pending_items);
                                    let fut = process_inbound_substream(
                                        substream,
                                        info.remaining_chunks,
                                        to_send,
                                    )
                                    .boxed();
                                    info.state = InboundState::Busy(Box::pin(fut));
                                } else {
                                    info.state = InboundState::Idle(substream);
                                    break;
                                }
                            }
                            Poll::Pending => {
                                info.state = InboundState::Busy(fut);
                                break;
                            }
                        };
                    }
                    InboundState::Poisoned => unreachable!("Poisoned inbound substream"),
                }
            }
        }
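
        // Removal is deferred until after the loop because driving each stream above holds a
        // mutable borrow of `inbound_substreams`.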
        // remove closed substreams
        for inbound_id in substreams_to_remove {
            self.inbound_substreams.remove(&inbound_id);
        }

        // drive outbound streams that need to be processed
        for outbound_id in self.outbound_substreams.keys().copied().collect::<Vec<_>>() {
            // get the state and mark it as poisoned
            let (mut entry, state) = match self.outbound_substreams.entry(outbound_id) {
                Entry::Occupied(mut entry) => {
                    let state = std::mem::replace(
                        &mut entry.get_mut().state,
                        OutboundSubstreamState::Poisoned,
                    );
                    (entry, state)
                }
                Entry::Vacant(_) => unreachable!(),
            };

            match state {
                OutboundSubstreamState::RequestPendingResponse {
                    substream,
                    request: _,
                } if deactivated => {
                    // the handler is deactivated. Close the stream
                    entry.get_mut().state = OutboundSubstreamState::Closing(substream);
                    self.events_out.push(Err(HandlerErr::Outbound {
                        error: RPCError::HandlerRejected,
                        proto: entry.get().proto,
                        id: entry.get().req_id,
                    }))
                }
                OutboundSubstreamState::RequestPendingResponse {
                    mut substream,
                    request,
                } => match substream.poll_next_unpin(cx) {
                    Poll::Ready(Some(Ok(response))) => {
                        if request.expected_responses() > 1 && !response.close_after() {
                            let substream_entry = entry.get_mut();
                            let delay_key = &substream_entry.delay_key;
                            // chunks left after this one
                            let remaining_chunks = substream_entry
                                .remaining_chunks
                                .map(|count| count.saturating_sub(1))
                                .unwrap_or_else(|| 0);
                            if remaining_chunks == 0 {
                                // this is the last expected message, close the stream as all expected chunks have been received
                                substream_entry.state = OutboundSubstreamState::Closing(substream);
                            } else {
                                // If the response chunk was expected, update the number of
                                // remaining expected chunks and reset the timeout
                                substream_entry.state =
                                    OutboundSubstreamState::RequestPendingResponse {
                                        substream,
                                        request,
                                    };
                                substream_entry.remaining_chunks = Some(remaining_chunks);
                                self.outbound_substreams_delay
                                    .reset(delay_key, Duration::from_secs(RESPONSE_TIMEOUT));
                            }
                        } else {
                            // either this is a single response request or this response closes the
                            // stream
                            entry.get_mut().state = OutboundSubstreamState::Closing(substream);
                        }

                        // Check what type of response we got and report it accordingly
                        let id = entry.get().req_id;
                        let proto = entry.get().proto;

                        let received = match response {
                            RPCCodedResponse::StreamTermination(t) => {
                                Ok(RPCReceived::EndOfStream(id, t))
                            }
                            RPCCodedResponse::Success(resp) => Ok(RPCReceived::Response(id, resp)),
                            RPCCodedResponse::Error(ref code, ref r) => Err(HandlerErr::Outbound {
                                id,
                                proto,
                                error: RPCError::ErrorResponse(*code, r.to_string()),
                            }),
                        };

                        return Poll::Ready(ProtocolsHandlerEvent::Custom(received));
                    }
                    Poll::Ready(None) => {
                        // The stream was closed by the remote. If we expected multiple response
                        // chunks, report an end of stream; otherwise report an error, since the
                        // stream should not have closed early.
                        //trace!(self.log, "RPC Response - stream closed by remote");
                        // drop the stream and its timeout
                        let delay_key = &entry.get().delay_key;
                        let request_id = entry.get().req_id;
                        self.outbound_substreams_delay.remove(delay_key);
                        entry.remove_entry();
                        // notify the application
                        if request.expected_responses() > 1 {
                            // return an end of stream result
                            return Poll::Ready(ProtocolsHandlerEvent::Custom(Ok(
                                RPCReceived::EndOfStream(request_id, request.stream_termination()),
                            )));
                        }

                        // else we return an error; the stream should not have closed early.
                        let outbound_err = HandlerErr::Outbound {
                            id: request_id,
                            proto: request.protocol(),
                            error: RPCError::IncompleteStream,
                        };
                        return Poll::Ready(ProtocolsHandlerEvent::Custom(Err(outbound_err)));
                    }
                    Poll::Pending => {
                        entry.get_mut().state =
                            OutboundSubstreamState::RequestPendingResponse { substream, request }
                    }
                    Poll::Ready(Some(Err(e))) => {
                        // drop the stream
                        let delay_key = &entry.get().delay_key;
                        self.outbound_substreams_delay.remove(delay_key);
                        let outbound_err = HandlerErr::Outbound {
                            id: entry.get().req_id,
                            proto: entry.get().proto,
                            error: e,
                        };
                        entry.remove_entry();
                        return Poll::Ready(ProtocolsHandlerEvent::Custom(Err(outbound_err)));
                    }
                },
                OutboundSubstreamState::Closing(mut substream) => {
                    match Sink::poll_close(Pin::new(&mut substream), cx) {
                        Poll::Ready(_) => {
                            // drop the stream and its corresponding timeout
                            let delay_key = &entry.get().delay_key;
                            let protocol = entry.get().proto;
                            let request_id = entry.get().req_id;
                            self.outbound_substreams_delay.remove(delay_key);
                            entry.remove_entry();

                            // report the stream termination to the user
                            //
                            // Streams can be terminated here if a responder tries to
                            // continue sending responses beyond what we would expect. Here
                            // we simply terminate the stream and report a stream
                            // termination to the application
                            let termination = match protocol {
                                Protocol::BlocksByRange => Some(ResponseTermination::BlocksByRange),
                                Protocol::BlocksByRoot => Some(ResponseTermination::BlocksByRoot),
                                // all other protocols do not have multiple responses; we do not
                                // inform the user and simply drop the stream.
                                _ => None,
                            };

                            if let Some(termination) = termination {
                                return Poll::Ready(ProtocolsHandlerEvent::Custom(Ok(
                                    RPCReceived::EndOfStream(request_id, termination),
                                )));
                            }
                        }
                        Poll::Pending => {
                            entry.get_mut().state = OutboundSubstreamState::Closing(substream);
                        }
                    }
                }
                OutboundSubstreamState::Poisoned => {
                    crit!(self.log, "Poisoned outbound substream");
                    unreachable!("Coding Error: Outbound substream is poisoned")
                }
            }
        }
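
        // Finally, open new outbound substreams, bounded by `max_dial_negotiated`. The
        // request is cloned into the upgrade while the (id, request) pair travels as
        // `OutboundOpenInfo`, so it is available again on success or failure.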
        // establish outbound substreams
        if !self.dial_queue.is_empty() && self.dial_negotiated < self.max_dial_negotiated {
            self.dial_negotiated += 1;
            let (id, req) = self.dial_queue.remove(0);
            self.dial_queue.shrink_to_fit();
            return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
                protocol: SubstreamProtocol::new(req.clone(), ()).map_info(|()| (id, req)),
            });
        }
        Poll::Pending
    }
}
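
// `SubstreamId` values appear in structured log statements above (e.g. the duplicate
// substream `crit!`), so slog serialization is delegated to the inner value.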
impl slog::Value for SubstreamId {
    fn serialize(
        &self,
        record: &slog::Record,
        key: slog::Key,
        serializer: &mut dyn slog::Serializer,
    ) -> slog::Result {
        slog::Value::serialize(&self.0, record, key, serializer)
    }
}

/// Sends the queued items to the peer.
async fn process_inbound_substream<TSpec: EthSpec>(
    mut substream: InboundSubstream<TSpec>,
    mut remaining_chunks: u64,
    pending_items: Vec<RPCCodedResponse<TSpec>>,
) -> InboundProcessingOutput<TSpec> {
    let mut errors = Vec::new();
    let mut substream_closed = false;

    for item in pending_items {
        if !substream_closed {
            if matches!(item, RPCCodedResponse::StreamTermination(_)) {
                substream.close().await.unwrap_or_else(|e| errors.push(e));
                substream_closed = true;
            } else {
                remaining_chunks = remaining_chunks.saturating_sub(1);
                // chunks that are not stream terminations get sent, and the stream is closed if
                // the response is an error
                let is_error = matches!(item, RPCCodedResponse::Error(..));

                substream
                    .send(item)
                    .await
                    .unwrap_or_else(|e| errors.push(e));

                if remaining_chunks == 0 || is_error {
                    substream.close().await.unwrap_or_else(|e| errors.push(e));
                    substream_closed = true;
                }
            }
        } else if matches!(item, RPCCodedResponse::StreamTermination(_)) {
            // The sender closed the stream before us, ignore this.
        } else {
            // we have more items after a closed substream, report those as errors
            errors.push(RPCError::InternalError(
                "Sending responses to closed inbound substream",
            ));
        }
    }
    (substream, errors, substream_closed, remaining_chunks)
}