Adds counter of received chunks to an OutboundSubstream. Ends the str… (#1126)

* Adds counter of received chunks to an OutboundSubstream. Ends the stream when the counter reaches the desired amount of chunks that were specified in a Request.

* Keeps track of remaining chunks for a stream, rather than expected ones and calculating the remainder on each received chunk

* WIP test, waiting for stable-futures to land in master

* Improve calculation for remaining chunks in response handler. Improve initial calculation for expected chunks in outbound substream

* Remove rebase artifact

* Fix compiler errors after rebasing on master

* Clone request to allow two accesses to it that move it when determining the amount of expected responses

* Correctly terminate the stream when all chunks have been received

* WIP: test that stream is terminated correctly

* Terminate stream with a termination response. Handle further received chunks in OutboundStream::Closing branch to return errors

* Remove request clone

* Report stream timeouts when closing

* Update rpc test

* Fix BlocksByRoot RPC test to request as many chunks as responses are expected

* Adds test for a correctly terminated BlocksByRoot rpc stream when all chunks have been received

Co-authored-by: Age Manning <Age@AgeManning.com>
This commit is contained in:
Maximilian Ehlers 2020-05-18 13:41:01 +02:00 committed by GitHub
parent dd51a72f1f
commit ac2ff01d1e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 363 additions and 20 deletions

View File

@ -75,8 +75,16 @@ where
/// Map of outbound substreams that need to be driven to completion. The `RequestId` is /// Map of outbound substreams that need to be driven to completion. The `RequestId` is
/// maintained by the application sending the request. /// maintained by the application sending the request.
outbound_substreams: /// For Responses with multiple expected response chunks a counter is added to be able to terminate the stream when the expected number has been received
FnvHashMap<OutboundRequestId, (OutboundSubstreamState<TSpec>, delay_queue::Key, Protocol)>, outbound_substreams: FnvHashMap<
OutboundRequestId,
(
OutboundSubstreamState<TSpec>,
delay_queue::Key,
Protocol,
Option<u64>,
),
>,
/// Inbound substream `DelayQueue` which keeps track of when an inbound substream will timeout. /// Inbound substream `DelayQueue` which keeps track of when an inbound substream will timeout.
outbound_substreams_delay: DelayQueue<OutboundRequestId>, outbound_substreams_delay: DelayQueue<OutboundRequestId>,
@ -360,14 +368,19 @@ where
.outbound_substreams_delay .outbound_substreams_delay
.insert(id, Duration::from_secs(RESPONSE_TIMEOUT)); .insert(id, Duration::from_secs(RESPONSE_TIMEOUT));
let protocol = request.protocol(); let protocol = request.protocol();
let response_chunk_count = match request {
RPCRequest::BlocksByRange(ref req) => Some(req.count),
RPCRequest::BlocksByRoot(ref req) => Some(req.block_roots.len() as u64),
_ => None, // Other requests do not have a known response chunk length,
};
let awaiting_stream = OutboundSubstreamState::RequestPendingResponse { let awaiting_stream = OutboundSubstreamState::RequestPendingResponse {
substream: out, substream: out,
request, request: request,
}; };
if let Some(_) = self if let Some(_) = self.outbound_substreams.insert(
.outbound_substreams id,
.insert(id, (awaiting_stream, delay_key, protocol)) (awaiting_stream, delay_key, protocol, response_chunk_count),
{ ) {
crit!(self.log, "Duplicate outbound substream id"; "id" => format!("{:?}", id)); crit!(self.log, "Duplicate outbound substream id"; "id" => format!("{:?}", id));
} }
} }
@ -591,7 +604,7 @@ where
loop { loop {
match self.outbound_substreams_delay.poll_next_unpin(cx) { match self.outbound_substreams_delay.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(stream_id))) => { Poll::Ready(Some(Ok(stream_id))) => {
if let Some((_id, _stream, protocol)) = if let Some((_id, _stream, protocol, _)) =
self.outbound_substreams.remove(stream_id.get_ref()) self.outbound_substreams.remove(stream_id.get_ref())
{ {
// notify the user // notify the user
@ -807,18 +820,33 @@ where
} => match substream.poll_next_unpin(cx) { } => match substream.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(response))) => { Poll::Ready(Some(Ok(response))) => {
if request.multiple_responses() && !response.is_error() { if request.multiple_responses() && !response.is_error() {
entry.get_mut().0 = let substream_entry = entry.get_mut();
let delay_key = &substream_entry.1;
// chunks left after this one
let remaining_chunks = substream_entry
.3
.map(|count| count.saturating_sub(1))
.unwrap_or_else(|| 0);
if remaining_chunks == 0 {
// this is the last expected message, close the stream as all expected chunks have been received
substream_entry.0 =
OutboundSubstreamState::Closing(substream);
} else {
// If the response chunk was expected update the remaining number of chunks expected and reset the Timeout
substream_entry.0 =
OutboundSubstreamState::RequestPendingResponse { OutboundSubstreamState::RequestPendingResponse {
substream, substream,
request, request,
}; };
let delay_key = &entry.get().1; substream_entry.3 = Some(remaining_chunks);
self.outbound_substreams_delay self.outbound_substreams_delay.reset(
.reset(delay_key, Duration::from_secs(RESPONSE_TIMEOUT)); delay_key,
Duration::from_secs(RESPONSE_TIMEOUT),
);
}
} else { } else {
// either this is a single response request or we received an // either this is a single response request or we received an
// error // error
//trace!(self.log, "Closing single stream request");
// only expect a single response, close the stream // only expect a single response, close the stream
entry.get_mut().0 = OutboundSubstreamState::Closing(substream); entry.get_mut().0 = OutboundSubstreamState::Closing(substream);
} }
@ -875,13 +903,14 @@ where
}, },
OutboundSubstreamState::Closing(mut substream) => { OutboundSubstreamState::Closing(mut substream) => {
match Sink::poll_close(Pin::new(&mut substream), cx) { match Sink::poll_close(Pin::new(&mut substream), cx) {
// TODO: check if this is supposed to be a stream
Poll::Ready(_) => { Poll::Ready(_) => {
// drop the stream - including if there is an error // drop the stream and its corresponding timeout
let delay_key = &entry.get().1; let delay_key = &entry.get().1;
let protocol = entry.get().2;
self.outbound_substreams_delay.remove(delay_key); self.outbound_substreams_delay.remove(delay_key);
entry.remove_entry(); entry.remove_entry();
// adjust the RPC keep-alive
if self.outbound_substreams.is_empty() if self.outbound_substreams.is_empty()
&& self.inbound_substreams.is_empty() && self.inbound_substreams.is_empty()
{ {
@ -889,6 +918,36 @@ where
Instant::now() + self.inactive_timeout, Instant::now() + self.inactive_timeout,
); );
} }
// report the stream termination to the user
//
// Streams can be terminated here if a responder tries to
// continue sending responses beyond what we would expect. Here
// we simply terminate the stream and report a stream
// termination to the application
match protocol {
Protocol::BlocksByRange => {
return Poll::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Response(
request_id,
RPCCodedResponse::StreamTermination(
ResponseTermination::BlocksByRange,
),
),
));
}
Protocol::BlocksByRoot => {
return Poll::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Response(
request_id,
RPCCodedResponse::StreamTermination(
ResponseTermination::BlocksByRoot,
),
),
));
}
_ => {} // all other protocols are do not have multiple responses and we do not inform the user, we simply drop the stream.
}
} }
Poll::Pending => { Poll::Pending => {
entry.get_mut().0 = OutboundSubstreamState::Closing(substream); entry.get_mut().0 = OutboundSubstreamState::Closing(substream);

View File

@ -235,6 +235,140 @@ async fn test_blocks_by_range_chunked_rpc() {
} }
} }
#[tokio::test]
// Tests that a streamed BlocksByRange RPC Message terminates when all expected chunks were received
async fn test_blocks_by_range_chunked_rpc_terminates_correctly() {
// set up the logging. The level and enabled logging or not
let log_level = Level::Debug;
let enable_logging = false;
let messages_to_send = 10;
let extra_messages_to_send = 10;
let log = common::build_log(log_level, enable_logging);
// get sender/receiver
let (mut sender, mut receiver) = common::build_node_pair(&log).await;
// BlocksByRange Request
let rpc_request = RPCRequest::BlocksByRange(BlocksByRangeRequest {
start_slot: 0,
count: messages_to_send,
step: 0,
});
// BlocksByRange Response
let spec = E::default_spec();
let empty_block = BeaconBlock::empty(&spec);
let empty_signed = SignedBeaconBlock {
message: empty_block,
signature: Signature::empty_signature(),
};
let rpc_response = RPCResponse::BlocksByRange(Box::new(empty_signed));
// keep count of the number of messages received
let mut messages_received: u64 = 0;
// build the sender future
let sender_future = async {
loop {
match sender.next_event().await {
Libp2pEvent::PeerConnected { peer_id, .. } => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender
.swarm
.send_rpc(peer_id, RPCEvent::Request(10, rpc_request.clone()));
}
Libp2pEvent::Behaviour(BehaviourEvent::RPC(_, event)) => match event {
// Should receive the RPC response
RPCEvent::Response(id, response) => {
if id == 10 {
debug!(log, "Sender received a response");
match response {
RPCCodedResponse::Success(res) => {
assert_eq!(res, rpc_response.clone());
messages_received += 1;
}
RPCCodedResponse::StreamTermination(_) => {
// should be exactly 10 messages, as requested
assert_eq!(messages_received, messages_to_send);
}
_ => panic!("Invalid RPC received"),
}
}
}
_ => {} // Ignore other RPC messages
},
_ => {} // Ignore other behaviour events
}
}
};
// determine messages to send (PeerId, RequestId). If some, indicates we still need to send
// messages
let mut message_info = None;
// the number of messages we've sent
let mut messages_sent = 0;
let receiver_future = async {
loop {
// this future either drives the sending/receiving or times out allowing messages to be
// sent in the timeout
match futures::future::select(
Box::pin(receiver.next_event()),
tokio::time::delay_for(Duration::from_millis(50)),
)
.await
{
futures::future::Either::Left((
Libp2pEvent::Behaviour(BehaviourEvent::RPC(peer_id, event)),
_,
)) => {
match event {
// Should receive sent RPC request
RPCEvent::Request(id, request) => {
if request == rpc_request {
// send the response
warn!(log, "Receiver got request");
message_info = Some((peer_id, id));
} else {
continue;
}
}
_ => continue, // Ignore other events, don't send messages until ready
}
}
futures::future::Either::Right((_, _)) => {} // The timeout hit, send messages if required
_ => continue,
}
// if we need to send messages send them here. This will happen after a delay
if message_info.is_some() {
messages_sent += 1;
receiver.swarm.send_rpc(
message_info.as_ref().unwrap().0.clone(),
RPCEvent::Response(
message_info.as_ref().unwrap().1.clone(),
RPCCodedResponse::Success(rpc_response.clone()),
),
);
debug!(log, "Sending message {}", messages_sent);
if messages_sent == messages_to_send + extra_messages_to_send {
// stop sending messages
return;
}
}
}
};
tokio::select! {
_ = sender_future => {}
_ = receiver_future => {}
_ = delay_for(Duration::from_millis(50000)) => {
panic!("Future timed out");
}
}
}
#[tokio::test] #[tokio::test]
// Tests an empty response to a BlocksByRange RPC Message // Tests an empty response to a BlocksByRange RPC Message
async fn test_blocks_by_range_single_empty_rpc() { async fn test_blocks_by_range_single_empty_rpc() {
@ -375,7 +509,11 @@ async fn test_blocks_by_root_chunked_rpc() {
// BlocksByRoot Request // BlocksByRoot Request
let rpc_request = RPCRequest::BlocksByRoot(BlocksByRootRequest { let rpc_request = RPCRequest::BlocksByRoot(BlocksByRootRequest {
block_roots: vec![Hash256::from_low_u64_be(0), Hash256::from_low_u64_be(0)], block_roots: vec![
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
],
}); });
// BlocksByRoot Response // BlocksByRoot Response
@ -478,6 +616,152 @@ async fn test_blocks_by_root_chunked_rpc() {
} }
} }
#[tokio::test]
// Tests a streamed, chunked BlocksByRoot RPC Message terminates when all expected reponses have been received
async fn test_blocks_by_root_chunked_rpc_terminates_correctly() {
// set up the logging. The level and enabled logging or not
let log_level = Level::Debug;
let enable_logging = false;
let messages_to_send: u64 = 10;
let extra_messages_to_send: u64 = 10;
let log = common::build_log(log_level, enable_logging);
let spec = E::default_spec();
// get sender/receiver
let (mut sender, mut receiver) = common::build_node_pair(&log).await;
// BlocksByRoot Request
let rpc_request = RPCRequest::BlocksByRoot(BlocksByRootRequest {
block_roots: vec![
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
],
});
// BlocksByRoot Response
let full_block = BeaconBlock::full(&spec);
let signed_full_block = SignedBeaconBlock {
message: full_block,
signature: Signature::empty_signature(),
};
let rpc_response = RPCResponse::BlocksByRoot(Box::new(signed_full_block));
// keep count of the number of messages received
let mut messages_received = 0;
// build the sender future
let sender_future = async {
loop {
match sender.next_event().await {
Libp2pEvent::PeerConnected { peer_id, .. } => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender
.swarm
.send_rpc(peer_id, RPCEvent::Request(10, rpc_request.clone()));
}
Libp2pEvent::Behaviour(BehaviourEvent::RPC(_, event)) => match event {
// Should receive the RPC response
RPCEvent::Response(id, response) => {
if id == 10 {
debug!(log, "Sender received a response");
match response {
RPCCodedResponse::Success(res) => {
assert_eq!(res, rpc_response.clone());
messages_received += 1;
debug!(log, "Chunk received");
}
RPCCodedResponse::StreamTermination(_) => {
// should be exactly messages_to_send
assert_eq!(messages_received, messages_to_send);
// end the test
return;
}
_ => {} // Ignore other RPC messages
}
}
}
_ => {} // Ignore other RPC messages
},
_ => {} // Ignore other behaviour events
}
}
};
// determine messages to send (PeerId, RequestId). If some, indicates we still need to send
// messages
let mut message_info = None;
// the number of messages we've sent
let mut messages_sent = 0;
let receiver_future = async {
loop {
// this future either drives the sending/receiving or times out allowing messages to be
// sent in the timeout
match futures::future::select(
Box::pin(receiver.next_event()),
tokio::time::delay_for(Duration::from_millis(50)),
)
.await
{
futures::future::Either::Left((
Libp2pEvent::Behaviour(BehaviourEvent::RPC(peer_id, event)),
_,
)) => {
match event {
// Should receive sent RPC request
RPCEvent::Request(id, request) => {
if request == rpc_request {
// send the response
warn!(log, "Receiver got request");
message_info = Some((peer_id, id));
} else {
continue;
}
}
_ => continue, // Ignore other events, don't send messages until ready
}
}
futures::future::Either::Right((_, _)) => {} // The timeout hit, send messages if required
_ => continue,
}
// if we need to send messages send them here. This will happen after a delay
if message_info.is_some() {
messages_sent += 1;
receiver.swarm.send_rpc(
message_info.as_ref().unwrap().0.clone(),
RPCEvent::Response(
message_info.as_ref().unwrap().1.clone(),
RPCCodedResponse::Success(rpc_response.clone()),
),
);
debug!(log, "Sending message {}", messages_sent);
if messages_sent == messages_to_send + extra_messages_to_send {
// stop sending messages
return;
}
}
}
};
tokio::select! {
_ = sender_future => {}
_ = receiver_future => {}
_ = delay_for(Duration::from_millis(1000)) => {
panic!("Future timed out");
}
}
}
#[tokio::test] #[tokio::test]
// Tests a Goodbye RPC message // Tests a Goodbye RPC message
async fn test_goodbye_rpc() { async fn test_goodbye_rpc() {