diff --git a/beacon_node/eth2-libp2p/src/behaviour/handler/delegate.rs b/beacon_node/eth2-libp2p/src/behaviour/handler/delegate.rs index 2ab10ee93..8fb31feef 100644 --- a/beacon_node/eth2-libp2p/src/behaviour/handler/delegate.rs +++ b/beacon_node/eth2-libp2p/src/behaviour/handler/delegate.rs @@ -48,45 +48,20 @@ impl DelegatingHandler { } } - /// Gives access to the gossipsub handler. - pub fn _gossip_mut(&mut self) -> &mut GossipHandler { - &mut self.gossip_handler - } - /// Gives mutable access to the rpc handler. - pub fn _rpc_mut(&mut self) -> &mut RPCHandler { + pub fn rpc_mut(&mut self) -> &mut RPCHandler { &mut self.rpc_handler } - /// Gives mutable access to identify's handler. - pub fn _identify_mut(&mut self) -> &mut IdentifyHandler { - &mut self.identify_handler - } - - /// Gives mutable access to discovery's handler. - pub fn _discovery_mut(&mut self) -> &mut DiscoveryHandler { - &mut self.discovery_handler - } - - /// Gives access to the gossipsub handler. - pub fn _gossip(&self) -> &GossipHandler { - &self.gossip_handler - } - /// Gives access to the rpc handler. - pub fn _rpc(&self) -> &RPCHandler { + pub fn rpc(&self) -> &RPCHandler { &self.rpc_handler } /// Gives access to identify's handler. - pub fn _identify(&self) -> &IdentifyHandler { + pub fn identify(&self) -> &IdentifyHandler { &self.identify_handler } - - /// Gives access to discovery's handler. - pub fn _discovery(&self) -> &DiscoveryHandler { - &self.discovery_handler - } } // TODO: this can all be created with macros diff --git a/beacon_node/eth2-libp2p/src/behaviour/handler/mod.rs b/beacon_node/eth2-libp2p/src/behaviour/handler/mod.rs index 772a428fa..c88813121 100644 --- a/beacon_node/eth2-libp2p/src/behaviour/handler/mod.rs +++ b/beacon_node/eth2-libp2p/src/behaviour/handler/mod.rs @@ -22,8 +22,8 @@ mod delegate; pub struct BehaviourHandler { /// Handler combining all sub behaviour's handlers. delegate: DelegatingHandler, - /// Keep alive for this handler. - keep_alive: KeepAlive, + /// Flag indicating if the handler is shutting down. + shutting_down: bool, } impl BehaviourHandler { @@ -35,7 +35,7 @@ impl BehaviourHandler { ) -> Self { BehaviourHandler { delegate: DelegatingHandler::new(gossipsub, rpc, identify, discovery), - keep_alive: KeepAlive::Yes, + shutting_down: false, } } } @@ -43,8 +43,8 @@ impl BehaviourHandler { #[derive(Clone)] pub enum BehaviourHandlerIn { Delegate(DelegateIn), - // TODO: replace custom with incoming events - Custom, + /// Start the shutdown process. + Shutdown(Option<(RequestId, RPCRequest)>), } pub enum BehaviourHandlerOut { @@ -84,8 +84,9 @@ impl ProtocolsHandler for BehaviourHandler { match event { BehaviourHandlerIn::Delegate(delegated_ev) => self.delegate.inject_event(delegated_ev), /* Events comming from the behaviour */ - BehaviourHandlerIn::Custom => { - // TODO: implement + BehaviourHandlerIn::Shutdown(last_message) => { + self.shutting_down = true; + self.delegate.rpc_mut().shutdown(last_message); } } } @@ -101,8 +102,13 @@ impl ProtocolsHandler for BehaviourHandler { } fn connection_keep_alive(&self) -> KeepAlive { - // TODO: refine this logic - self.keep_alive.min(self.delegate.connection_keep_alive()) + if self.shutting_down { + let rpc_keep_alive = self.delegate.rpc().connection_keep_alive(); + let identify_keep_alive = self.delegate.identify().connection_keep_alive(); + rpc_keep_alive.max(identify_keep_alive) + } else { + KeepAlive::Yes + } } fn poll( @@ -135,7 +141,5 @@ impl ProtocolsHandler for BehaviourHandler { } Poll::Pending - - // TODO: speak to our behaviour here } } diff --git a/beacon_node/eth2-libp2p/src/behaviour/mod.rs b/beacon_node/eth2-libp2p/src/behaviour/mod.rs index aadc9cbc2..d9534e2cc 100644 --- a/beacon_node/eth2-libp2p/src/behaviour/mod.rs +++ b/beacon_node/eth2-libp2p/src/behaviour/mod.rs @@ -15,7 +15,8 @@ use libp2p::{ gossipsub::{Gossipsub, GossipsubEvent, MessageId}, identify::{Identify, IdentifyEvent}, swarm::{ - NetworkBehaviour, NetworkBehaviourAction as NBAction, PollParameters, ProtocolsHandler, + NetworkBehaviour, NetworkBehaviourAction as NBAction, NotifyHandler, PollParameters, + ProtocolsHandler, }, PeerId, }; @@ -51,7 +52,8 @@ pub struct Behaviour { peer_manager: PeerManager, /// The events generated by this behaviour to be consumed in the swarm poll. events: Vec>, - // TODO: add events to send to the handler + /// Queue of peers to disconnect. + peers_to_dc: Vec, /// The current meta data of the node, so respond to pings and get metadata meta_data: MetaData, /// A cache of recently seen gossip messages. This is used to filter out any possible @@ -285,6 +287,7 @@ impl Behaviour { identify, peer_manager: PeerManager::new(network_globals.clone(), log), events: Vec::new(), + peers_to_dc: Vec::new(), seen_gossip_messages: LruCache::new(100_000), meta_data, network_globals, @@ -396,36 +399,34 @@ impl Behaviour { /// Send a request to a peer over RPC. pub fn send_request(&mut self, peer_id: PeerId, request_id: RequestId, request: Request) { - self.send_rpc(peer_id, RPCSend::Request(request_id, request.into())) + self.eth2_rpc + .send_request(peer_id, request_id, request.into()) } /// Send a successful response to a peer over RPC. pub fn send_successful_response( &mut self, peer_id: PeerId, - stream_id: SubstreamId, + id: PeerRequestId, response: Response, ) { - self.send_rpc(peer_id, RPCSend::Response(stream_id, response.into())) + self.eth2_rpc.send_response(peer_id, id, response.into()) } /// Inform the peer that their request produced an error. pub fn _send_error_reponse( &mut self, peer_id: PeerId, - stream_id: SubstreamId, + id: PeerRequestId, error: RPCResponseErrorCode, reason: String, ) { - self.send_rpc( + self.eth2_rpc.send_response( peer_id, - RPCSend::Response(stream_id, RPCCodedResponse::from_error_code(error, reason)), + id, + RPCCodedResponse::from_error_code(error, reason), ) } - /// Sends an RPC Request/Response via the RPC protocol. - fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCSend) { - self.eth2_rpc.send_rpc(peer_id, rpc_event); - } /* Discovery / Peer management functions */ @@ -512,36 +513,32 @@ impl Behaviour { data: self.meta_data.seq_number, }; debug!(self.log, "Sending Ping"; "request_id" => id, "peer_id" => peer_id.to_string()); - let event = RPCSend::Request(id, RPCRequest::Ping(ping)); - self.send_rpc(peer_id, event); + self.eth2_rpc + .send_request(peer_id, id, RPCRequest::Ping(ping)); } /// Sends a Pong response to the peer. - fn pong(&mut self, id: SubstreamId, peer_id: PeerId) { + fn pong(&mut self, id: PeerRequestId, peer_id: PeerId) { let ping = crate::rpc::Ping { data: self.meta_data.seq_number, }; - debug!(self.log, "Sending Pong"; "request_id" => id, "peer_id" => peer_id.to_string()); - let event = RPCSend::Response(id, RPCCodedResponse::Success(RPCResponse::Pong(ping))); - - self.send_rpc(peer_id, event); + debug!(self.log, "Sending Pong"; "request_id" => id.1, "peer_id" => peer_id.to_string()); + let event = RPCCodedResponse::Success(RPCResponse::Pong(ping)); + self.eth2_rpc.send_response(peer_id, id, event); } /// Sends a METADATA request to a peer. fn send_meta_data_request(&mut self, peer_id: PeerId) { - let metadata_request = - RPCSend::Request(RequestId::Behaviour, RPCRequest::MetaData(PhantomData)); - self.send_rpc(peer_id, metadata_request); + let event = RPCRequest::MetaData(PhantomData); + self.eth2_rpc + .send_request(peer_id, RequestId::Behaviour, event); } /// Sends a METADATA response to a peer. - fn send_meta_data_response(&mut self, id: SubstreamId, peer_id: PeerId) { - let metadata_response = RPCSend::Response( - id, - RPCCodedResponse::Success(RPCResponse::MetaData(self.meta_data.clone())), - ); - self.send_rpc(peer_id, metadata_response); + fn send_meta_data_response(&mut self, id: PeerRequestId, peer_id: PeerId) { + let event = RPCCodedResponse::Success(RPCResponse::MetaData(self.meta_data.clone())); + self.eth2_rpc.send_response(peer_id, id, event); } /// Returns a reference to the peer manager to allow the swarm to notify the manager of peer @@ -635,7 +632,7 @@ impl Behaviour { } /// Convenience function to propagate a request. - fn propagate_request(&mut self, id: SubstreamId, peer_id: PeerId, request: Request) { + fn propagate_request(&mut self, id: PeerRequestId, peer_id: PeerId, request: Request) { self.events.push(BehaviourEvent::RequestReceived { peer_id, id, @@ -645,6 +642,7 @@ impl Behaviour { fn on_rpc_event(&mut self, message: RPCMessage) { let peer_id = message.peer_id; + let handler_id = message.conn_id; // The METADATA and PING RPC responses are handled within the behaviour and not propagated match message.event { Err(handler_err) => { @@ -654,6 +652,10 @@ impl Behaviour { proto, error, } => { + if matches!(error, RPCError::HandlerRejected) { + // this peer's request got canceled + // TODO: cancel processing for this request + } // Inform the peer manager of the error. // An inbound error here means we sent an error to the peer, or the stream // timed out. @@ -670,37 +672,48 @@ impl Behaviour { } } } - Ok(RPCReceived::Request(id, request)) => match request { - /* Behaviour managed protocols: Ping and Metadata */ - RPCRequest::Ping(ping) => { - // inform the peer manager and send the response - self.peer_manager.ping_request(&peer_id, ping.data); - // send a ping response - self.pong(id, peer_id); + Ok(RPCReceived::Request(id, request)) => { + let peer_request_id = (handler_id, id); + match request { + /* Behaviour managed protocols: Ping and Metadata */ + RPCRequest::Ping(ping) => { + // inform the peer manager and send the response + self.peer_manager.ping_request(&peer_id, ping.data); + // send a ping response + self.pong(peer_request_id, peer_id); + } + RPCRequest::MetaData(_) => { + // send the requested meta-data + self.send_meta_data_response((handler_id, id), peer_id); + // TODO: inform the peer manager? + } + RPCRequest::Goodbye(reason) => { + // let the peer manager know this peer is in the process of disconnecting + self.peer_manager._disconnecting_peer(&peer_id); + // queue for disconnection without a goodbye message + debug!(self.log, "Received a Goodbye, queueing for disconnection"; + "peer_id" => peer_id.to_string()); + self.peers_to_dc.push(peer_id.clone()); + // TODO: do not propagate + self.propagate_request(peer_request_id, peer_id, Request::Goodbye(reason)); + } + /* Protocols propagated to the Network */ + RPCRequest::Status(msg) => { + // inform the peer manager that we have received a status from a peer + self.peer_manager.peer_statusd(&peer_id); + // propagate the STATUS message upwards + self.propagate_request(peer_request_id, peer_id, Request::Status(msg)) + } + RPCRequest::BlocksByRange(req) => self.propagate_request( + peer_request_id, + peer_id, + Request::BlocksByRange(req), + ), + RPCRequest::BlocksByRoot(req) => { + self.propagate_request(peer_request_id, peer_id, Request::BlocksByRoot(req)) + } } - RPCRequest::MetaData(_) => { - // send the requested meta-data - self.send_meta_data_response(id, peer_id); - // TODO: inform the peer manager? - } - /* Protocols propagated to the Network */ - RPCRequest::Status(msg) => { - // inform the peer manager that we have received a status from a peer - self.peer_manager.peer_statusd(&peer_id); - // propagate the STATUS message upwards - self.propagate_request(id, peer_id, Request::Status(msg)) - } - RPCRequest::BlocksByRange(req) => { - self.propagate_request(id, peer_id, Request::BlocksByRange(req)) - } - RPCRequest::BlocksByRoot(req) => { - self.propagate_request(id, peer_id, Request::BlocksByRoot(req)) - } - RPCRequest::Goodbye(reason) => { - // TODO: do not propagate - self.propagate_request(id, peer_id, Request::Goodbye(reason)); - } - }, + } Ok(RPCReceived::Response(id, resp)) => { match resp { /* Behaviour managed protocols */ @@ -734,10 +747,19 @@ impl Behaviour { } /// Consumes the events list when polled. - fn custom_poll( + fn custom_poll( &mut self, cx: &mut Context, - ) -> Poll>> { + ) -> Poll, BehaviourEvent>> { + // handle pending disconnections to perform + if !self.peers_to_dc.is_empty() { + return Poll::Ready(NBAction::NotifyHandler { + peer_id: self.peers_to_dc.remove(0), + handler: NotifyHandler::All, + event: BehaviourHandlerIn::Shutdown(None), + }); + } + // check the peer manager for events loop { match self.peer_manager.poll_next_unpin(cx) { @@ -756,11 +778,20 @@ impl Behaviour { PeerManagerEvent::MetaData(peer_id) => { self.send_meta_data_request(peer_id); } - PeerManagerEvent::_DisconnectPeer(_peer_id) => { - //TODO: Implement - } - PeerManagerEvent::_BanPeer(_peer_id) => { - //TODO: Implement + PeerManagerEvent::DisconnectPeer(peer_id) => { + debug!(self.log, "PeerManager requested to disconnect a peer"; + "peer_id" => peer_id.to_string()); + // queue for disabling + self.peers_to_dc.push(peer_id.clone()); + // send one goodbye + return Poll::Ready(NBAction::NotifyHandler { + peer_id, + handler: NotifyHandler::Any, + event: BehaviourHandlerIn::Shutdown(Some(( + RequestId::Behaviour, + RPCRequest::Goodbye(GoodbyeReason::Fault), + ))), + }); } }, Poll::Pending => break, @@ -872,6 +903,9 @@ impl std::convert::From> for RPCCodedResponse { @@ -888,7 +922,7 @@ pub enum BehaviourEvent { /// The peer that sent the request. peer_id: PeerId, /// Identifier of the request. All responses to this request must use this id. - id: SubstreamId, + id: PeerRequestId, /// Request the peer sent. request: Request, }, diff --git a/beacon_node/eth2-libp2p/src/lib.rs b/beacon_node/eth2-libp2p/src/lib.rs index 7e1f2cd2d..9bdf6ef08 100644 --- a/beacon_node/eth2-libp2p/src/lib.rs +++ b/beacon_node/eth2-libp2p/src/lib.rs @@ -15,7 +15,7 @@ mod service; pub mod types; pub use crate::types::{error, Enr, GossipTopic, NetworkGlobals, PubsubMessage}; -pub use behaviour::{BehaviourEvent, Request, Response}; +pub use behaviour::{BehaviourEvent, PeerRequestId, Request, Response}; pub use config::Config as NetworkConfig; pub use discovery::enr_ext::{CombinedKeyExt, EnrExt}; pub use libp2p::gossipsub::{MessageId, Topic, TopicHash}; diff --git a/beacon_node/eth2-libp2p/src/peer_manager/mod.rs b/beacon_node/eth2-libp2p/src/peer_manager/mod.rs index 375ab71dc..a051f9b77 100644 --- a/beacon_node/eth2-libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2-libp2p/src/peer_manager/mod.rs @@ -96,9 +96,7 @@ pub enum PeerManagerEvent { /// Request METADATA from a peer. MetaData(PeerId), /// The peer should be disconnected. - _DisconnectPeer(PeerId), - /// The peer should be disconnected and banned. - _BanPeer(PeerId), + DisconnectPeer(PeerId), } impl PeerManager { @@ -234,6 +232,12 @@ impl PeerManager { self.connect_peer(peer_id, ConnectingType::Dialing) } + /// Updates the database informing that a peer is being disconnected. + pub fn _disconnecting_peer(&mut self, _peer_id: &PeerId) -> bool { + // TODO: implement + true + } + /// Reports a peer for some action. /// /// If the peer doesn't exist, log a warning and insert defaults. @@ -267,7 +271,7 @@ impl PeerManager { // They closed early, this could mean poor connection PeerAction::MidToleranceError } - RPCError::InternalError(_reason) => { + RPCError::InternalError(_) | RPCError::HandlerRejected => { // Our fault. Do nothing return; } @@ -444,7 +448,8 @@ impl PeerManager { for id in ban_queue { pdb.ban(&id); - self.events.push(PeerManagerEvent::_BanPeer(id.clone())); + self.events + .push(PeerManagerEvent::DisconnectPeer(id.clone())); } for id in unban_queue { diff --git a/beacon_node/eth2-libp2p/src/rpc/handler.rs b/beacon_node/eth2-libp2p/src/rpc/handler.rs index e60ee28c1..e08375703 100644 --- a/beacon_node/eth2-libp2p/src/rpc/handler.rs +++ b/beacon_node/eth2-libp2p/src/rpc/handler.rs @@ -22,7 +22,7 @@ use std::{ task::{Context, Poll}, time::{Duration, Instant}, }; -use tokio::time::{delay_queue, DelayQueue}; +use tokio::time::{delay_queue, delay_until, Delay, DelayQueue, Instant as TInstant}; use types::EthSpec; //TODO: Implement check_timeout() on the substream types @@ -33,6 +33,9 @@ pub const RESPONSE_TIMEOUT: u64 = 10; /// The number of times to retry an outbound upgrade in the case of IO errors. const IO_ERROR_RETRIES: u8 = 3; +/// Maximum time given to the handler to perform shutdown operations. +const SHUTDOWN_TIMEOUT_SECS: u8 = 15; + /// Identifier of inbound and outbound substreams from the handler's perspective. #[derive(Debug, Clone, Copy, Hash, Eq, PartialEq)] pub struct SubstreamId(usize); @@ -116,6 +119,9 @@ where /// Value to return from `connection_keep_alive`. keep_alive: KeepAlive, + /// State of the handler. + state: HandlerState, + /// After the given duration has elapsed, an inactive connection will shutdown. inactive_timeout: Duration, @@ -127,6 +133,18 @@ where log: slog::Logger, } +enum HandlerState { + /// The handler is active. All messages are sent and received. + Active, + /// The handler is shutting_down. + /// + /// While in this state the handler rejects new requests but tries to finish existing ones. + ShuttingDown(Delay), + /// The handler is deactivated. A goodbye has been sent and no more messages are sent or + /// received. + Deactivated, +} + /// Contains the information the handler keeps on established outbound substreams. struct OutboundInfo { /// State of the substream. @@ -278,6 +296,7 @@ where outbound_substreams_delay: DelayQueue::new(), current_inbound_substream_id: SubstreamId(0), current_outbound_substream_id: SubstreamId(0), + state: HandlerState::Active, max_dial_negotiated: 8, keep_alive: KeepAlive::Yes, inactive_timeout, @@ -302,10 +321,170 @@ where &mut self.listen_protocol } + /// Initiates the handler's shutdown process, sending an optional last message to the peer. + pub fn shutdown(&mut self, final_msg: Option<(RequestId, RPCRequest)>) { + if matches!(self.state, HandlerState::Active) { + debug!(self.log, "Starting handler shutdown"; "unsent_queued_requests" => self.dial_queue.len()); + // we now drive to completion communications already dialed/established + for (id, req) in self.dial_queue.pop() { + self.pending_errors.push(HandlerErr::Outbound { + id, + proto: req.protocol(), + error: RPCError::HandlerRejected, + }) + } + + // Queue our final message, if any + if let Some((id, req)) = final_msg { + self.dial_queue.push((id, req)); + } + + self.state = HandlerState::ShuttingDown(delay_until( + TInstant::now() + Duration::from_secs(SHUTDOWN_TIMEOUT_SECS as u64), + )); + } + self.update_keep_alive(); + } + /// Opens an outbound substream with a request. fn send_request(&mut self, id: RequestId, req: RPCRequest) { - self.dial_queue.push((id, req)); - self.update_keep_alive(); + match self.state { + HandlerState::Active => { + self.dial_queue.push((id, req)); + self.update_keep_alive(); + } + _ => { + self.pending_errors.push(HandlerErr::Outbound { + id, + proto: req.protocol(), + error: RPCError::HandlerRejected, + }); + } + } + } + + /// Sends a response to a peer's request. + // NOTE: If the substream has closed due to inactivity, or the substream is in the + // wrong state a response will fail silently. + fn send_response(&mut self, inbound_id: SubstreamId, response: RPCCodedResponse) { + // Variables indicating if the response is an error response or a multi-part + // response + let res_is_error = response.is_error(); + let res_is_multiple = response.multiple_responses(); + + // check if the stream matching the response still exists + let (substream_state, protocol) = match self.inbound_substreams.get_mut(&inbound_id) { + Some((substream_state, _, protocol)) => (substream_state, protocol), + None => { + warn!(self.log, "Stream has expired. Response not sent"; + "response" => response.to_string(), "id" => inbound_id); + return; + } + }; + + // If the response we are sending is an error, report back for handling + match response { + RPCCodedResponse::InvalidRequest(ref reason) + | RPCCodedResponse::ServerError(ref reason) + | RPCCodedResponse::Unknown(ref reason) => { + let code = &response + .error_code() + .expect("Error response should map to an error code"); + let err = HandlerErr::Inbound { + id: inbound_id, + proto: *protocol, + error: RPCError::ErrorResponse(*code, reason.clone()), + }; + self.pending_errors.push(err); + } + _ => {} // not an error, continue. + } + + if matches!(self.state, HandlerState::Deactivated) { + // we no longer send responses after the handler is deactivated + debug!(self.log, "Response not sent. Deactivated handler"; + "response" => response.to_string(), "id" => inbound_id); + return; + } + + match std::mem::replace(substream_state, InboundSubstreamState::Poisoned) { + InboundSubstreamState::ResponseIdle(substream) => { + // close the stream if there is no response + if let RPCCodedResponse::StreamTermination(_) = response { + *substream_state = InboundSubstreamState::Closing(substream); + } else { + // send the response + // if it's a single rpc request or an error close the stream after. + *substream_state = InboundSubstreamState::ResponsePendingSend { + substream, + message: response, + closing: !res_is_multiple | res_is_error, + } + } + } + InboundSubstreamState::ResponsePendingSend { + substream, + message, + closing, + } if res_is_multiple => { + // the stream is in use, add the request to a pending queue if active + self.queued_outbound_items + .entry(inbound_id) + .or_insert_with(Vec::new) + .push(response); + + // return the state + *substream_state = InboundSubstreamState::ResponsePendingSend { + substream, + message, + closing, + }; + } + InboundSubstreamState::ResponsePendingFlush { substream, closing } + if res_is_multiple => + { + // the stream is in use, add the request to a pending queue + self.queued_outbound_items + .entry(inbound_id) + .or_insert_with(Vec::new) + .push(response); + + // return the state + *substream_state = + InboundSubstreamState::ResponsePendingFlush { substream, closing }; + } + InboundSubstreamState::Closing(substream) => { + *substream_state = InboundSubstreamState::Closing(substream); + debug!(self.log, "Response not sent. Stream is closing"; "response" => response.to_string()); + } + InboundSubstreamState::ResponsePendingSend { + substream, message, .. + } => { + *substream_state = InboundSubstreamState::ResponsePendingSend { + substream, + message, + closing: true, + }; + error!( + self.log, + "Attempted sending multiple responses to a single response request" + ); + } + InboundSubstreamState::ResponsePendingFlush { substream, .. } => { + *substream_state = InboundSubstreamState::ResponsePendingFlush { + substream, + closing: true, + }; + error!( + self.log, + "Attempted sending multiple responses to a single response request" + ); + } + InboundSubstreamState::Poisoned => { + crit!(self.log, "Poisoned inbound substream"); + unreachable!("Coding error: Poisoned substream"); + } + } } /// Updates the `KeepAlive` returned by `connection_keep_alive`. @@ -316,15 +495,25 @@ where fn update_keep_alive(&mut self) { // Check that we don't have outbound items pending for dialing, nor dialing, nor // established. Also check that there are no established inbound substreams. + // Errors and events need to be reported back, so check those too. let should_shutdown = self.dial_queue.is_empty() - && self.dial_negotiated == 0 && self.outbound_substreams.is_empty() - && self.inbound_substreams.is_empty(); + && self.inbound_substreams.is_empty() + && self.pending_errors.is_empty() + && self.events_out.is_empty() + && self.dial_negotiated == 0; - if should_shutdown { - self.keep_alive = KeepAlive::Until(Instant::now() + self.inactive_timeout) - } else { - self.keep_alive = KeepAlive::Yes + match self.keep_alive { + KeepAlive::Yes if should_shutdown => { + self.keep_alive = KeepAlive::Until(Instant::now() + self.inactive_timeout); + } + KeepAlive::Yes => {} // We continue being active + KeepAlive::Until(_) if should_shutdown => {} // Already deemed inactive + KeepAlive::Until(_) => { + // No longer idle + self.keep_alive = KeepAlive::Yes; + } + KeepAlive::No => {} // currently not used } } } @@ -348,29 +537,32 @@ where &mut self, substream: >::Output, ) { - let (req, substream) = substream; - // drop the stream - if let RPCRequest::Goodbye(_) = req { - self.events_out - .push(RPCReceived::Request(self.current_inbound_substream_id, req)); - self.current_inbound_substream_id.0 += 1; + // only accept new peer requests when active + if !matches!(self.state, HandlerState::Active) { return; } - // New inbound request. Store the stream and tag the output. - let delay_key = self.inbound_substreams_delay.insert( - self.current_inbound_substream_id, - Duration::from_secs(RESPONSE_TIMEOUT), - ); - let awaiting_stream = InboundSubstreamState::ResponseIdle(substream); - self.inbound_substreams.insert( - self.current_inbound_substream_id, - (awaiting_stream, Some(delay_key), req.protocol()), - ); + let (req, substream) = substream; + + // store requests that expect responses + if req.expected_responses() > 0 { + // Store the stream and tag the output. + let delay_key = self.inbound_substreams_delay.insert( + self.current_inbound_substream_id, + Duration::from_secs(RESPONSE_TIMEOUT), + ); + let awaiting_stream = InboundSubstreamState::ResponseIdle(substream); + self.inbound_substreams.insert( + self.current_inbound_substream_id, + (awaiting_stream, Some(delay_key), req.protocol()), + ); + } self.events_out .push(RPCReceived::Request(self.current_inbound_substream_id, req)); self.current_inbound_substream_id.0 += 1; + + self.update_keep_alive(); } fn inject_fully_negotiated_outbound( @@ -379,9 +571,20 @@ where request_info: Self::OutboundOpenInfo, ) { self.dial_negotiated -= 1; + let (id, request) = request_info; + let proto = request.protocol(); + + // accept outbound connections only if the handler is not deactivated + if matches!(self.state, HandlerState::Deactivated) { + self.pending_errors.push(HandlerErr::Outbound { + id, + proto, + error: RPCError::HandlerRejected, + }); + return; + } // add the stream to substreams if we expect a response, otherwise drop the stream. - let (id, request) = request_info; let expected_responses = request.expected_responses(); if expected_responses > 0 { // new outbound request. Store the stream and tag the output. @@ -389,7 +592,6 @@ where self.current_outbound_substream_id, Duration::from_secs(RESPONSE_TIMEOUT), ); - let proto = request.protocol(); let awaiting_stream = OutboundSubstreamState::RequestPendingResponse { substream: out, request, @@ -422,128 +624,10 @@ where self.update_keep_alive(); } - // NOTE: If the substream has closed due to inactivity, or the substream is in the - // wrong state a response will fail silently. fn inject_event(&mut self, rpc_event: Self::InEvent) { match rpc_event { RPCSend::Request(id, req) => self.send_request(id, req), - RPCSend::Response(inbound_id, response) => { - // Variables indicating if the response is an error response or a multi-part - // response - let res_is_error = response.is_error(); - let res_is_multiple = response.multiple_responses(); - - // check if the stream matching the response still exists - let (substream_state, protocol) = match self.inbound_substreams.get_mut(&inbound_id) - { - Some((substream_state, _, protocol)) => (substream_state, protocol), - None => { - warn!(self.log, "Stream has expired. Response not sent"; - "response" => response.to_string(), "id" => inbound_id); - return; - } - }; - - // If the response we are sending is an error, report back for handling - match response { - RPCCodedResponse::InvalidRequest(ref reason) - | RPCCodedResponse::ServerError(ref reason) - | RPCCodedResponse::Unknown(ref reason) => { - let code = &response - .error_code() - .expect("Error response should map to an error code"); - let err = HandlerErr::Inbound { - id: inbound_id, - proto: *protocol, - error: RPCError::ErrorResponse(*code, reason.clone()), - }; - self.pending_errors.push(err); - } - _ => {} // not an error, continue. - } - - match std::mem::replace(substream_state, InboundSubstreamState::Poisoned) { - InboundSubstreamState::ResponseIdle(substream) => { - // close the stream if there is no response - match response { - RPCCodedResponse::StreamTermination(_) => { - *substream_state = InboundSubstreamState::Closing(substream); - } - _ => { - // send the response - // if it's a single rpc request or an error, close the stream after - *substream_state = InboundSubstreamState::ResponsePendingSend { - substream, - message: response, - closing: !res_is_multiple | res_is_error, // close if an error or we are not expecting more responses - }; - } - } - } - InboundSubstreamState::ResponsePendingSend { - substream, - message, - closing, - } if res_is_multiple => { - // the stream is in use, add the request to a pending queue - self.queued_outbound_items - .entry(inbound_id) - .or_insert_with(Vec::new) - .push(response); - - // return the state - *substream_state = InboundSubstreamState::ResponsePendingSend { - substream, - message, - closing, - }; - } - InboundSubstreamState::ResponsePendingFlush { substream, closing } - if res_is_multiple => - { - // the stream is in use, add the request to a pending queue - self.queued_outbound_items - .entry(inbound_id) - .or_insert_with(Vec::new) - .push(response); - - // return the state - *substream_state = - InboundSubstreamState::ResponsePendingFlush { substream, closing }; - } - InboundSubstreamState::Closing(substream) => { - *substream_state = InboundSubstreamState::Closing(substream); - debug!(self.log, "Response not sent. Stream is closing"; "response" => format!("{}",response)); - } - InboundSubstreamState::ResponsePendingSend { - substream, message, .. - } => { - *substream_state = InboundSubstreamState::ResponsePendingSend { - substream, - message, - closing: true, - }; - error!( - self.log, - "Attempted sending multiple responses to a single response request" - ); - } - InboundSubstreamState::ResponsePendingFlush { substream, .. } => { - *substream_state = InboundSubstreamState::ResponsePendingFlush { - substream, - closing: true, - }; - error!( - self.log, - "Attempted sending multiple responses to a single response request" - ); - } - InboundSubstreamState::Poisoned => { - crit!(self.log, "Poisoned inbound substream"); - unreachable!("Coding error: Poisoned substream"); - } - } - } + RPCSend::Response(inbound_id, response) => self.send_response(inbound_id, response), } } @@ -563,6 +647,10 @@ where } } + // This dialing is now considered failed + self.dial_negotiated -= 1; + self.update_keep_alive(); + self.outbound_io_error_retries = 0; // map the error let error = match error { @@ -621,6 +709,21 @@ where self.events_out.shrink_to_fit(); } + // Check if we are shutting down, and if the timer ran out + if let HandlerState::ShuttingDown(delay) = &self.state { + if delay.is_elapsed() { + self.state = HandlerState::Deactivated; + debug!(self.log, "Handler deactivated"); + // Drain queued responses + for (inbound_id, queued_responses) in self.queued_outbound_items.drain() { + for response in queued_responses { + debug!(self.log, "Response not sent. Deactivated handler"; + "response" => response.to_string(), "id" => inbound_id); + } + } + } + } + // purge expired inbound substreams and send an error loop { match self.inbound_substreams_delay.poll_next_unpin(cx) { @@ -686,6 +789,9 @@ where } } + // when deactivated, close all streams + let deactivated = matches!(self.state, HandlerState::Deactivated); + // drive inbound streams that need to be processed for request_id in self.inbound_substreams.keys().copied().collect::>() { // Drain all queued items until all messages have been processed for this stream @@ -704,57 +810,78 @@ where message, closing, } => { - match Sink::poll_ready(Pin::new(&mut substream), cx) { - Poll::Ready(Ok(())) => { - // stream is ready to send data - match Sink::start_send(Pin::new(&mut substream), message) { - Ok(()) => { - // await flush - entry.get_mut().0 = + if deactivated { + if !closing { + // inform back to cancel this request's processing + self.pending_errors.push(HandlerErr::Inbound { + id: request_id, + proto: entry.get().2, + error: RPCError::HandlerRejected, + }); + } + entry.get_mut().0 = InboundSubstreamState::Closing(substream); + drive_stream_further = true; + } else { + match Sink::poll_ready(Pin::new(&mut substream), cx) { + Poll::Ready(Ok(())) => { + // stream is ready to send data + match Sink::start_send( + Pin::new(&mut substream), + message, + ) { + Ok(()) => { + // await flush + entry.get_mut().0 = InboundSubstreamState::ResponsePendingFlush { substream, closing, }; - drive_stream_further = true; - } - Err(e) => { - // error with sending in the codec - warn!(self.log, "Error sending RPC message"; "error" => e.to_string()); - // keep connection with the peer and return the - // stream to awaiting response if this message - // wasn't closing the stream - // TODO: Duplicate code - if closing { - entry.get_mut().0 = - InboundSubstreamState::Closing(substream); drive_stream_further = true; - } else { - // check for queued chunks and update the stream - entry.get_mut().0 = apply_queued_responses( - substream, - &mut self - .queued_outbound_items - .get_mut(&request_id), - &mut drive_stream_further, - ); + } + Err(e) => { + // error with sending in the codec + warn!(self.log, "Error sending RPC message"; "error" => e.to_string()); + // keep connection with the peer and return the + // stream to awaiting response if this message + // wasn't closing the stream + if closing { + entry.get_mut().0 = + InboundSubstreamState::Closing( + substream, + ); + drive_stream_further = true; + } else { + // check for queued chunks and update the stream + entry.get_mut().0 = apply_queued_responses( + substream, + &mut self + .queued_outbound_items + .get_mut(&request_id), + &mut drive_stream_further, + ); + } } } } - } - Poll::Ready(Err(e)) => { - error!(self.log, "Outbound substream error while sending RPC message: {:?}", e); - entry.remove(); - self.update_keep_alive(); - return Poll::Ready(ProtocolsHandlerEvent::Close(e)); - } - Poll::Pending => { - // the stream is not yet ready, continue waiting - entry.get_mut().0 = - InboundSubstreamState::ResponsePendingSend { - substream, - message, - closing, - }; + Poll::Ready(Err(e)) => { + error!( + self.log, + "Outbound substream error while sending RPC message: {:?}", + e + ); + entry.remove(); + self.update_keep_alive(); + return Poll::Ready(ProtocolsHandlerEvent::Close(e)); + } + Poll::Pending => { + // the stream is not yet ready, continue waiting + entry.get_mut().0 = + InboundSubstreamState::ResponsePendingSend { + substream, + message, + closing, + }; + } } } } @@ -766,7 +893,15 @@ where Poll::Ready(Ok(())) => { // finished flushing // TODO: Duplicate code - if closing { + if closing | deactivated { + if !closing { + // inform back to cancel this request's processing + self.pending_errors.push(HandlerErr::Inbound { + id: request_id, + proto: entry.get().2, + error: RPCError::HandlerRejected, + }); + } entry.get_mut().0 = InboundSubstreamState::Closing(substream); drive_stream_further = true; @@ -805,11 +940,16 @@ where } } InboundSubstreamState::ResponseIdle(substream) => { - entry.get_mut().0 = apply_queued_responses( - substream, - &mut self.queued_outbound_items.get_mut(&request_id), - &mut drive_stream_further, - ); + if !deactivated { + entry.get_mut().0 = apply_queued_responses( + substream, + &mut self.queued_outbound_items.get_mut(&request_id), + &mut drive_stream_further, + ); + } else { + entry.get_mut().0 = InboundSubstreamState::Closing(substream); + drive_stream_further = true; + } } InboundSubstreamState::Closing(mut substream) => { match Sink::poll_close(Pin::new(&mut substream), cx) { @@ -866,6 +1006,18 @@ where }; match state { + OutboundSubstreamState::RequestPendingResponse { + substream, + request: _, + } if deactivated => { + // the handler is deactivated. Close the stream + entry.get_mut().state = OutboundSubstreamState::Closing(substream); + self.pending_errors.push(HandlerErr::Outbound { + id: entry.get().req_id, + proto: entry.get().proto, + error: RPCError::HandlerRejected, + }) + } OutboundSubstreamState::RequestPendingResponse { mut substream, request, diff --git a/beacon_node/eth2-libp2p/src/rpc/mod.rs b/beacon_node/eth2-libp2p/src/rpc/mod.rs index 1af2389d7..a699a515a 100644 --- a/beacon_node/eth2-libp2p/src/rpc/mod.rs +++ b/beacon_node/eth2-libp2p/src/rpc/mod.rs @@ -80,6 +80,8 @@ impl std::fmt::Display for RPCSend { pub struct RPCMessage { /// The peer that sent the message. pub peer_id: PeerId, + /// Handler managing this message. + pub conn_id: ConnectionId, /// The message that was sent. pub event: as ProtocolsHandler>::OutEvent, } @@ -102,14 +104,35 @@ impl RPC { } } + /// Sends an RPC response. + /// + /// The peer must be connected for this to succeed. + pub fn send_response( + &mut self, + peer_id: PeerId, + id: (ConnectionId, SubstreamId), + event: RPCCodedResponse, + ) { + self.events.push(NetworkBehaviourAction::NotifyHandler { + peer_id, + handler: NotifyHandler::One(id.0), + event: RPCSend::Response(id.1, event), + }); + } + /// Submits an RPC request. /// /// The peer must be connected for this to succeed. - pub fn send_rpc(&mut self, peer_id: PeerId, event: RPCSend) { + pub fn send_request( + &mut self, + peer_id: PeerId, + request_id: RequestId, + event: RPCRequest, + ) { self.events.push(NetworkBehaviourAction::NotifyHandler { peer_id, handler: NotifyHandler::Any, - event, + event: RPCSend::Request(request_id, event), }); } } @@ -126,7 +149,7 @@ where SubstreamProtocol::new(RPCProtocol { phantom: PhantomData, }), - Duration::from_secs(5), + Duration::from_secs(30), &self.log, ) } @@ -169,13 +192,14 @@ where fn inject_event( &mut self, peer_id: PeerId, - _: ConnectionId, + conn_id: ConnectionId, event: ::OutEvent, ) { // send the event to the user self.events .push(NetworkBehaviourAction::GenerateEvent(RPCMessage { peer_id, + conn_id, event, })); } diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index 6f5beb749..ad98023ea 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -388,8 +388,10 @@ pub enum RPCError { InvalidData, /// An error occurred due to internal reasons. Ex: timer failure. InternalError(&'static str), - /// Negotiation with this peer timed out + /// Negotiation with this peer timed out. NegotiationTimeout, + /// Handler rejected this request. + HandlerRejected, } impl From for RPCError { @@ -427,6 +429,7 @@ impl std::fmt::Display for RPCError { RPCError::IncompleteStream => write!(f, "Stream ended unexpectedly"), RPCError::InternalError(ref err) => write!(f, "Internal error: {}", err), RPCError::NegotiationTimeout => write!(f, "Negotiation timeout"), + RPCError::HandlerRejected => write!(f, "Handler rejected the request"), } } } @@ -444,6 +447,7 @@ impl std::error::Error for RPCError { RPCError::InternalError(_) => None, RPCError::ErrorResponse(_, _) => None, RPCError::NegotiationTimeout => None, + RPCError::HandlerRejected => None, } } } diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index e527ef7b6..70e57804b 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -1,7 +1,7 @@ -use crate::behaviour::{Behaviour, BehaviourEvent, Request, Response}; +use crate::behaviour::{Behaviour, BehaviourEvent, PeerRequestId, Request, Response}; use crate::discovery::enr; use crate::multiaddr::Protocol; -use crate::rpc::{RPCResponseErrorCode, RequestId, SubstreamId}; +use crate::rpc::{RPCResponseErrorCode, RequestId}; use crate::types::{error, GossipKind}; use crate::EnrExt; use crate::{NetworkConfig, NetworkGlobals}; @@ -239,23 +239,16 @@ impl Service { pub fn respond_with_error( &mut self, peer_id: PeerId, - stream_id: SubstreamId, + id: PeerRequestId, error: RPCResponseErrorCode, reason: String, ) { - self.swarm - ._send_error_reponse(peer_id, stream_id, error, reason); + self.swarm._send_error_reponse(peer_id, id, error, reason); } /// Sends a response to a peer's request. - pub fn send_response( - &mut self, - peer_id: PeerId, - stream_id: SubstreamId, - response: Response, - ) { - self.swarm - .send_successful_response(peer_id, stream_id, response); + pub fn send_response(&mut self, peer_id: PeerId, id: PeerRequestId, response: Response) { + self.swarm.send_successful_response(peer_id, id, response); } pub async fn next_event(&mut self) -> Libp2pEvent { diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index 025914d6b..12a00ae28 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -10,8 +10,8 @@ use crate::error; use crate::service::NetworkMessage; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError}; use eth2_libp2p::{ - rpc::{RPCError, RequestId, SubstreamId}, - MessageId, NetworkGlobals, PeerId, PubsubMessage, Request, Response, + rpc::{RPCError, RequestId}, + MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Request, Response, }; use futures::prelude::*; use processor::Processor; @@ -46,7 +46,7 @@ pub enum RouterMessage { /// An RPC request has been received. RPCRequestReceived { peer_id: PeerId, - stream_id: SubstreamId, + id: PeerRequestId, request: Request, }, /// An RPC response has been received. @@ -127,10 +127,10 @@ impl Router { } RouterMessage::RPCRequestReceived { peer_id, - stream_id, + id, request, } => { - self.handle_rpc_request(peer_id, stream_id, request); + self.handle_rpc_request(peer_id, id, request); } RouterMessage::RPCResponseReceived { peer_id, @@ -160,11 +160,11 @@ impl Router { /* RPC - Related functionality */ /// A new RPC request has been received from the network. - fn handle_rpc_request(&mut self, peer_id: PeerId, stream_id: SubstreamId, request: Request) { + fn handle_rpc_request(&mut self, peer_id: PeerId, id: PeerRequestId, request: Request) { match request { Request::Status(status_message) => { self.processor - .on_status_request(peer_id, stream_id, status_message) + .on_status_request(peer_id, id, status_message) } Request::Goodbye(goodbye_reason) => { debug!( @@ -177,10 +177,10 @@ impl Router { } Request::BlocksByRange(request) => self .processor - .on_blocks_by_range_request(peer_id, stream_id, request), + .on_blocks_by_range_request(peer_id, id, request), Request::BlocksByRoot(request) => self .processor - .on_blocks_by_root_request(peer_id, stream_id, request), + .on_blocks_by_root_request(peer_id, id, request), } } diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 8cbab2120..79cbac4c7 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -8,7 +8,7 @@ use beacon_chain::{ BeaconChain, BeaconChainTypes, BlockError, BlockProcessingOutcome, GossipVerifiedBlock, }; use eth2_libp2p::rpc::*; -use eth2_libp2p::{NetworkGlobals, PeerId, Request, Response}; +use eth2_libp2p::{NetworkGlobals, PeerId, PeerRequestId, Request, Response}; use itertools::process_results; use slog::{debug, error, o, trace, warn}; use ssz::Encode; @@ -118,7 +118,7 @@ impl Processor { pub fn on_status_request( &mut self, peer_id: PeerId, - request_id: SubstreamId, + request_id: PeerRequestId, status: StatusMessage, ) { debug!( @@ -283,7 +283,7 @@ impl Processor { pub fn on_blocks_by_root_request( &mut self, peer_id: PeerId, - request_id: SubstreamId, + request_id: PeerRequestId, request: BlocksByRootRequest, ) { let mut send_block_count = 0; @@ -321,7 +321,7 @@ impl Processor { pub fn on_blocks_by_range_request( &mut self, peer_id: PeerId, - request_id: SubstreamId, + request_id: PeerRequestId, req: BlocksByRangeRequest, ) { debug!( @@ -958,29 +958,24 @@ impl HandlerNetworkContext { }) } - pub fn send_response( - &mut self, - peer_id: PeerId, - response: Response, - stream_id: SubstreamId, - ) { + pub fn send_response(&mut self, peer_id: PeerId, response: Response, id: PeerRequestId) { self.inform_network(NetworkMessage::SendResponse { peer_id, - stream_id, + id, response, }) } pub fn _send_error_response( &mut self, peer_id: PeerId, - substream_id: SubstreamId, + id: PeerRequestId, error: RPCResponseErrorCode, reason: String, ) { self.inform_network(NetworkMessage::SendError { peer_id, error, - substream_id, + id, reason, }) } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index edd480f79..669f67037 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -8,8 +8,8 @@ use crate::{error, metrics}; use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::Service as LibP2PService; use eth2_libp2p::{ - rpc::{RPCResponseErrorCode, RequestId, SubstreamId}, - Libp2pEvent, PubsubMessage, Request, Response, + rpc::{RPCResponseErrorCode, RequestId}, + Libp2pEvent, PeerRequestId, PubsubMessage, Request, Response, }; use eth2_libp2p::{BehaviourEvent, Enr, MessageId, NetworkGlobals, PeerId}; use futures::prelude::*; @@ -164,11 +164,11 @@ fn spawn_service( NetworkMessage::SendRequest{ peer_id, request, request_id } => { service.libp2p.send_request(peer_id, request_id, request); } - NetworkMessage::SendResponse{ peer_id, response, stream_id } => { - service.libp2p.send_response(peer_id, stream_id, response); + NetworkMessage::SendResponse{ peer_id, response, id } => { + service.libp2p.send_response(peer_id, id, response); } - NetworkMessage::SendError{ peer_id, error, substream_id, reason } => { - service.libp2p.respond_with_error(peer_id, substream_id, error, reason); + NetworkMessage::SendError{ peer_id, error, id, reason } => { + service.libp2p.respond_with_error(peer_id, id, error, reason); } NetworkMessage::Propagate { propagation_source, @@ -280,7 +280,7 @@ fn spawn_service( }; let _ = service .router_send - .send(RouterMessage::RPCRequestReceived{peer_id, stream_id:id, request}) + .send(RouterMessage::RPCRequestReceived{peer_id, id, request}) .map_err(|_| { debug!(service.log, "Failed to send RPC to router"); }); @@ -288,7 +288,7 @@ fn spawn_service( BehaviourEvent::ResponseReceived{peer_id, id, response} => { let _ = service .router_send - .send(RouterMessage::RPCResponseReceived{ peer_id, request_id:id, response }) + .send(RouterMessage::RPCResponseReceived{ peer_id, request_id: id, response }) .map_err(|_| { debug!(service.log, "Failed to send RPC to router"); }); @@ -297,7 +297,7 @@ fn spawn_service( BehaviourEvent::RPCFailed{id, peer_id, error} => { let _ = service .router_send - .send(RouterMessage::RPCFailed{ peer_id, request_id:id, error }) + .send(RouterMessage::RPCFailed{ peer_id, request_id: id, error }) .map_err(|_| { debug!(service.log, "Failed to send RPC to router"); }); @@ -425,7 +425,7 @@ pub enum NetworkMessage { SendResponse { peer_id: PeerId, response: Response, - stream_id: SubstreamId, + id: PeerRequestId, }, /// Respond to a peer's request with an error. SendError { @@ -434,7 +434,7 @@ pub enum NetworkMessage { peer_id: PeerId, error: RPCResponseErrorCode, reason: String, - substream_id: SubstreamId, + id: PeerRequestId, }, /// Publish a list of messages to the gossipsub protocol. Publish { messages: Vec> },