Restructure code for libp2p upgrade (#3850)
Our custom RPC implementation is lagging from the libp2p v50 version. We are going to need to change a bunch of function names and would be nice to have consistent ordering of function names inside the handlers. This is a precursor to the libp2p upgrade to minimize merge conflicts in function ordering.
This commit is contained in:
parent
59a7a4703c
commit
4e5e7ee1fc
@ -327,61 +327,6 @@ where
|
|||||||
self.listen_protocol.clone()
|
self.listen_protocol.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn inject_fully_negotiated_inbound(
|
|
||||||
&mut self,
|
|
||||||
substream: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
|
|
||||||
_info: Self::InboundOpenInfo,
|
|
||||||
) {
|
|
||||||
// only accept new peer requests when active
|
|
||||||
if !matches!(self.state, HandlerState::Active) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
let (req, substream) = substream;
|
|
||||||
let expected_responses = req.expected_responses();
|
|
||||||
|
|
||||||
// store requests that expect responses
|
|
||||||
if expected_responses > 0 {
|
|
||||||
if self.inbound_substreams.len() < MAX_INBOUND_SUBSTREAMS {
|
|
||||||
// Store the stream and tag the output.
|
|
||||||
let delay_key = self.inbound_substreams_delay.insert(
|
|
||||||
self.current_inbound_substream_id,
|
|
||||||
Duration::from_secs(RESPONSE_TIMEOUT),
|
|
||||||
);
|
|
||||||
let awaiting_stream = InboundState::Idle(substream);
|
|
||||||
self.inbound_substreams.insert(
|
|
||||||
self.current_inbound_substream_id,
|
|
||||||
InboundInfo {
|
|
||||||
state: awaiting_stream,
|
|
||||||
pending_items: VecDeque::with_capacity(expected_responses as usize),
|
|
||||||
delay_key: Some(delay_key),
|
|
||||||
protocol: req.protocol(),
|
|
||||||
request_start_time: Instant::now(),
|
|
||||||
remaining_chunks: expected_responses,
|
|
||||||
},
|
|
||||||
);
|
|
||||||
} else {
|
|
||||||
self.events_out.push(Err(HandlerErr::Inbound {
|
|
||||||
id: self.current_inbound_substream_id,
|
|
||||||
proto: req.protocol(),
|
|
||||||
error: RPCError::HandlerRejected,
|
|
||||||
}));
|
|
||||||
return self.shutdown(None);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// If we received a goodbye, shutdown the connection.
|
|
||||||
if let InboundRequest::Goodbye(_) = req {
|
|
||||||
self.shutdown(None);
|
|
||||||
}
|
|
||||||
|
|
||||||
self.events_out.push(Ok(RPCReceived::Request(
|
|
||||||
self.current_inbound_substream_id,
|
|
||||||
req,
|
|
||||||
)));
|
|
||||||
self.current_inbound_substream_id.0 += 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
fn inject_fully_negotiated_outbound(
|
fn inject_fully_negotiated_outbound(
|
||||||
&mut self,
|
&mut self,
|
||||||
out: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
|
out: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
|
||||||
@ -438,6 +383,64 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn inject_fully_negotiated_inbound(
|
||||||
|
&mut self,
|
||||||
|
substream: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
|
||||||
|
_info: Self::InboundOpenInfo,
|
||||||
|
) {
|
||||||
|
// only accept new peer requests when active
|
||||||
|
if !matches!(self.state, HandlerState::Active) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let (req, substream) = substream;
|
||||||
|
let expected_responses = req.expected_responses();
|
||||||
|
|
||||||
|
// store requests that expect responses
|
||||||
|
if expected_responses > 0 {
|
||||||
|
if self.inbound_substreams.len() < MAX_INBOUND_SUBSTREAMS {
|
||||||
|
// Store the stream and tag the output.
|
||||||
|
let delay_key = self.inbound_substreams_delay.insert(
|
||||||
|
self.current_inbound_substream_id,
|
||||||
|
Duration::from_secs(RESPONSE_TIMEOUT),
|
||||||
|
);
|
||||||
|
let awaiting_stream = InboundState::Idle(substream);
|
||||||
|
self.inbound_substreams.insert(
|
||||||
|
self.current_inbound_substream_id,
|
||||||
|
InboundInfo {
|
||||||
|
state: awaiting_stream,
|
||||||
|
pending_items: VecDeque::with_capacity(std::cmp::min(
|
||||||
|
expected_responses,
|
||||||
|
128,
|
||||||
|
) as usize),
|
||||||
|
delay_key: Some(delay_key),
|
||||||
|
protocol: req.protocol(),
|
||||||
|
request_start_time: Instant::now(),
|
||||||
|
remaining_chunks: expected_responses,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
self.events_out.push(Err(HandlerErr::Inbound {
|
||||||
|
id: self.current_inbound_substream_id,
|
||||||
|
proto: req.protocol(),
|
||||||
|
error: RPCError::HandlerRejected,
|
||||||
|
}));
|
||||||
|
return self.shutdown(None);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we received a goodbye, shutdown the connection.
|
||||||
|
if let InboundRequest::Goodbye(_) = req {
|
||||||
|
self.shutdown(None);
|
||||||
|
}
|
||||||
|
|
||||||
|
self.events_out.push(Ok(RPCReceived::Request(
|
||||||
|
self.current_inbound_substream_id,
|
||||||
|
req,
|
||||||
|
)));
|
||||||
|
self.current_inbound_substream_id.0 += 1;
|
||||||
|
}
|
||||||
|
|
||||||
fn inject_event(&mut self, rpc_event: Self::InEvent) {
|
fn inject_event(&mut self, rpc_event: Self::InEvent) {
|
||||||
match rpc_event {
|
match rpc_event {
|
||||||
RPCSend::Request(id, req) => self.send_request(id, req),
|
RPCSend::Request(id, req) => self.send_request(id, req),
|
||||||
|
Loading…
Reference in New Issue
Block a user