Drive RPC streams to completion (#1219)

This commit is contained in:
Age Manning 2020-05-29 12:04:08 +10:00 committed by GitHub
parent d609a3f639
commit 08e6b4961d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -635,9 +635,9 @@ where
for request_id in self.inbound_substreams.keys().copied().collect::<Vec<_>>() { for request_id in self.inbound_substreams.keys().copied().collect::<Vec<_>>() {
// Drain all queued items until all messages have been processed for this stream // Drain all queued items until all messages have been processed for this stream
// TODO Improve this code logic // TODO Improve this code logic
let mut new_items_to_send = true; let mut drive_stream_further = true;
while new_items_to_send { while drive_stream_further {
new_items_to_send = false; drive_stream_further = false;
match self.inbound_substreams.entry(request_id) { match self.inbound_substreams.entry(request_id) {
Entry::Occupied(mut entry) => { Entry::Occupied(mut entry) => {
match std::mem::replace( match std::mem::replace(
@ -659,7 +659,8 @@ where
InboundSubstreamState::ResponsePendingFlush { InboundSubstreamState::ResponsePendingFlush {
substream, substream,
closing, closing,
} };
drive_stream_further = true;
} }
Err(e) => { Err(e) => {
// error with sending in the codec // error with sending in the codec
@ -670,7 +671,8 @@ where
// TODO: Duplicate code // TODO: Duplicate code
if closing { if closing {
entry.get_mut().0 = entry.get_mut().0 =
InboundSubstreamState::Closing(substream) InboundSubstreamState::Closing(substream);
drive_stream_further = true;
} else { } else {
// check for queued chunks and update the stream // check for queued chunks and update the stream
entry.get_mut().0 = apply_queued_responses( entry.get_mut().0 = apply_queued_responses(
@ -678,7 +680,7 @@ where
&mut self &mut self
.queued_outbound_items .queued_outbound_items
.get_mut(&request_id), .get_mut(&request_id),
&mut new_items_to_send, &mut drive_stream_further,
); );
} }
} }
@ -711,7 +713,8 @@ where
// TODO: Duplicate code // TODO: Duplicate code
if closing { if closing {
entry.get_mut().0 = entry.get_mut().0 =
InboundSubstreamState::Closing(substream) InboundSubstreamState::Closing(substream);
drive_stream_further = true;
} else { } else {
// check for queued chunks and update the stream // check for queued chunks and update the stream
entry.get_mut().0 = apply_queued_responses( entry.get_mut().0 = apply_queued_responses(
@ -719,7 +722,7 @@ where
&mut self &mut self
.queued_outbound_items .queued_outbound_items
.get_mut(&request_id), .get_mut(&request_id),
&mut new_items_to_send, &mut drive_stream_further,
); );
} }
} }
@ -750,7 +753,7 @@ where
entry.get_mut().0 = apply_queued_responses( entry.get_mut().0 = apply_queued_responses(
substream, substream,
&mut self.queued_outbound_items.get_mut(&request_id), &mut self.queued_outbound_items.get_mut(&request_id),
&mut new_items_to_send, &mut drive_stream_further,
); );
} }
InboundSubstreamState::Closing(mut substream) => { InboundSubstreamState::Closing(mut substream) => {