Merge remote-tracking branch 'origin/master' into spec-v0.12
This commit is contained in:
commit
9450a0f30d
@ -51,3 +51,6 @@ slog-term = "2.5.0"
|
||||
slog-async = "2.5.0"
|
||||
tempdir = "0.3.7"
|
||||
exit-future = "0.2.0"
|
||||
|
||||
[features]
|
||||
libp2p-websocket = []
|
||||
|
@ -48,45 +48,20 @@ impl<TSpec: EthSpec> DelegatingHandler<TSpec> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Gives access to the gossipsub handler.
|
||||
pub fn _gossip_mut(&mut self) -> &mut GossipHandler {
|
||||
&mut self.gossip_handler
|
||||
}
|
||||
|
||||
/// Gives mutable access to the rpc handler.
|
||||
pub fn _rpc_mut(&mut self) -> &mut RPCHandler<TSpec> {
|
||||
pub fn rpc_mut(&mut self) -> &mut RPCHandler<TSpec> {
|
||||
&mut self.rpc_handler
|
||||
}
|
||||
|
||||
/// Gives mutable access to identify's handler.
|
||||
pub fn _identify_mut(&mut self) -> &mut IdentifyHandler {
|
||||
&mut self.identify_handler
|
||||
}
|
||||
|
||||
/// Gives mutable access to discovery's handler.
|
||||
pub fn _discovery_mut(&mut self) -> &mut DiscoveryHandler<TSpec> {
|
||||
&mut self.discovery_handler
|
||||
}
|
||||
|
||||
/// Gives access to the gossipsub handler.
|
||||
pub fn _gossip(&self) -> &GossipHandler {
|
||||
&self.gossip_handler
|
||||
}
|
||||
|
||||
/// Gives access to the rpc handler.
|
||||
pub fn _rpc(&self) -> &RPCHandler<TSpec> {
|
||||
pub fn rpc(&self) -> &RPCHandler<TSpec> {
|
||||
&self.rpc_handler
|
||||
}
|
||||
|
||||
/// Gives access to identify's handler.
|
||||
pub fn _identify(&self) -> &IdentifyHandler {
|
||||
pub fn identify(&self) -> &IdentifyHandler {
|
||||
&self.identify_handler
|
||||
}
|
||||
|
||||
/// Gives access to discovery's handler.
|
||||
pub fn _discovery(&self) -> &DiscoveryHandler<TSpec> {
|
||||
&self.discovery_handler
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: this can all be created with macros
|
||||
|
@ -22,8 +22,8 @@ mod delegate;
|
||||
pub struct BehaviourHandler<TSpec: EthSpec> {
|
||||
/// Handler combining all sub behaviour's handlers.
|
||||
delegate: DelegatingHandler<TSpec>,
|
||||
/// Keep alive for this handler.
|
||||
keep_alive: KeepAlive,
|
||||
/// Flag indicating if the handler is shutting down.
|
||||
shutting_down: bool,
|
||||
}
|
||||
|
||||
impl<TSpec: EthSpec> BehaviourHandler<TSpec> {
|
||||
@ -35,7 +35,7 @@ impl<TSpec: EthSpec> BehaviourHandler<TSpec> {
|
||||
) -> Self {
|
||||
BehaviourHandler {
|
||||
delegate: DelegatingHandler::new(gossipsub, rpc, identify, discovery),
|
||||
keep_alive: KeepAlive::Yes,
|
||||
shutting_down: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -43,8 +43,8 @@ impl<TSpec: EthSpec> BehaviourHandler<TSpec> {
|
||||
#[derive(Clone)]
|
||||
pub enum BehaviourHandlerIn<TSpec: EthSpec> {
|
||||
Delegate(DelegateIn<TSpec>),
|
||||
// TODO: replace custom with incoming events
|
||||
Custom,
|
||||
/// Start the shutdown process.
|
||||
Shutdown(Option<(RequestId, RPCRequest<TSpec>)>),
|
||||
}
|
||||
|
||||
pub enum BehaviourHandlerOut<TSpec: EthSpec> {
|
||||
@ -84,8 +84,9 @@ impl<TSpec: EthSpec> ProtocolsHandler for BehaviourHandler<TSpec> {
|
||||
match event {
|
||||
BehaviourHandlerIn::Delegate(delegated_ev) => self.delegate.inject_event(delegated_ev),
|
||||
/* Events comming from the behaviour */
|
||||
BehaviourHandlerIn::Custom => {
|
||||
// TODO: implement
|
||||
BehaviourHandlerIn::Shutdown(last_message) => {
|
||||
self.shutting_down = true;
|
||||
self.delegate.rpc_mut().shutdown(last_message);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -101,8 +102,13 @@ impl<TSpec: EthSpec> ProtocolsHandler for BehaviourHandler<TSpec> {
|
||||
}
|
||||
|
||||
fn connection_keep_alive(&self) -> KeepAlive {
|
||||
// TODO: refine this logic
|
||||
self.keep_alive.min(self.delegate.connection_keep_alive())
|
||||
if self.shutting_down {
|
||||
let rpc_keep_alive = self.delegate.rpc().connection_keep_alive();
|
||||
let identify_keep_alive = self.delegate.identify().connection_keep_alive();
|
||||
rpc_keep_alive.max(identify_keep_alive)
|
||||
} else {
|
||||
KeepAlive::Yes
|
||||
}
|
||||
}
|
||||
|
||||
fn poll(
|
||||
@ -135,7 +141,5 @@ impl<TSpec: EthSpec> ProtocolsHandler for BehaviourHandler<TSpec> {
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
|
||||
// TODO: speak to our behaviour here
|
||||
}
|
||||
}
|
||||
|
@ -15,7 +15,8 @@ use libp2p::{
|
||||
gossipsub::{Gossipsub, GossipsubEvent, MessageId},
|
||||
identify::{Identify, IdentifyEvent},
|
||||
swarm::{
|
||||
NetworkBehaviour, NetworkBehaviourAction as NBAction, PollParameters, ProtocolsHandler,
|
||||
NetworkBehaviour, NetworkBehaviourAction as NBAction, NotifyHandler, PollParameters,
|
||||
ProtocolsHandler,
|
||||
},
|
||||
PeerId,
|
||||
};
|
||||
@ -51,7 +52,8 @@ pub struct Behaviour<TSpec: EthSpec> {
|
||||
peer_manager: PeerManager<TSpec>,
|
||||
/// The events generated by this behaviour to be consumed in the swarm poll.
|
||||
events: Vec<BehaviourEvent<TSpec>>,
|
||||
// TODO: add events to send to the handler
|
||||
/// Queue of peers to disconnect.
|
||||
peers_to_dc: Vec<PeerId>,
|
||||
/// The current meta data of the node, so respond to pings and get metadata
|
||||
meta_data: MetaData<TSpec>,
|
||||
/// A cache of recently seen gossip messages. This is used to filter out any possible
|
||||
@ -285,6 +287,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
|
||||
identify,
|
||||
peer_manager: PeerManager::new(network_globals.clone(), log),
|
||||
events: Vec::new(),
|
||||
peers_to_dc: Vec::new(),
|
||||
seen_gossip_messages: LruCache::new(100_000),
|
||||
meta_data,
|
||||
network_globals,
|
||||
@ -396,36 +399,34 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
|
||||
|
||||
/// Send a request to a peer over RPC.
|
||||
pub fn send_request(&mut self, peer_id: PeerId, request_id: RequestId, request: Request) {
|
||||
self.send_rpc(peer_id, RPCSend::Request(request_id, request.into()))
|
||||
self.eth2_rpc
|
||||
.send_request(peer_id, request_id, request.into())
|
||||
}
|
||||
|
||||
/// Send a successful response to a peer over RPC.
|
||||
pub fn send_successful_response(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
stream_id: SubstreamId,
|
||||
id: PeerRequestId,
|
||||
response: Response<TSpec>,
|
||||
) {
|
||||
self.send_rpc(peer_id, RPCSend::Response(stream_id, response.into()))
|
||||
self.eth2_rpc.send_response(peer_id, id, response.into())
|
||||
}
|
||||
|
||||
/// Inform the peer that their request produced an error.
|
||||
pub fn _send_error_reponse(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
stream_id: SubstreamId,
|
||||
id: PeerRequestId,
|
||||
error: RPCResponseErrorCode,
|
||||
reason: String,
|
||||
) {
|
||||
self.send_rpc(
|
||||
self.eth2_rpc.send_response(
|
||||
peer_id,
|
||||
RPCSend::Response(stream_id, RPCCodedResponse::from_error_code(error, reason)),
|
||||
id,
|
||||
RPCCodedResponse::from_error_code(error, reason),
|
||||
)
|
||||
}
|
||||
/// Sends an RPC Request/Response via the RPC protocol.
|
||||
fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCSend<TSpec>) {
|
||||
self.eth2_rpc.send_rpc(peer_id, rpc_event);
|
||||
}
|
||||
|
||||
/* Discovery / Peer management functions */
|
||||
|
||||
@ -512,36 +513,32 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
|
||||
data: self.meta_data.seq_number,
|
||||
};
|
||||
debug!(self.log, "Sending Ping"; "request_id" => id, "peer_id" => peer_id.to_string());
|
||||
let event = RPCSend::Request(id, RPCRequest::Ping(ping));
|
||||
|
||||
self.send_rpc(peer_id, event);
|
||||
self.eth2_rpc
|
||||
.send_request(peer_id, id, RPCRequest::Ping(ping));
|
||||
}
|
||||
|
||||
/// Sends a Pong response to the peer.
|
||||
fn pong(&mut self, id: SubstreamId, peer_id: PeerId) {
|
||||
fn pong(&mut self, id: PeerRequestId, peer_id: PeerId) {
|
||||
let ping = crate::rpc::Ping {
|
||||
data: self.meta_data.seq_number,
|
||||
};
|
||||
debug!(self.log, "Sending Pong"; "request_id" => id, "peer_id" => peer_id.to_string());
|
||||
let event = RPCSend::Response(id, RPCCodedResponse::Success(RPCResponse::Pong(ping)));
|
||||
|
||||
self.send_rpc(peer_id, event);
|
||||
debug!(self.log, "Sending Pong"; "request_id" => id.1, "peer_id" => peer_id.to_string());
|
||||
let event = RPCCodedResponse::Success(RPCResponse::Pong(ping));
|
||||
self.eth2_rpc.send_response(peer_id, id, event);
|
||||
}
|
||||
|
||||
/// Sends a METADATA request to a peer.
|
||||
fn send_meta_data_request(&mut self, peer_id: PeerId) {
|
||||
let metadata_request =
|
||||
RPCSend::Request(RequestId::Behaviour, RPCRequest::MetaData(PhantomData));
|
||||
self.send_rpc(peer_id, metadata_request);
|
||||
let event = RPCRequest::MetaData(PhantomData);
|
||||
self.eth2_rpc
|
||||
.send_request(peer_id, RequestId::Behaviour, event);
|
||||
}
|
||||
|
||||
/// Sends a METADATA response to a peer.
|
||||
fn send_meta_data_response(&mut self, id: SubstreamId, peer_id: PeerId) {
|
||||
let metadata_response = RPCSend::Response(
|
||||
id,
|
||||
RPCCodedResponse::Success(RPCResponse::MetaData(self.meta_data.clone())),
|
||||
);
|
||||
self.send_rpc(peer_id, metadata_response);
|
||||
fn send_meta_data_response(&mut self, id: PeerRequestId, peer_id: PeerId) {
|
||||
let event = RPCCodedResponse::Success(RPCResponse::MetaData(self.meta_data.clone()));
|
||||
self.eth2_rpc.send_response(peer_id, id, event);
|
||||
}
|
||||
|
||||
/// Returns a reference to the peer manager to allow the swarm to notify the manager of peer
|
||||
@ -635,7 +632,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
|
||||
}
|
||||
|
||||
/// Convenience function to propagate a request.
|
||||
fn propagate_request(&mut self, id: SubstreamId, peer_id: PeerId, request: Request) {
|
||||
fn propagate_request(&mut self, id: PeerRequestId, peer_id: PeerId, request: Request) {
|
||||
self.events.push(BehaviourEvent::RequestReceived {
|
||||
peer_id,
|
||||
id,
|
||||
@ -645,6 +642,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
|
||||
|
||||
fn on_rpc_event(&mut self, message: RPCMessage<TSpec>) {
|
||||
let peer_id = message.peer_id;
|
||||
let handler_id = message.conn_id;
|
||||
// The METADATA and PING RPC responses are handled within the behaviour and not propagated
|
||||
match message.event {
|
||||
Err(handler_err) => {
|
||||
@ -654,6 +652,10 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
|
||||
proto,
|
||||
error,
|
||||
} => {
|
||||
if matches!(error, RPCError::HandlerRejected) {
|
||||
// this peer's request got canceled
|
||||
// TODO: cancel processing for this request
|
||||
}
|
||||
// Inform the peer manager of the error.
|
||||
// An inbound error here means we sent an error to the peer, or the stream
|
||||
// timed out.
|
||||
@ -670,37 +672,48 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(RPCReceived::Request(id, request)) => match request {
|
||||
/* Behaviour managed protocols: Ping and Metadata */
|
||||
RPCRequest::Ping(ping) => {
|
||||
// inform the peer manager and send the response
|
||||
self.peer_manager.ping_request(&peer_id, ping.data);
|
||||
// send a ping response
|
||||
self.pong(id, peer_id);
|
||||
Ok(RPCReceived::Request(id, request)) => {
|
||||
let peer_request_id = (handler_id, id);
|
||||
match request {
|
||||
/* Behaviour managed protocols: Ping and Metadata */
|
||||
RPCRequest::Ping(ping) => {
|
||||
// inform the peer manager and send the response
|
||||
self.peer_manager.ping_request(&peer_id, ping.data);
|
||||
// send a ping response
|
||||
self.pong(peer_request_id, peer_id);
|
||||
}
|
||||
RPCRequest::MetaData(_) => {
|
||||
// send the requested meta-data
|
||||
self.send_meta_data_response((handler_id, id), peer_id);
|
||||
// TODO: inform the peer manager?
|
||||
}
|
||||
RPCRequest::Goodbye(reason) => {
|
||||
// let the peer manager know this peer is in the process of disconnecting
|
||||
self.peer_manager._disconnecting_peer(&peer_id);
|
||||
// queue for disconnection without a goodbye message
|
||||
debug!(self.log, "Received a Goodbye, queueing for disconnection";
|
||||
"peer_id" => peer_id.to_string());
|
||||
self.peers_to_dc.push(peer_id.clone());
|
||||
// TODO: do not propagate
|
||||
self.propagate_request(peer_request_id, peer_id, Request::Goodbye(reason));
|
||||
}
|
||||
/* Protocols propagated to the Network */
|
||||
RPCRequest::Status(msg) => {
|
||||
// inform the peer manager that we have received a status from a peer
|
||||
self.peer_manager.peer_statusd(&peer_id);
|
||||
// propagate the STATUS message upwards
|
||||
self.propagate_request(peer_request_id, peer_id, Request::Status(msg))
|
||||
}
|
||||
RPCRequest::BlocksByRange(req) => self.propagate_request(
|
||||
peer_request_id,
|
||||
peer_id,
|
||||
Request::BlocksByRange(req),
|
||||
),
|
||||
RPCRequest::BlocksByRoot(req) => {
|
||||
self.propagate_request(peer_request_id, peer_id, Request::BlocksByRoot(req))
|
||||
}
|
||||
}
|
||||
RPCRequest::MetaData(_) => {
|
||||
// send the requested meta-data
|
||||
self.send_meta_data_response(id, peer_id);
|
||||
// TODO: inform the peer manager?
|
||||
}
|
||||
/* Protocols propagated to the Network */
|
||||
RPCRequest::Status(msg) => {
|
||||
// inform the peer manager that we have received a status from a peer
|
||||
self.peer_manager.peer_statusd(&peer_id);
|
||||
// propagate the STATUS message upwards
|
||||
self.propagate_request(id, peer_id, Request::Status(msg))
|
||||
}
|
||||
RPCRequest::BlocksByRange(req) => {
|
||||
self.propagate_request(id, peer_id, Request::BlocksByRange(req))
|
||||
}
|
||||
RPCRequest::BlocksByRoot(req) => {
|
||||
self.propagate_request(id, peer_id, Request::BlocksByRoot(req))
|
||||
}
|
||||
RPCRequest::Goodbye(reason) => {
|
||||
// TODO: do not propagate
|
||||
self.propagate_request(id, peer_id, Request::Goodbye(reason));
|
||||
}
|
||||
},
|
||||
}
|
||||
Ok(RPCReceived::Response(id, resp)) => {
|
||||
match resp {
|
||||
/* Behaviour managed protocols */
|
||||
@ -734,10 +747,19 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
|
||||
}
|
||||
|
||||
/// Consumes the events list when polled.
|
||||
fn custom_poll<TBehaviourIn>(
|
||||
fn custom_poll(
|
||||
&mut self,
|
||||
cx: &mut Context,
|
||||
) -> Poll<NBAction<TBehaviourIn, BehaviourEvent<TSpec>>> {
|
||||
) -> Poll<NBAction<BehaviourHandlerIn<TSpec>, BehaviourEvent<TSpec>>> {
|
||||
// handle pending disconnections to perform
|
||||
if !self.peers_to_dc.is_empty() {
|
||||
return Poll::Ready(NBAction::NotifyHandler {
|
||||
peer_id: self.peers_to_dc.remove(0),
|
||||
handler: NotifyHandler::All,
|
||||
event: BehaviourHandlerIn::Shutdown(None),
|
||||
});
|
||||
}
|
||||
|
||||
// check the peer manager for events
|
||||
loop {
|
||||
match self.peer_manager.poll_next_unpin(cx) {
|
||||
@ -756,11 +778,20 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
|
||||
PeerManagerEvent::MetaData(peer_id) => {
|
||||
self.send_meta_data_request(peer_id);
|
||||
}
|
||||
PeerManagerEvent::_DisconnectPeer(_peer_id) => {
|
||||
//TODO: Implement
|
||||
}
|
||||
PeerManagerEvent::_BanPeer(_peer_id) => {
|
||||
//TODO: Implement
|
||||
PeerManagerEvent::DisconnectPeer(peer_id) => {
|
||||
debug!(self.log, "PeerManager requested to disconnect a peer";
|
||||
"peer_id" => peer_id.to_string());
|
||||
// queue for disabling
|
||||
self.peers_to_dc.push(peer_id.clone());
|
||||
// send one goodbye
|
||||
return Poll::Ready(NBAction::NotifyHandler {
|
||||
peer_id,
|
||||
handler: NotifyHandler::Any,
|
||||
event: BehaviourHandlerIn::Shutdown(Some((
|
||||
RequestId::Behaviour,
|
||||
RPCRequest::Goodbye(GoodbyeReason::Fault),
|
||||
))),
|
||||
});
|
||||
}
|
||||
},
|
||||
Poll::Pending => break,
|
||||
@ -872,6 +903,9 @@ impl<TSpec: EthSpec> std::convert::From<Response<TSpec>> for RPCCodedResponse<TS
|
||||
}
|
||||
}
|
||||
|
||||
/// Identifier of requests sent by a peer.
|
||||
pub type PeerRequestId = (ConnectionId, SubstreamId);
|
||||
|
||||
/// The types of events than can be obtained from polling the behaviour.
|
||||
#[derive(Debug)]
|
||||
pub enum BehaviourEvent<TSpec: EthSpec> {
|
||||
@ -888,7 +922,7 @@ pub enum BehaviourEvent<TSpec: EthSpec> {
|
||||
/// The peer that sent the request.
|
||||
peer_id: PeerId,
|
||||
/// Identifier of the request. All responses to this request must use this id.
|
||||
id: SubstreamId,
|
||||
id: PeerRequestId,
|
||||
/// Request the peer sent.
|
||||
request: Request,
|
||||
},
|
||||
|
@ -15,7 +15,7 @@ mod service;
|
||||
pub mod types;
|
||||
|
||||
pub use crate::types::{error, Enr, GossipTopic, NetworkGlobals, PubsubMessage};
|
||||
pub use behaviour::{BehaviourEvent, Request, Response};
|
||||
pub use behaviour::{BehaviourEvent, PeerRequestId, Request, Response};
|
||||
pub use config::Config as NetworkConfig;
|
||||
pub use discovery::enr_ext::{CombinedKeyExt, EnrExt};
|
||||
pub use libp2p::gossipsub::{MessageId, Topic, TopicHash};
|
||||
|
@ -96,9 +96,7 @@ pub enum PeerManagerEvent {
|
||||
/// Request METADATA from a peer.
|
||||
MetaData(PeerId),
|
||||
/// The peer should be disconnected.
|
||||
_DisconnectPeer(PeerId),
|
||||
/// The peer should be disconnected and banned.
|
||||
_BanPeer(PeerId),
|
||||
DisconnectPeer(PeerId),
|
||||
}
|
||||
|
||||
impl<TSpec: EthSpec> PeerManager<TSpec> {
|
||||
@ -234,6 +232,12 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
|
||||
self.connect_peer(peer_id, ConnectingType::Dialing)
|
||||
}
|
||||
|
||||
/// Updates the database informing that a peer is being disconnected.
|
||||
pub fn _disconnecting_peer(&mut self, _peer_id: &PeerId) -> bool {
|
||||
// TODO: implement
|
||||
true
|
||||
}
|
||||
|
||||
/// Reports a peer for some action.
|
||||
///
|
||||
/// If the peer doesn't exist, log a warning and insert defaults.
|
||||
@ -267,7 +271,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
|
||||
// They closed early, this could mean poor connection
|
||||
PeerAction::MidToleranceError
|
||||
}
|
||||
RPCError::InternalError(_reason) => {
|
||||
RPCError::InternalError(_) | RPCError::HandlerRejected => {
|
||||
// Our fault. Do nothing
|
||||
return;
|
||||
}
|
||||
@ -444,7 +448,8 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
|
||||
for id in ban_queue {
|
||||
pdb.ban(&id);
|
||||
|
||||
self.events.push(PeerManagerEvent::_BanPeer(id.clone()));
|
||||
self.events
|
||||
.push(PeerManagerEvent::DisconnectPeer(id.clone()));
|
||||
}
|
||||
|
||||
for id in unban_queue {
|
||||
|
@ -22,7 +22,7 @@ use std::{
|
||||
task::{Context, Poll},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use tokio::time::{delay_queue, DelayQueue};
|
||||
use tokio::time::{delay_queue, delay_until, Delay, DelayQueue, Instant as TInstant};
|
||||
use types::EthSpec;
|
||||
|
||||
//TODO: Implement check_timeout() on the substream types
|
||||
@ -33,6 +33,9 @@ pub const RESPONSE_TIMEOUT: u64 = 10;
|
||||
/// The number of times to retry an outbound upgrade in the case of IO errors.
|
||||
const IO_ERROR_RETRIES: u8 = 3;
|
||||
|
||||
/// Maximum time given to the handler to perform shutdown operations.
|
||||
const SHUTDOWN_TIMEOUT_SECS: u8 = 15;
|
||||
|
||||
/// Identifier of inbound and outbound substreams from the handler's perspective.
|
||||
#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq)]
|
||||
pub struct SubstreamId(usize);
|
||||
@ -116,6 +119,9 @@ where
|
||||
/// Value to return from `connection_keep_alive`.
|
||||
keep_alive: KeepAlive,
|
||||
|
||||
/// State of the handler.
|
||||
state: HandlerState,
|
||||
|
||||
/// After the given duration has elapsed, an inactive connection will shutdown.
|
||||
inactive_timeout: Duration,
|
||||
|
||||
@ -127,6 +133,18 @@ where
|
||||
log: slog::Logger,
|
||||
}
|
||||
|
||||
enum HandlerState {
|
||||
/// The handler is active. All messages are sent and received.
|
||||
Active,
|
||||
/// The handler is shutting_down.
|
||||
///
|
||||
/// While in this state the handler rejects new requests but tries to finish existing ones.
|
||||
ShuttingDown(Delay),
|
||||
/// The handler is deactivated. A goodbye has been sent and no more messages are sent or
|
||||
/// received.
|
||||
Deactivated,
|
||||
}
|
||||
|
||||
/// Contains the information the handler keeps on established outbound substreams.
|
||||
struct OutboundInfo<TSpec: EthSpec> {
|
||||
/// State of the substream.
|
||||
@ -278,6 +296,7 @@ where
|
||||
outbound_substreams_delay: DelayQueue::new(),
|
||||
current_inbound_substream_id: SubstreamId(0),
|
||||
current_outbound_substream_id: SubstreamId(0),
|
||||
state: HandlerState::Active,
|
||||
max_dial_negotiated: 8,
|
||||
keep_alive: KeepAlive::Yes,
|
||||
inactive_timeout,
|
||||
@ -302,10 +321,170 @@ where
|
||||
&mut self.listen_protocol
|
||||
}
|
||||
|
||||
/// Initiates the handler's shutdown process, sending an optional last message to the peer.
|
||||
pub fn shutdown(&mut self, final_msg: Option<(RequestId, RPCRequest<TSpec>)>) {
|
||||
if matches!(self.state, HandlerState::Active) {
|
||||
debug!(self.log, "Starting handler shutdown"; "unsent_queued_requests" => self.dial_queue.len());
|
||||
// we now drive to completion communications already dialed/established
|
||||
for (id, req) in self.dial_queue.pop() {
|
||||
self.pending_errors.push(HandlerErr::Outbound {
|
||||
id,
|
||||
proto: req.protocol(),
|
||||
error: RPCError::HandlerRejected,
|
||||
})
|
||||
}
|
||||
|
||||
// Queue our final message, if any
|
||||
if let Some((id, req)) = final_msg {
|
||||
self.dial_queue.push((id, req));
|
||||
}
|
||||
|
||||
self.state = HandlerState::ShuttingDown(delay_until(
|
||||
TInstant::now() + Duration::from_secs(SHUTDOWN_TIMEOUT_SECS as u64),
|
||||
));
|
||||
}
|
||||
self.update_keep_alive();
|
||||
}
|
||||
|
||||
/// Opens an outbound substream with a request.
|
||||
fn send_request(&mut self, id: RequestId, req: RPCRequest<TSpec>) {
|
||||
self.dial_queue.push((id, req));
|
||||
self.update_keep_alive();
|
||||
match self.state {
|
||||
HandlerState::Active => {
|
||||
self.dial_queue.push((id, req));
|
||||
self.update_keep_alive();
|
||||
}
|
||||
_ => {
|
||||
self.pending_errors.push(HandlerErr::Outbound {
|
||||
id,
|
||||
proto: req.protocol(),
|
||||
error: RPCError::HandlerRejected,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Sends a response to a peer's request.
|
||||
// NOTE: If the substream has closed due to inactivity, or the substream is in the
|
||||
// wrong state a response will fail silently.
|
||||
fn send_response(&mut self, inbound_id: SubstreamId, response: RPCCodedResponse<TSpec>) {
|
||||
// Variables indicating if the response is an error response or a multi-part
|
||||
// response
|
||||
let res_is_error = response.is_error();
|
||||
let res_is_multiple = response.multiple_responses();
|
||||
|
||||
// check if the stream matching the response still exists
|
||||
let (substream_state, protocol) = match self.inbound_substreams.get_mut(&inbound_id) {
|
||||
Some((substream_state, _, protocol)) => (substream_state, protocol),
|
||||
None => {
|
||||
warn!(self.log, "Stream has expired. Response not sent";
|
||||
"response" => response.to_string(), "id" => inbound_id);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// If the response we are sending is an error, report back for handling
|
||||
match response {
|
||||
RPCCodedResponse::InvalidRequest(ref reason)
|
||||
| RPCCodedResponse::ServerError(ref reason)
|
||||
| RPCCodedResponse::Unknown(ref reason) => {
|
||||
let code = &response
|
||||
.error_code()
|
||||
.expect("Error response should map to an error code");
|
||||
let err = HandlerErr::Inbound {
|
||||
id: inbound_id,
|
||||
proto: *protocol,
|
||||
error: RPCError::ErrorResponse(*code, reason.to_string()),
|
||||
};
|
||||
self.pending_errors.push(err);
|
||||
}
|
||||
_ => {} // not an error, continue.
|
||||
}
|
||||
|
||||
if matches!(self.state, HandlerState::Deactivated) {
|
||||
// we no longer send responses after the handler is deactivated
|
||||
debug!(self.log, "Response not sent. Deactivated handler";
|
||||
"response" => response.to_string(), "id" => inbound_id);
|
||||
return;
|
||||
}
|
||||
|
||||
match std::mem::replace(substream_state, InboundSubstreamState::Poisoned) {
|
||||
InboundSubstreamState::ResponseIdle(substream) => {
|
||||
// close the stream if there is no response
|
||||
if let RPCCodedResponse::StreamTermination(_) = response {
|
||||
*substream_state = InboundSubstreamState::Closing(substream);
|
||||
} else {
|
||||
// send the response
|
||||
// if it's a single rpc request or an error close the stream after.
|
||||
*substream_state = InboundSubstreamState::ResponsePendingSend {
|
||||
substream,
|
||||
message: response,
|
||||
closing: !res_is_multiple | res_is_error,
|
||||
}
|
||||
}
|
||||
}
|
||||
InboundSubstreamState::ResponsePendingSend {
|
||||
substream,
|
||||
message,
|
||||
closing,
|
||||
} if res_is_multiple => {
|
||||
// the stream is in use, add the request to a pending queue if active
|
||||
self.queued_outbound_items
|
||||
.entry(inbound_id)
|
||||
.or_insert_with(Vec::new)
|
||||
.push(response);
|
||||
|
||||
// return the state
|
||||
*substream_state = InboundSubstreamState::ResponsePendingSend {
|
||||
substream,
|
||||
message,
|
||||
closing,
|
||||
};
|
||||
}
|
||||
InboundSubstreamState::ResponsePendingFlush { substream, closing }
|
||||
if res_is_multiple =>
|
||||
{
|
||||
// the stream is in use, add the request to a pending queue
|
||||
self.queued_outbound_items
|
||||
.entry(inbound_id)
|
||||
.or_insert_with(Vec::new)
|
||||
.push(response);
|
||||
|
||||
// return the state
|
||||
*substream_state =
|
||||
InboundSubstreamState::ResponsePendingFlush { substream, closing };
|
||||
}
|
||||
InboundSubstreamState::Closing(substream) => {
|
||||
*substream_state = InboundSubstreamState::Closing(substream);
|
||||
debug!(self.log, "Response not sent. Stream is closing"; "response" => response.to_string());
|
||||
}
|
||||
InboundSubstreamState::ResponsePendingSend {
|
||||
substream, message, ..
|
||||
} => {
|
||||
*substream_state = InboundSubstreamState::ResponsePendingSend {
|
||||
substream,
|
||||
message,
|
||||
closing: true,
|
||||
};
|
||||
error!(
|
||||
self.log,
|
||||
"Attempted sending multiple responses to a single response request"
|
||||
);
|
||||
}
|
||||
InboundSubstreamState::ResponsePendingFlush { substream, .. } => {
|
||||
*substream_state = InboundSubstreamState::ResponsePendingFlush {
|
||||
substream,
|
||||
closing: true,
|
||||
};
|
||||
error!(
|
||||
self.log,
|
||||
"Attempted sending multiple responses to a single response request"
|
||||
);
|
||||
}
|
||||
InboundSubstreamState::Poisoned => {
|
||||
crit!(self.log, "Poisoned inbound substream");
|
||||
unreachable!("Coding error: Poisoned substream");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Updates the `KeepAlive` returned by `connection_keep_alive`.
|
||||
@ -316,15 +495,25 @@ where
|
||||
fn update_keep_alive(&mut self) {
|
||||
// Check that we don't have outbound items pending for dialing, nor dialing, nor
|
||||
// established. Also check that there are no established inbound substreams.
|
||||
// Errors and events need to be reported back, so check those too.
|
||||
let should_shutdown = self.dial_queue.is_empty()
|
||||
&& self.dial_negotiated == 0
|
||||
&& self.outbound_substreams.is_empty()
|
||||
&& self.inbound_substreams.is_empty();
|
||||
&& self.inbound_substreams.is_empty()
|
||||
&& self.pending_errors.is_empty()
|
||||
&& self.events_out.is_empty()
|
||||
&& self.dial_negotiated == 0;
|
||||
|
||||
if should_shutdown {
|
||||
self.keep_alive = KeepAlive::Until(Instant::now() + self.inactive_timeout)
|
||||
} else {
|
||||
self.keep_alive = KeepAlive::Yes
|
||||
match self.keep_alive {
|
||||
KeepAlive::Yes if should_shutdown => {
|
||||
self.keep_alive = KeepAlive::Until(Instant::now() + self.inactive_timeout);
|
||||
}
|
||||
KeepAlive::Yes => {} // We continue being active
|
||||
KeepAlive::Until(_) if should_shutdown => {} // Already deemed inactive
|
||||
KeepAlive::Until(_) => {
|
||||
// No longer idle
|
||||
self.keep_alive = KeepAlive::Yes;
|
||||
}
|
||||
KeepAlive::No => {} // currently not used
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -348,29 +537,32 @@ where
|
||||
&mut self,
|
||||
substream: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
|
||||
) {
|
||||
let (req, substream) = substream;
|
||||
// drop the stream
|
||||
if let RPCRequest::Goodbye(_) = req {
|
||||
self.events_out
|
||||
.push(RPCReceived::Request(self.current_inbound_substream_id, req));
|
||||
self.current_inbound_substream_id.0 += 1;
|
||||
// only accept new peer requests when active
|
||||
if !matches!(self.state, HandlerState::Active) {
|
||||
return;
|
||||
}
|
||||
|
||||
// New inbound request. Store the stream and tag the output.
|
||||
let delay_key = self.inbound_substreams_delay.insert(
|
||||
self.current_inbound_substream_id,
|
||||
Duration::from_secs(RESPONSE_TIMEOUT),
|
||||
);
|
||||
let awaiting_stream = InboundSubstreamState::ResponseIdle(substream);
|
||||
self.inbound_substreams.insert(
|
||||
self.current_inbound_substream_id,
|
||||
(awaiting_stream, Some(delay_key), req.protocol()),
|
||||
);
|
||||
let (req, substream) = substream;
|
||||
|
||||
// store requests that expect responses
|
||||
if req.expected_responses() > 0 {
|
||||
// Store the stream and tag the output.
|
||||
let delay_key = self.inbound_substreams_delay.insert(
|
||||
self.current_inbound_substream_id,
|
||||
Duration::from_secs(RESPONSE_TIMEOUT),
|
||||
);
|
||||
let awaiting_stream = InboundSubstreamState::ResponseIdle(substream);
|
||||
self.inbound_substreams.insert(
|
||||
self.current_inbound_substream_id,
|
||||
(awaiting_stream, Some(delay_key), req.protocol()),
|
||||
);
|
||||
}
|
||||
|
||||
self.events_out
|
||||
.push(RPCReceived::Request(self.current_inbound_substream_id, req));
|
||||
self.current_inbound_substream_id.0 += 1;
|
||||
|
||||
self.update_keep_alive();
|
||||
}
|
||||
|
||||
fn inject_fully_negotiated_outbound(
|
||||
@ -379,9 +571,20 @@ where
|
||||
request_info: Self::OutboundOpenInfo,
|
||||
) {
|
||||
self.dial_negotiated -= 1;
|
||||
let (id, request) = request_info;
|
||||
let proto = request.protocol();
|
||||
|
||||
// accept outbound connections only if the handler is not deactivated
|
||||
if matches!(self.state, HandlerState::Deactivated) {
|
||||
self.pending_errors.push(HandlerErr::Outbound {
|
||||
id,
|
||||
proto,
|
||||
error: RPCError::HandlerRejected,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
// add the stream to substreams if we expect a response, otherwise drop the stream.
|
||||
let (id, request) = request_info;
|
||||
let expected_responses = request.expected_responses();
|
||||
if expected_responses > 0 {
|
||||
// new outbound request. Store the stream and tag the output.
|
||||
@ -389,7 +592,6 @@ where
|
||||
self.current_outbound_substream_id,
|
||||
Duration::from_secs(RESPONSE_TIMEOUT),
|
||||
);
|
||||
let proto = request.protocol();
|
||||
let awaiting_stream = OutboundSubstreamState::RequestPendingResponse {
|
||||
substream: out,
|
||||
request,
|
||||
@ -422,128 +624,10 @@ where
|
||||
self.update_keep_alive();
|
||||
}
|
||||
|
||||
// NOTE: If the substream has closed due to inactivity, or the substream is in the
|
||||
// wrong state a response will fail silently.
|
||||
fn inject_event(&mut self, rpc_event: Self::InEvent) {
|
||||
match rpc_event {
|
||||
RPCSend::Request(id, req) => self.send_request(id, req),
|
||||
RPCSend::Response(inbound_id, response) => {
|
||||
// Variables indicating if the response is an error response or a multi-part
|
||||
// response
|
||||
let res_is_error = response.is_error();
|
||||
let res_is_multiple = response.multiple_responses();
|
||||
|
||||
// check if the stream matching the response still exists
|
||||
let (substream_state, protocol) = match self.inbound_substreams.get_mut(&inbound_id)
|
||||
{
|
||||
Some((substream_state, _, protocol)) => (substream_state, protocol),
|
||||
None => {
|
||||
warn!(self.log, "Stream has expired. Response not sent";
|
||||
"response" => response.to_string(), "id" => inbound_id);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// If the response we are sending is an error, report back for handling
|
||||
match response {
|
||||
RPCCodedResponse::InvalidRequest(ref reason)
|
||||
| RPCCodedResponse::ServerError(ref reason)
|
||||
| RPCCodedResponse::Unknown(ref reason) => {
|
||||
let code = &response
|
||||
.error_code()
|
||||
.expect("Error response should map to an error code");
|
||||
let err = HandlerErr::Inbound {
|
||||
id: inbound_id,
|
||||
proto: *protocol,
|
||||
error: RPCError::ErrorResponse(*code, reason.to_string()),
|
||||
};
|
||||
self.pending_errors.push(err);
|
||||
}
|
||||
_ => {} // not an error, continue.
|
||||
}
|
||||
|
||||
match std::mem::replace(substream_state, InboundSubstreamState::Poisoned) {
|
||||
InboundSubstreamState::ResponseIdle(substream) => {
|
||||
// close the stream if there is no response
|
||||
match response {
|
||||
RPCCodedResponse::StreamTermination(_) => {
|
||||
*substream_state = InboundSubstreamState::Closing(substream);
|
||||
}
|
||||
_ => {
|
||||
// send the response
|
||||
// if it's a single rpc request or an error, close the stream after
|
||||
*substream_state = InboundSubstreamState::ResponsePendingSend {
|
||||
substream,
|
||||
message: response,
|
||||
closing: !res_is_multiple | res_is_error, // close if an error or we are not expecting more responses
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
InboundSubstreamState::ResponsePendingSend {
|
||||
substream,
|
||||
message,
|
||||
closing,
|
||||
} if res_is_multiple => {
|
||||
// the stream is in use, add the request to a pending queue
|
||||
self.queued_outbound_items
|
||||
.entry(inbound_id)
|
||||
.or_insert_with(Vec::new)
|
||||
.push(response);
|
||||
|
||||
// return the state
|
||||
*substream_state = InboundSubstreamState::ResponsePendingSend {
|
||||
substream,
|
||||
message,
|
||||
closing,
|
||||
};
|
||||
}
|
||||
InboundSubstreamState::ResponsePendingFlush { substream, closing }
|
||||
if res_is_multiple =>
|
||||
{
|
||||
// the stream is in use, add the request to a pending queue
|
||||
self.queued_outbound_items
|
||||
.entry(inbound_id)
|
||||
.or_insert_with(Vec::new)
|
||||
.push(response);
|
||||
|
||||
// return the state
|
||||
*substream_state =
|
||||
InboundSubstreamState::ResponsePendingFlush { substream, closing };
|
||||
}
|
||||
InboundSubstreamState::Closing(substream) => {
|
||||
*substream_state = InboundSubstreamState::Closing(substream);
|
||||
debug!(self.log, "Response not sent. Stream is closing"; "response" => format!("{}",response));
|
||||
}
|
||||
InboundSubstreamState::ResponsePendingSend {
|
||||
substream, message, ..
|
||||
} => {
|
||||
*substream_state = InboundSubstreamState::ResponsePendingSend {
|
||||
substream,
|
||||
message,
|
||||
closing: true,
|
||||
};
|
||||
error!(
|
||||
self.log,
|
||||
"Attempted sending multiple responses to a single response request"
|
||||
);
|
||||
}
|
||||
InboundSubstreamState::ResponsePendingFlush { substream, .. } => {
|
||||
*substream_state = InboundSubstreamState::ResponsePendingFlush {
|
||||
substream,
|
||||
closing: true,
|
||||
};
|
||||
error!(
|
||||
self.log,
|
||||
"Attempted sending multiple responses to a single response request"
|
||||
);
|
||||
}
|
||||
InboundSubstreamState::Poisoned => {
|
||||
crit!(self.log, "Poisoned inbound substream");
|
||||
unreachable!("Coding error: Poisoned substream");
|
||||
}
|
||||
}
|
||||
}
|
||||
RPCSend::Response(inbound_id, response) => self.send_response(inbound_id, response),
|
||||
}
|
||||
}
|
||||
|
||||
@ -563,6 +647,10 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
// This dialing is now considered failed
|
||||
self.dial_negotiated -= 1;
|
||||
self.update_keep_alive();
|
||||
|
||||
self.outbound_io_error_retries = 0;
|
||||
// map the error
|
||||
let error = match error {
|
||||
@ -621,6 +709,21 @@ where
|
||||
self.events_out.shrink_to_fit();
|
||||
}
|
||||
|
||||
// Check if we are shutting down, and if the timer ran out
|
||||
if let HandlerState::ShuttingDown(delay) = &self.state {
|
||||
if delay.is_elapsed() {
|
||||
self.state = HandlerState::Deactivated;
|
||||
debug!(self.log, "Handler deactivated");
|
||||
// Drain queued responses
|
||||
for (inbound_id, queued_responses) in self.queued_outbound_items.drain() {
|
||||
for response in queued_responses {
|
||||
debug!(self.log, "Response not sent. Deactivated handler";
|
||||
"response" => response.to_string(), "id" => inbound_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// purge expired inbound substreams and send an error
|
||||
loop {
|
||||
match self.inbound_substreams_delay.poll_next_unpin(cx) {
|
||||
@ -686,6 +789,9 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
// when deactivated, close all streams
|
||||
let deactivated = matches!(self.state, HandlerState::Deactivated);
|
||||
|
||||
// drive inbound streams that need to be processed
|
||||
for request_id in self.inbound_substreams.keys().copied().collect::<Vec<_>>() {
|
||||
// Drain all queued items until all messages have been processed for this stream
|
||||
@ -704,57 +810,78 @@ where
|
||||
message,
|
||||
closing,
|
||||
} => {
|
||||
match Sink::poll_ready(Pin::new(&mut substream), cx) {
|
||||
Poll::Ready(Ok(())) => {
|
||||
// stream is ready to send data
|
||||
match Sink::start_send(Pin::new(&mut substream), message) {
|
||||
Ok(()) => {
|
||||
// await flush
|
||||
entry.get_mut().0 =
|
||||
if deactivated {
|
||||
if !closing {
|
||||
// inform back to cancel this request's processing
|
||||
self.pending_errors.push(HandlerErr::Inbound {
|
||||
id: request_id,
|
||||
proto: entry.get().2,
|
||||
error: RPCError::HandlerRejected,
|
||||
});
|
||||
}
|
||||
entry.get_mut().0 = InboundSubstreamState::Closing(substream);
|
||||
drive_stream_further = true;
|
||||
} else {
|
||||
match Sink::poll_ready(Pin::new(&mut substream), cx) {
|
||||
Poll::Ready(Ok(())) => {
|
||||
// stream is ready to send data
|
||||
match Sink::start_send(
|
||||
Pin::new(&mut substream),
|
||||
message,
|
||||
) {
|
||||
Ok(()) => {
|
||||
// await flush
|
||||
entry.get_mut().0 =
|
||||
InboundSubstreamState::ResponsePendingFlush {
|
||||
substream,
|
||||
closing,
|
||||
};
|
||||
drive_stream_further = true;
|
||||
}
|
||||
Err(e) => {
|
||||
// error with sending in the codec
|
||||
warn!(self.log, "Error sending RPC message"; "error" => e.to_string());
|
||||
// keep connection with the peer and return the
|
||||
// stream to awaiting response if this message
|
||||
// wasn't closing the stream
|
||||
// TODO: Duplicate code
|
||||
if closing {
|
||||
entry.get_mut().0 =
|
||||
InboundSubstreamState::Closing(substream);
|
||||
drive_stream_further = true;
|
||||
} else {
|
||||
// check for queued chunks and update the stream
|
||||
entry.get_mut().0 = apply_queued_responses(
|
||||
substream,
|
||||
&mut self
|
||||
.queued_outbound_items
|
||||
.get_mut(&request_id),
|
||||
&mut drive_stream_further,
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
// error with sending in the codec
|
||||
warn!(self.log, "Error sending RPC message"; "error" => e.to_string());
|
||||
// keep connection with the peer and return the
|
||||
// stream to awaiting response if this message
|
||||
// wasn't closing the stream
|
||||
if closing {
|
||||
entry.get_mut().0 =
|
||||
InboundSubstreamState::Closing(
|
||||
substream,
|
||||
);
|
||||
drive_stream_further = true;
|
||||
} else {
|
||||
// check for queued chunks and update the stream
|
||||
entry.get_mut().0 = apply_queued_responses(
|
||||
substream,
|
||||
&mut self
|
||||
.queued_outbound_items
|
||||
.get_mut(&request_id),
|
||||
&mut drive_stream_further,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Poll::Ready(Err(e)) => {
|
||||
error!(self.log, "Outbound substream error while sending RPC message: {:?}", e);
|
||||
entry.remove();
|
||||
self.update_keep_alive();
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Close(e));
|
||||
}
|
||||
Poll::Pending => {
|
||||
// the stream is not yet ready, continue waiting
|
||||
entry.get_mut().0 =
|
||||
InboundSubstreamState::ResponsePendingSend {
|
||||
substream,
|
||||
message,
|
||||
closing,
|
||||
};
|
||||
Poll::Ready(Err(e)) => {
|
||||
error!(
|
||||
self.log,
|
||||
"Outbound substream error while sending RPC message: {:?}",
|
||||
e
|
||||
);
|
||||
entry.remove();
|
||||
self.update_keep_alive();
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Close(e));
|
||||
}
|
||||
Poll::Pending => {
|
||||
// the stream is not yet ready, continue waiting
|
||||
entry.get_mut().0 =
|
||||
InboundSubstreamState::ResponsePendingSend {
|
||||
substream,
|
||||
message,
|
||||
closing,
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -766,7 +893,15 @@ where
|
||||
Poll::Ready(Ok(())) => {
|
||||
// finished flushing
|
||||
// TODO: Duplicate code
|
||||
if closing {
|
||||
if closing | deactivated {
|
||||
if !closing {
|
||||
// inform back to cancel this request's processing
|
||||
self.pending_errors.push(HandlerErr::Inbound {
|
||||
id: request_id,
|
||||
proto: entry.get().2,
|
||||
error: RPCError::HandlerRejected,
|
||||
});
|
||||
}
|
||||
entry.get_mut().0 =
|
||||
InboundSubstreamState::Closing(substream);
|
||||
drive_stream_further = true;
|
||||
@ -805,11 +940,16 @@ where
|
||||
}
|
||||
}
|
||||
InboundSubstreamState::ResponseIdle(substream) => {
|
||||
entry.get_mut().0 = apply_queued_responses(
|
||||
substream,
|
||||
&mut self.queued_outbound_items.get_mut(&request_id),
|
||||
&mut drive_stream_further,
|
||||
);
|
||||
if !deactivated {
|
||||
entry.get_mut().0 = apply_queued_responses(
|
||||
substream,
|
||||
&mut self.queued_outbound_items.get_mut(&request_id),
|
||||
&mut drive_stream_further,
|
||||
);
|
||||
} else {
|
||||
entry.get_mut().0 = InboundSubstreamState::Closing(substream);
|
||||
drive_stream_further = true;
|
||||
}
|
||||
}
|
||||
InboundSubstreamState::Closing(mut substream) => {
|
||||
match Sink::poll_close(Pin::new(&mut substream), cx) {
|
||||
@ -866,6 +1006,18 @@ where
|
||||
};
|
||||
|
||||
match state {
|
||||
OutboundSubstreamState::RequestPendingResponse {
|
||||
substream,
|
||||
request: _,
|
||||
} if deactivated => {
|
||||
// the handler is deactivated. Close the stream
|
||||
entry.get_mut().state = OutboundSubstreamState::Closing(substream);
|
||||
self.pending_errors.push(HandlerErr::Outbound {
|
||||
id: entry.get().req_id,
|
||||
proto: entry.get().proto,
|
||||
error: RPCError::HandlerRejected,
|
||||
})
|
||||
}
|
||||
OutboundSubstreamState::RequestPendingResponse {
|
||||
mut substream,
|
||||
request,
|
||||
|
@ -80,6 +80,8 @@ impl<T: EthSpec> std::fmt::Display for RPCSend<T> {
|
||||
pub struct RPCMessage<TSpec: EthSpec> {
|
||||
/// The peer that sent the message.
|
||||
pub peer_id: PeerId,
|
||||
/// Handler managing this message.
|
||||
pub conn_id: ConnectionId,
|
||||
/// The message that was sent.
|
||||
pub event: <RPCHandler<TSpec> as ProtocolsHandler>::OutEvent,
|
||||
}
|
||||
@ -102,14 +104,35 @@ impl<TSpec: EthSpec> RPC<TSpec> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Sends an RPC response.
|
||||
///
|
||||
/// The peer must be connected for this to succeed.
|
||||
pub fn send_response(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
id: (ConnectionId, SubstreamId),
|
||||
event: RPCCodedResponse<TSpec>,
|
||||
) {
|
||||
self.events.push(NetworkBehaviourAction::NotifyHandler {
|
||||
peer_id,
|
||||
handler: NotifyHandler::One(id.0),
|
||||
event: RPCSend::Response(id.1, event),
|
||||
});
|
||||
}
|
||||
|
||||
/// Submits an RPC request.
|
||||
///
|
||||
/// The peer must be connected for this to succeed.
|
||||
pub fn send_rpc(&mut self, peer_id: PeerId, event: RPCSend<TSpec>) {
|
||||
pub fn send_request(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request_id: RequestId,
|
||||
event: RPCRequest<TSpec>,
|
||||
) {
|
||||
self.events.push(NetworkBehaviourAction::NotifyHandler {
|
||||
peer_id,
|
||||
handler: NotifyHandler::Any,
|
||||
event,
|
||||
event: RPCSend::Request(request_id, event),
|
||||
});
|
||||
}
|
||||
}
|
||||
@ -126,7 +149,7 @@ where
|
||||
SubstreamProtocol::new(RPCProtocol {
|
||||
phantom: PhantomData,
|
||||
}),
|
||||
Duration::from_secs(5),
|
||||
Duration::from_secs(30),
|
||||
&self.log,
|
||||
)
|
||||
}
|
||||
@ -169,13 +192,14 @@ where
|
||||
fn inject_event(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
_: ConnectionId,
|
||||
conn_id: ConnectionId,
|
||||
event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
|
||||
) {
|
||||
// send the event to the user
|
||||
self.events
|
||||
.push(NetworkBehaviourAction::GenerateEvent(RPCMessage {
|
||||
peer_id,
|
||||
conn_id,
|
||||
event,
|
||||
}));
|
||||
}
|
||||
|
@ -422,8 +422,10 @@ pub enum RPCError {
|
||||
InvalidData,
|
||||
/// An error occurred due to internal reasons. Ex: timer failure.
|
||||
InternalError(&'static str),
|
||||
/// Negotiation with this peer timed out
|
||||
/// Negotiation with this peer timed out.
|
||||
NegotiationTimeout,
|
||||
/// Handler rejected this request.
|
||||
HandlerRejected,
|
||||
}
|
||||
|
||||
impl From<ssz::DecodeError> for RPCError {
|
||||
@ -461,6 +463,7 @@ impl std::fmt::Display for RPCError {
|
||||
RPCError::IncompleteStream => write!(f, "Stream ended unexpectedly"),
|
||||
RPCError::InternalError(ref err) => write!(f, "Internal error: {}", err),
|
||||
RPCError::NegotiationTimeout => write!(f, "Negotiation timeout"),
|
||||
RPCError::HandlerRejected => write!(f, "Handler rejected the request"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -478,6 +481,7 @@ impl std::error::Error for RPCError {
|
||||
RPCError::InternalError(_) => None,
|
||||
RPCError::ErrorResponse(_, _) => None,
|
||||
RPCError::NegotiationTimeout => None,
|
||||
RPCError::HandlerRejected => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,7 +1,7 @@
|
||||
use crate::behaviour::{Behaviour, BehaviourEvent, Request, Response};
|
||||
use crate::behaviour::{Behaviour, BehaviourEvent, PeerRequestId, Request, Response};
|
||||
use crate::discovery::enr;
|
||||
use crate::multiaddr::Protocol;
|
||||
use crate::rpc::{RPCResponseErrorCode, RequestId, SubstreamId};
|
||||
use crate::rpc::{RPCResponseErrorCode, RequestId};
|
||||
use crate::types::{error, GossipKind};
|
||||
use crate::EnrExt;
|
||||
use crate::{NetworkConfig, NetworkGlobals};
|
||||
@ -235,23 +235,16 @@ impl<TSpec: EthSpec> Service<TSpec> {
|
||||
pub fn respond_with_error(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
stream_id: SubstreamId,
|
||||
id: PeerRequestId,
|
||||
error: RPCResponseErrorCode,
|
||||
reason: String,
|
||||
) {
|
||||
self.swarm
|
||||
._send_error_reponse(peer_id, stream_id, error, reason);
|
||||
self.swarm._send_error_reponse(peer_id, id, error, reason);
|
||||
}
|
||||
|
||||
/// Sends a response to a peer's request.
|
||||
pub fn send_response(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
stream_id: SubstreamId,
|
||||
response: Response<TSpec>,
|
||||
) {
|
||||
self.swarm
|
||||
.send_successful_response(peer_id, stream_id, response);
|
||||
pub fn send_response(&mut self, peer_id: PeerId, id: PeerRequestId, response: Response<TSpec>) {
|
||||
self.swarm.send_successful_response(peer_id, id, response);
|
||||
}
|
||||
|
||||
pub async fn next_event(&mut self) -> Libp2pEvent<TSpec> {
|
||||
@ -382,7 +375,7 @@ fn build_transport(
|
||||
#[cfg(feature = "libp2p-websocket")]
|
||||
let transport = {
|
||||
let trans_clone = transport.clone();
|
||||
transport.or_transport(websocket::WsConfig::new(trans_clone))
|
||||
transport.or_transport(libp2p::websocket::WsConfig::new(trans_clone))
|
||||
};
|
||||
// Authentication
|
||||
Ok(transport
|
||||
|
@ -10,8 +10,8 @@ use crate::error;
|
||||
use crate::service::NetworkMessage;
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError};
|
||||
use eth2_libp2p::{
|
||||
rpc::{RPCError, RequestId, SubstreamId},
|
||||
MessageId, NetworkGlobals, PeerId, PubsubMessage, Request, Response,
|
||||
rpc::{RPCError, RequestId},
|
||||
MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Request, Response,
|
||||
};
|
||||
use futures::prelude::*;
|
||||
use processor::Processor;
|
||||
@ -46,7 +46,7 @@ pub enum RouterMessage<T: EthSpec> {
|
||||
/// An RPC request has been received.
|
||||
RPCRequestReceived {
|
||||
peer_id: PeerId,
|
||||
stream_id: SubstreamId,
|
||||
id: PeerRequestId,
|
||||
request: Request,
|
||||
},
|
||||
/// An RPC response has been received.
|
||||
@ -127,10 +127,10 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
}
|
||||
RouterMessage::RPCRequestReceived {
|
||||
peer_id,
|
||||
stream_id,
|
||||
id,
|
||||
request,
|
||||
} => {
|
||||
self.handle_rpc_request(peer_id, stream_id, request);
|
||||
self.handle_rpc_request(peer_id, id, request);
|
||||
}
|
||||
RouterMessage::RPCResponseReceived {
|
||||
peer_id,
|
||||
@ -160,11 +160,11 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
/* RPC - Related functionality */
|
||||
|
||||
/// A new RPC request has been received from the network.
|
||||
fn handle_rpc_request(&mut self, peer_id: PeerId, stream_id: SubstreamId, request: Request) {
|
||||
fn handle_rpc_request(&mut self, peer_id: PeerId, id: PeerRequestId, request: Request) {
|
||||
match request {
|
||||
Request::Status(status_message) => {
|
||||
self.processor
|
||||
.on_status_request(peer_id, stream_id, status_message)
|
||||
.on_status_request(peer_id, id, status_message)
|
||||
}
|
||||
Request::Goodbye(goodbye_reason) => {
|
||||
debug!(
|
||||
@ -177,10 +177,10 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
}
|
||||
Request::BlocksByRange(request) => self
|
||||
.processor
|
||||
.on_blocks_by_range_request(peer_id, stream_id, request),
|
||||
.on_blocks_by_range_request(peer_id, id, request),
|
||||
Request::BlocksByRoot(request) => self
|
||||
.processor
|
||||
.on_blocks_by_root_request(peer_id, stream_id, request),
|
||||
.on_blocks_by_root_request(peer_id, id, request),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -10,7 +10,7 @@ use beacon_chain::{
|
||||
ForkChoiceError, GossipVerifiedBlock,
|
||||
};
|
||||
use eth2_libp2p::rpc::*;
|
||||
use eth2_libp2p::{NetworkGlobals, PeerId, Request, Response};
|
||||
use eth2_libp2p::{NetworkGlobals, PeerId, PeerRequestId, Request, Response};
|
||||
use itertools::process_results;
|
||||
use slog::{debug, error, o, trace, warn};
|
||||
use ssz::Encode;
|
||||
@ -121,7 +121,7 @@ impl<T: BeaconChainTypes> Processor<T> {
|
||||
pub fn on_status_request(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request_id: SubstreamId,
|
||||
request_id: PeerRequestId,
|
||||
status: StatusMessage,
|
||||
) {
|
||||
debug!(
|
||||
@ -286,7 +286,7 @@ impl<T: BeaconChainTypes> Processor<T> {
|
||||
pub fn on_blocks_by_root_request(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request_id: SubstreamId,
|
||||
request_id: PeerRequestId,
|
||||
request: BlocksByRootRequest,
|
||||
) {
|
||||
let mut send_block_count = 0;
|
||||
@ -324,7 +324,7 @@ impl<T: BeaconChainTypes> Processor<T> {
|
||||
pub fn on_blocks_by_range_request(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request_id: SubstreamId,
|
||||
request_id: PeerRequestId,
|
||||
mut req: BlocksByRangeRequest,
|
||||
) {
|
||||
debug!(
|
||||
@ -1125,29 +1125,24 @@ impl<T: EthSpec> HandlerNetworkContext<T> {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn send_response(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
response: Response<T>,
|
||||
stream_id: SubstreamId,
|
||||
) {
|
||||
pub fn send_response(&mut self, peer_id: PeerId, response: Response<T>, id: PeerRequestId) {
|
||||
self.inform_network(NetworkMessage::SendResponse {
|
||||
peer_id,
|
||||
stream_id,
|
||||
id,
|
||||
response,
|
||||
})
|
||||
}
|
||||
pub fn _send_error_response(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
substream_id: SubstreamId,
|
||||
id: PeerRequestId,
|
||||
error: RPCResponseErrorCode,
|
||||
reason: String,
|
||||
) {
|
||||
self.inform_network(NetworkMessage::SendError {
|
||||
peer_id,
|
||||
error,
|
||||
substream_id,
|
||||
id,
|
||||
reason,
|
||||
})
|
||||
}
|
||||
|
@ -8,8 +8,8 @@ use crate::{error, metrics};
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||
use eth2_libp2p::Service as LibP2PService;
|
||||
use eth2_libp2p::{
|
||||
rpc::{RPCResponseErrorCode, RequestId, SubstreamId},
|
||||
Libp2pEvent, PubsubMessage, Request, Response,
|
||||
rpc::{RPCResponseErrorCode, RequestId},
|
||||
Libp2pEvent, PeerRequestId, PubsubMessage, Request, Response,
|
||||
};
|
||||
use eth2_libp2p::{BehaviourEvent, Enr, MessageId, NetworkGlobals, PeerId};
|
||||
use futures::prelude::*;
|
||||
@ -164,11 +164,11 @@ fn spawn_service<T: BeaconChainTypes>(
|
||||
NetworkMessage::SendRequest{ peer_id, request, request_id } => {
|
||||
service.libp2p.send_request(peer_id, request_id, request);
|
||||
}
|
||||
NetworkMessage::SendResponse{ peer_id, response, stream_id } => {
|
||||
service.libp2p.send_response(peer_id, stream_id, response);
|
||||
NetworkMessage::SendResponse{ peer_id, response, id } => {
|
||||
service.libp2p.send_response(peer_id, id, response);
|
||||
}
|
||||
NetworkMessage::SendError{ peer_id, error, substream_id, reason } => {
|
||||
service.libp2p.respond_with_error(peer_id, substream_id, error, reason);
|
||||
NetworkMessage::SendError{ peer_id, error, id, reason } => {
|
||||
service.libp2p.respond_with_error(peer_id, id, error, reason);
|
||||
}
|
||||
NetworkMessage::Propagate {
|
||||
propagation_source,
|
||||
@ -281,7 +281,7 @@ fn spawn_service<T: BeaconChainTypes>(
|
||||
};
|
||||
let _ = service
|
||||
.router_send
|
||||
.send(RouterMessage::RPCRequestReceived{peer_id, stream_id:id, request})
|
||||
.send(RouterMessage::RPCRequestReceived{peer_id, id, request})
|
||||
.map_err(|_| {
|
||||
debug!(service.log, "Failed to send RPC to router");
|
||||
});
|
||||
@ -289,7 +289,7 @@ fn spawn_service<T: BeaconChainTypes>(
|
||||
BehaviourEvent::ResponseReceived{peer_id, id, response} => {
|
||||
let _ = service
|
||||
.router_send
|
||||
.send(RouterMessage::RPCResponseReceived{ peer_id, request_id:id, response })
|
||||
.send(RouterMessage::RPCResponseReceived{ peer_id, request_id: id, response })
|
||||
.map_err(|_| {
|
||||
debug!(service.log, "Failed to send RPC to router");
|
||||
});
|
||||
@ -298,7 +298,7 @@ fn spawn_service<T: BeaconChainTypes>(
|
||||
BehaviourEvent::RPCFailed{id, peer_id, error} => {
|
||||
let _ = service
|
||||
.router_send
|
||||
.send(RouterMessage::RPCFailed{ peer_id, request_id:id, error })
|
||||
.send(RouterMessage::RPCFailed{ peer_id, request_id: id, error })
|
||||
.map_err(|_| {
|
||||
debug!(service.log, "Failed to send RPC to router");
|
||||
});
|
||||
@ -424,7 +424,7 @@ pub enum NetworkMessage<T: EthSpec> {
|
||||
SendResponse {
|
||||
peer_id: PeerId,
|
||||
response: Response<T>,
|
||||
stream_id: SubstreamId,
|
||||
id: PeerRequestId,
|
||||
},
|
||||
/// Respond to a peer's request with an error.
|
||||
SendError {
|
||||
@ -433,7 +433,7 @@ pub enum NetworkMessage<T: EthSpec> {
|
||||
peer_id: PeerId,
|
||||
error: RPCResponseErrorCode,
|
||||
reason: String,
|
||||
substream_id: SubstreamId,
|
||||
id: PeerRequestId,
|
||||
},
|
||||
/// Publish a list of messages to the gossipsub protocol.
|
||||
Publish { messages: Vec<PubsubMessage<T>> },
|
||||
|
@ -46,3 +46,6 @@ assert_matches = "1.3.0"
|
||||
remote_beacon_node = { path = "../../common/remote_beacon_node" }
|
||||
node_test_rig = { path = "../../testing/node_test_rig" }
|
||||
tree_hash = "0.1.0"
|
||||
|
||||
[features]
|
||||
fake_crypto = []
|
||||
|
Loading…
Reference in New Issue
Block a user