diff --git a/beacon_node/eth2-libp2p/src/rpc/handler.rs b/beacon_node/eth2-libp2p/src/rpc/handler.rs
index 9005323fb..bb6d3a8ef 100644
--- a/beacon_node/eth2-libp2p/src/rpc/handler.rs
+++ b/beacon_node/eth2-libp2p/src/rpc/handler.rs
@@ -75,8 +75,16 @@ where
     /// Map of outbound substreams that need to be driven to completion. The `RequestId` is
     /// maintained by the application sending the request.
-    outbound_substreams:
-        FnvHashMap<OutboundRequestId, (OutboundSubstreamState, delay_queue::Key, Protocol)>,
+    /// For responses with multiple expected chunks, a counter is added so that the stream can be terminated once the expected number has been received.
+    outbound_substreams: FnvHashMap<
+        OutboundRequestId,
+        (
+            OutboundSubstreamState,
+            delay_queue::Key,
+            Protocol,
+            Option<u64>,
+        ),
+    >,

     /// Inbound substream `DelayQueue` which keeps track of when an inbound substream will timeout.
     outbound_substreams_delay: DelayQueue,
@@ -360,14 +368,19 @@ where
             .outbound_substreams_delay
             .insert(id, Duration::from_secs(RESPONSE_TIMEOUT));
         let protocol = request.protocol();
+        let response_chunk_count = match request {
+            RPCRequest::BlocksByRange(ref req) => Some(req.count),
+            RPCRequest::BlocksByRoot(ref req) => Some(req.block_roots.len() as u64),
+            _ => None, // Other requests do not have a known response chunk length
+        };
         let awaiting_stream = OutboundSubstreamState::RequestPendingResponse {
             substream: out,
-            request,
+            request: request,
         };
-        if let Some(_) = self
-            .outbound_substreams
-            .insert(id, (awaiting_stream, delay_key, protocol))
-        {
+        if let Some(_) = self.outbound_substreams.insert(
+            id,
+            (awaiting_stream, delay_key, protocol, response_chunk_count),
+        ) {
             crit!(self.log, "Duplicate outbound substream id"; "id" => format!("{:?}", id));
         }
     }
@@ -591,7 +604,7 @@ where
         loop {
            match self.outbound_substreams_delay.poll_next_unpin(cx) {
                Poll::Ready(Some(Ok(stream_id))) => {
-                    if let Some((_id, _stream, protocol)) =
+                    if let Some((_id, _stream, protocol, _)) =
                        self.outbound_substreams.remove(stream_id.get_ref())
                    {
                        // notify the user
@@ -807,18 +820,33 @@ where
                } => match substream.poll_next_unpin(cx) {
                    Poll::Ready(Some(Ok(response))) => {
                        if request.multiple_responses() && !response.is_error() {
-                            entry.get_mut().0 =
-                                OutboundSubstreamState::RequestPendingResponse {
-                                    substream,
-                                    request,
-                                };
-                            let delay_key = &entry.get().1;
-                            self.outbound_substreams_delay
-                                .reset(delay_key, Duration::from_secs(RESPONSE_TIMEOUT));
+                            let substream_entry = entry.get_mut();
+                            let delay_key = &substream_entry.1;
+                            // chunks left after this one
+                            let remaining_chunks = substream_entry
+                                .3
+                                .map(|count| count.saturating_sub(1))
+                                .unwrap_or_else(|| 0);
+                            if remaining_chunks == 0 {
+                                // this is the last expected message; close the stream as all expected chunks have been received
+                                substream_entry.0 =
+                                    OutboundSubstreamState::Closing(substream);
+                            } else {
+                                // more chunks are expected: update the remaining count and reset the timeout
+                                substream_entry.0 =
+                                    OutboundSubstreamState::RequestPendingResponse {
+                                        substream,
+                                        request,
+                                    };
+                                substream_entry.3 = Some(remaining_chunks);
+                                self.outbound_substreams_delay.reset(
+                                    delay_key,
+                                    Duration::from_secs(RESPONSE_TIMEOUT),
+                                );
+                            }
                        } else {
                            // either this is a single response request or we received an
                            // error
-                            //trace!(self.log, "Closing single stream request");
                            // only expect a single response, close the stream
                            entry.get_mut().0 = OutboundSubstreamState::Closing(substream);
                        }
@@ -875,13 +903,14 @@ where
                    },
                    OutboundSubstreamState::Closing(mut substream) => {
                        match Sink::poll_close(Pin::new(&mut substream), cx) {
-                            // TODO: check if this is supposed to be a stream
                            Poll::Ready(_) => {
-                                // drop the stream - including if there is an error
+                                // drop the stream and its corresponding timeout
                                let delay_key = &entry.get().1;
+                                let protocol = entry.get().2;
                                self.outbound_substreams_delay.remove(delay_key);
                                entry.remove_entry();
+                                // adjust the RPC keep-alive
                                if self.outbound_substreams.is_empty()
                                    && self.inbound_substreams.is_empty()
                                {
@@ -889,6 +918,36 @@ where
                                        Instant::now() + self.inactive_timeout,
                                    );
                                }
+
+                                // report the stream termination to the user
+                                //
+                                // Streams can be terminated here if a responder tries to
+                                // continue sending responses beyond what we would expect. Here
+                                // we simply terminate the stream and report a stream
+                                // termination to the application
+                                match protocol {
+                                    Protocol::BlocksByRange => {
+                                        return Poll::Ready(ProtocolsHandlerEvent::Custom(
+                                            RPCEvent::Response(
+                                                request_id,
+                                                RPCCodedResponse::StreamTermination(
+                                                    ResponseTermination::BlocksByRange,
+                                                ),
+                                            ),
+                                        ));
+                                    }
+                                    Protocol::BlocksByRoot => {
+                                        return Poll::Ready(ProtocolsHandlerEvent::Custom(
+                                            RPCEvent::Response(
+                                                request_id,
+                                                RPCCodedResponse::StreamTermination(
+                                                    ResponseTermination::BlocksByRoot,
+                                                ),
+                                            ),
+                                        ));
+                                    }
+                                    _ => {} // all other protocols do not have multiple responses; we do not inform the user and simply drop the stream
+                                }
                            }
                            Poll::Pending => {
                                entry.get_mut().0 = OutboundSubstreamState::Closing(substream);
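For readers skimming the handler change above: the new `Option<u64>` entry in `outbound_substreams` is simply a countdown of how many response chunks the requester still expects on that substream. The following is a minimal, self-contained sketch of that counting rule, not code from the diff; `ChunkTracker` and `StreamAction` are illustrative names only, and the real handler keeps the counter inline in its substream map and transitions the substream state directly.

```rust
/// Remaining expected response chunks for one outbound substream.
/// `None` mirrors the diff's "no known response chunk length" case.
struct ChunkTracker {
    remaining: Option<u64>,
}

/// What the handler should do with the substream after a successful chunk.
#[derive(Debug, PartialEq)]
enum StreamAction {
    /// More chunks are expected: keep the substream open and reset its timeout.
    KeepOpen,
    /// All expected chunks have arrived: move the substream to `Closing`.
    Close,
}

impl ChunkTracker {
    /// Called once per successfully received (non-error) response chunk.
    fn on_chunk(&mut self) -> StreamAction {
        // chunks left after this one; like the diff, a missing count falls back to 0
        let remaining = self.remaining.map(|c| c.saturating_sub(1)).unwrap_or(0);
        if remaining == 0 {
            StreamAction::Close
        } else {
            self.remaining = Some(remaining);
            StreamAction::KeepOpen
        }
    }
}

fn main() {
    // A BlocksByRange request with `count == 3` expects exactly three chunks.
    let mut tracker = ChunkTracker { remaining: Some(3) };
    assert_eq!(tracker.on_chunk(), StreamAction::KeepOpen);
    assert_eq!(tracker.on_chunk(), StreamAction::KeepOpen);
    // The third chunk is the last expected one, so the stream is closed without
    // waiting for the responder's own termination message.
    assert_eq!(tracker.on_chunk(), StreamAction::Close);
}
```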
diff --git a/beacon_node/eth2-libp2p/tests/rpc_tests.rs b/beacon_node/eth2-libp2p/tests/rpc_tests.rs
index db74e75b8..847be0884 100644
--- a/beacon_node/eth2-libp2p/tests/rpc_tests.rs
+++ b/beacon_node/eth2-libp2p/tests/rpc_tests.rs
@@ -235,6 +235,140 @@ async fn test_blocks_by_range_chunked_rpc() {
     }
 }

+#[tokio::test]
+// Tests that a streamed BlocksByRange RPC message terminates when all expected chunks have been received
+async fn test_blocks_by_range_chunked_rpc_terminates_correctly() {
+    // set up the logging: the level and whether logging is enabled
+    let log_level = Level::Debug;
+    let enable_logging = false;
+
+    let messages_to_send = 10;
+    let extra_messages_to_send = 10;
+
+    let log = common::build_log(log_level, enable_logging);
+
+    // get sender/receiver
+    let (mut sender, mut receiver) = common::build_node_pair(&log).await;
+
+    // BlocksByRange Request
+    let rpc_request = RPCRequest::BlocksByRange(BlocksByRangeRequest {
+        start_slot: 0,
+        count: messages_to_send,
+        step: 0,
+    });
+
+    // BlocksByRange Response
+    let spec = E::default_spec();
+    let empty_block = BeaconBlock::empty(&spec);
+    let empty_signed = SignedBeaconBlock {
+        message: empty_block,
+        signature: Signature::empty_signature(),
+    };
+    let rpc_response = RPCResponse::BlocksByRange(Box::new(empty_signed));
+
+    // keep count of the number of messages received
+    let mut messages_received: u64 = 0;
+    // build the sender future
+    let sender_future = async {
+        loop {
+            match sender.next_event().await {
+                Libp2pEvent::PeerConnected { peer_id, .. } => {
+                    // Send a BlocksByRange request
+                    debug!(log, "Sending RPC");
+                    sender
+                        .swarm
+                        .send_rpc(peer_id, RPCEvent::Request(10, rpc_request.clone()));
+                }
+                Libp2pEvent::Behaviour(BehaviourEvent::RPC(_, event)) => match event {
+                    // Should receive the RPC response
+                    RPCEvent::Response(id, response) => {
+                        if id == 10 {
+                            debug!(log, "Sender received a response");
+                            match response {
+                                RPCCodedResponse::Success(res) => {
+                                    assert_eq!(res, rpc_response.clone());
+                                    messages_received += 1;
+                                }
+                                RPCCodedResponse::StreamTermination(_) => {
+                                    // should be exactly 10 messages, as requested
+                                    assert_eq!(messages_received, messages_to_send);
+                                }
+                                _ => panic!("Invalid RPC received"),
+                            }
+                        }
+                    }
+                    _ => {} // Ignore other RPC messages
+                },
+                _ => {} // Ignore other behaviour events
+            }
+        }
+    };
+
+    // the message to send (PeerId, RequestId). If `Some`, we still need to send messages
+    let mut message_info = None;
+    // the number of messages we've sent
+    let mut messages_sent = 0;
+    let receiver_future = async {
+        loop {
+            // this future either drives the sending/receiving or times out, allowing messages to
+            // be sent during the timeout
+            match futures::future::select(
+                Box::pin(receiver.next_event()),
+                tokio::time::delay_for(Duration::from_millis(50)),
+            )
+            .await
+            {
+                futures::future::Either::Left((
+                    Libp2pEvent::Behaviour(BehaviourEvent::RPC(peer_id, event)),
+                    _,
+                )) => {
+                    match event {
+                        // Should receive the sent RPC request
+                        RPCEvent::Request(id, request) => {
+                            if request == rpc_request {
+                                // send the response
+                                warn!(log, "Receiver got request");
+                                message_info = Some((peer_id, id));
+                            } else {
+                                continue;
+                            }
+                        }
+                        _ => continue, // Ignore other events, don't send messages until ready
+                    }
+                }
+                futures::future::Either::Right((_, _)) => {} // The timeout hit, send messages if required
+                _ => continue,
+            }
+
+            // if we need to send messages, send them here. This will happen after a delay
+            if message_info.is_some() {
+                messages_sent += 1;
+                receiver.swarm.send_rpc(
+                    message_info.as_ref().unwrap().0.clone(),
+                    RPCEvent::Response(
+                        message_info.as_ref().unwrap().1.clone(),
+                        RPCCodedResponse::Success(rpc_response.clone()),
+                    ),
+                );
+                debug!(log, "Sending message {}", messages_sent);
+                if messages_sent == messages_to_send + extra_messages_to_send {
+                    // stop sending messages
+                    return;
+                }
+            }
+        }
+    };
+
+    tokio::select! {
+        _ = sender_future => {}
+        _ = receiver_future => {}
+        _ = delay_for(Duration::from_millis(50000)) => {
+            panic!("Future timed out");
+        }
+    }
+}
+
 #[tokio::test]
 // Tests an empty response to a BlocksByRange RPC Message
 async fn test_blocks_by_range_single_empty_rpc() {
@@ -375,7 +509,11 @@ async fn test_blocks_by_root_chunked_rpc() {

     // BlocksByRoot Request
     let rpc_request = RPCRequest::BlocksByRoot(BlocksByRootRequest {
-        block_roots: vec![Hash256::from_low_u64_be(0), Hash256::from_low_u64_be(0)],
+        block_roots: vec![
+            Hash256::from_low_u64_be(0),
+            Hash256::from_low_u64_be(0),
+            Hash256::from_low_u64_be(0),
+        ],
     });

     // BlocksByRoot Response
@@ -478,6 +616,152 @@ async fn test_blocks_by_root_chunked_rpc() {
     }
 }

+#[tokio::test]
+// Tests that a streamed, chunked BlocksByRoot RPC message terminates when all expected responses have been received
+async fn test_blocks_by_root_chunked_rpc_terminates_correctly() {
+    // set up the logging: the level and whether logging is enabled
+    let log_level = Level::Debug;
+    let enable_logging = false;
+
+    let messages_to_send: u64 = 10;
+    let extra_messages_to_send: u64 = 10;
+
+    let log = common::build_log(log_level, enable_logging);
+    let spec = E::default_spec();
+
+    // get sender/receiver
+    let (mut sender, mut receiver) = common::build_node_pair(&log).await;
+
+    // BlocksByRoot Request
+    let rpc_request = RPCRequest::BlocksByRoot(BlocksByRootRequest {
+        block_roots: vec![
+            Hash256::from_low_u64_be(0),
+            Hash256::from_low_u64_be(0),
+            Hash256::from_low_u64_be(0),
+            Hash256::from_low_u64_be(0),
+            Hash256::from_low_u64_be(0),
+            Hash256::from_low_u64_be(0),
+            Hash256::from_low_u64_be(0),
+            Hash256::from_low_u64_be(0),
+            Hash256::from_low_u64_be(0),
+            Hash256::from_low_u64_be(0),
+        ],
+    });
+
+    // BlocksByRoot Response
+    let full_block = BeaconBlock::full(&spec);
+    let signed_full_block = SignedBeaconBlock {
+        message: full_block,
+        signature: Signature::empty_signature(),
+    };
+    let rpc_response = RPCResponse::BlocksByRoot(Box::new(signed_full_block));
+
+    // keep count of the number of messages received
+    let mut messages_received = 0;
+    // build the sender future
+    let sender_future = async {
+        loop {
+            match sender.next_event().await {
+                Libp2pEvent::PeerConnected { peer_id, .. } => {
+                    // Send a BlocksByRoot request
+                    debug!(log, "Sending RPC");
+                    sender
+                        .swarm
+                        .send_rpc(peer_id, RPCEvent::Request(10, rpc_request.clone()));
+                }
+                Libp2pEvent::Behaviour(BehaviourEvent::RPC(_, event)) => match event {
+                    // Should receive the RPC response
+                    RPCEvent::Response(id, response) => {
+                        if id == 10 {
+                            debug!(log, "Sender received a response");
+                            match response {
+                                RPCCodedResponse::Success(res) => {
+                                    assert_eq!(res, rpc_response.clone());
+                                    messages_received += 1;
+                                    debug!(log, "Chunk received");
+                                }
+                                RPCCodedResponse::StreamTermination(_) => {
+                                    // should be exactly messages_to_send
+                                    assert_eq!(messages_received, messages_to_send);
+                                    // end the test
+                                    return;
+                                }
+                                _ => {} // Ignore other RPC messages
+                            }
+                        }
+                    }
+                    _ => {} // Ignore other RPC messages
+                },
+                _ => {} // Ignore other behaviour events
+            }
+        }
+    };
+
+    // the message to send (PeerId, RequestId). If `Some`, we still need to send messages
+    let mut message_info = None;
+    // the number of messages we've sent
+    let mut messages_sent = 0;
+    let receiver_future = async {
+        loop {
+            // this future either drives the sending/receiving or times out, allowing messages to
+            // be sent during the timeout
+            match futures::future::select(
+                Box::pin(receiver.next_event()),
+                tokio::time::delay_for(Duration::from_millis(50)),
+            )
+            .await
+            {
+                futures::future::Either::Left((
+                    Libp2pEvent::Behaviour(BehaviourEvent::RPC(peer_id, event)),
+                    _,
+                )) => {
+                    match event {
+                        // Should receive the sent RPC request
+                        RPCEvent::Request(id, request) => {
+                            if request == rpc_request {
+                                // send the response
+                                warn!(log, "Receiver got request");
+                                message_info = Some((peer_id, id));
+                            } else {
+                                continue;
+                            }
+                        }
+                        _ => continue, // Ignore other events, don't send messages until ready
+                    }
+                }
+                futures::future::Either::Right((_, _)) => {} // The timeout hit, send messages if required
+                _ => continue,
+            }
+
+            // if we need to send messages, send them here. This will happen after a delay
+            if message_info.is_some() {
+                messages_sent += 1;
+                receiver.swarm.send_rpc(
+                    message_info.as_ref().unwrap().0.clone(),
+                    RPCEvent::Response(
+                        message_info.as_ref().unwrap().1.clone(),
+                        RPCCodedResponse::Success(rpc_response.clone()),
+                    ),
+                );
+                debug!(log, "Sending message {}", messages_sent);
+                if messages_sent == messages_to_send + extra_messages_to_send {
+                    // stop sending messages
+                    return;
+                }
+            }
+        }
+    };
+
+    tokio::select! {
+        _ = sender_future => {}
+        _ = receiver_future => {}
+        _ = delay_for(Duration::from_millis(1000)) => {
+            panic!("Future timed out");
+        }
+    }
+}
+
 #[tokio::test]
 // Tests a Goodbye RPC message
 async fn test_goodbye_rpc() {
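Both new tests request exactly ten blocks and then have the responder send ten extra chunks; they pass only if the requesting side terminates the stream after the expected number. As a rough, hypothetical sketch of the expectation the tests encode, the expected chunk count follows directly from the request itself. The types below are simplified stand-ins, not the real eth2-libp2p request types.

```rust
// Simplified stand-ins for the request types used by the tests above; these are
// illustrative only, with just enough structure to show the rule.
#[allow(dead_code)]
struct BlocksByRangeRequest {
    start_slot: u64,
    count: u64,
    step: u64,
}

struct BlocksByRootRequest {
    block_roots: Vec<[u8; 32]>,
}

enum Request {
    BlocksByRange(BlocksByRangeRequest),
    BlocksByRoot(BlocksByRootRequest),
    Status,
}

/// Expected number of response chunks, or `None` when it cannot be known up front.
fn expected_chunks(request: &Request) -> Option<u64> {
    match request {
        Request::BlocksByRange(req) => Some(req.count),
        Request::BlocksByRoot(req) => Some(req.block_roots.len() as u64),
        _ => None,
    }
}

fn main() {
    // The BlocksByRange test asks for 10 slots and the BlocksByRoot test for 10 roots:
    // each stream should therefore terminate after exactly 10 successful chunks, even
    // though the responder keeps sending `extra_messages_to_send` more.
    let range = Request::BlocksByRange(BlocksByRangeRequest {
        start_slot: 0,
        count: 10,
        step: 1,
    });
    let roots = Request::BlocksByRoot(BlocksByRootRequest {
        block_roots: vec![[0u8; 32]; 10],
    });
    assert_eq!(expected_chunks(&range), Some(10));
    assert_eq!(expected_chunks(&roots), Some(10));
    assert_eq!(expected_chunks(&Request::Status), None);
}
```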