Improved rpc protocols handler. WIP
This commit is contained in:
		
							parent
							
								
									f1127e4e0d
								
							
						
					
					
						commit
						bb0e28b8e3
					
				| @ -1,4 +1,3 @@ | ||||
| 
 | ||||
| use libp2p::core::protocols_handler::{ | ||||
|     KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, | ||||
|     SubstreamProtocol | ||||
| @ -14,7 +13,7 @@ use wasm_timer::Instant; | ||||
| pub const RESPONSE_TIMEOUT: u64 = 9; | ||||
| 
 | ||||
| /// Implementation of `ProtocolsHandler` for the RPC protocol.
 | ||||
| pub struct RPCHandler<TSubstream, TSocket> | ||||
| pub struct RPCHandler<TSubstream> { | ||||
| 
 | ||||
|     /// The upgrade for inbound substreams.
 | ||||
|     listen_protocol: SubstreamProtocol<RPCProtocol>, | ||||
| @ -26,13 +25,13 @@ pub struct RPCHandler<TSubstream, TSocket> | ||||
|     events_out: SmallVec<[TOutEvent; 4]>, | ||||
| 
 | ||||
|     /// Queue of outbound substreams to open.
 | ||||
|     dial_queue: SmallVec<[TOutProto; 4]>, | ||||
|     dial_queue: SmallVec<[(usize,TOutProto); 4]>, | ||||
| 
 | ||||
|     /// Current number of concurrent outbound substreams being opened.
 | ||||
|     dial_negotiated: u32, | ||||
| 
 | ||||
|     /// Map of current substreams awaiting a response to an RPC request.
 | ||||
|     waiting_substreams: FnvHashMap<u64, WaitingStream<TSubstream> | ||||
|     waiting_substreams: FnvHashMap<u64, SubstreamState<TSubstream> | ||||
| 
 | ||||
|     /// Sequential Id for waiting substreams.
 | ||||
|     current_substream_id: usize, | ||||
| @ -47,9 +46,15 @@ pub struct RPCHandler<TSubstream, TSocket> | ||||
|     inactive_timeout: Duration, | ||||
| } | ||||
| 
 | ||||
| struct WaitingStream<TSubstream> { | ||||
|     stream: TSubstream, | ||||
|     timeout: Duration, | ||||
| /// State of an outbound substream. Either waiting for a response, or in the process of sending.
 | ||||
| pub enum SubstreamState<TSubstream> { | ||||
|     /// An outbound substream is waiting a response from the user.
 | ||||
|     WaitingResponse { | ||||
|         stream: <TSubstream>, | ||||
|         timeout: Duration, | ||||
|     } | ||||
|     /// A response has been sent and we are waiting for the stream to close.
 | ||||
|     ResponseSent(WriteOne<TSubstream, Vec<u8>) | ||||
| } | ||||
| 
 | ||||
| impl<TSubstream> | ||||
| @ -96,9 +101,9 @@ impl<TSubstream> | ||||
| 
 | ||||
|     /// Opens an outbound substream with `upgrade`.
 | ||||
|     #[inline] | ||||
|     pub fn send_request(&mut self, upgrade: RPCRequest) { | ||||
|     pub fn send_request(&mut self, request_id, u64, upgrade: RPCRequest) { | ||||
|         self.keep_alive = KeepAlive::Yes; | ||||
|         self.dial_queue.push(upgrade); | ||||
|         self.dial_queue.push((request_id, upgrade)); | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| @ -106,20 +111,20 @@ impl<TSubstream> Default | ||||
|     for RPCHandler<TSubstream> | ||||
| { | ||||
|     fn default() -> Self { | ||||
|         RPCHandler::new(SubstreamProtocol::new(RPCProtocol), Duration::from_secs(10)) | ||||
|         RPCHandler::new(SubstreamProtocol::new(RPCProtocol), Duration::from_secs(30)) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl<TSubstream> ProtocolsHandler | ||||
|     for RPCHandler<TSubstream> | ||||
| { | ||||
|     type InEvent = RPCRequest; | ||||
|     type InEvent = RPCEvent; | ||||
|     type OutEvent = RPCEvent; | ||||
|     type Error = ProtocolsHandlerUpgrErr<RPCRequest::Error>; | ||||
|     type Substream = TSubstream; | ||||
|     type InboundProtocol = RPCProtocol; | ||||
|     type OutboundProtocol = RPCRequest; | ||||
|     type OutboundOpenInfo = (); | ||||
|     type OutboundOpenInfo = u64; // request_id
 | ||||
| 
 | ||||
|     #[inline] | ||||
|     fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> { | ||||
| @ -131,10 +136,6 @@ impl<TSubstream> ProtocolsHandler | ||||
|         &mut self, | ||||
|         out: RPCProtocol::Output, | ||||
|     ) { | ||||
|         if !self.keep_alive.is_yes() { | ||||
|             self.keep_alive = KeepAlive::Until(Instant::now() + self.inactive_timeout); | ||||
|         } | ||||
| 
 | ||||
|        let (stream, req) = out; 
 | ||||
|        // drop the stream and return a 0 id for goodbye "requests"
 | ||||
|        if let req @ RPCRequest::Goodbye(_) = req { | ||||
| @ -143,7 +144,7 @@ impl<TSubstream> ProtocolsHandler | ||||
|        } | ||||
| 
 | ||||
|         // New inbound request. Store the stream and tag the output.
 | ||||
|         let awaiting_stream = WaitingStream { stream, timeout: Instant::now() + Duration::from_secs(RESPONSE_TIMEOUT) }; | ||||
|         let awaiting_stream = SubstreamState::WaitingResponse { stream, timeout: Instant::now() + Duration::from_secs(RESPONSE_TIMEOUT) }; | ||||
|         self.waiting_substreams.insert(self.current_substream_id, awaiting_stream); | ||||
| 
 | ||||
|         self.events_out.push(RPCEvent::Request(self.current_substream_id, req)); | ||||
| @ -154,20 +155,36 @@ impl<TSubstream> ProtocolsHandler | ||||
|     fn inject_fully_negotiated_outbound( | ||||
|         &mut self, | ||||
|         out: RPCResponse, | ||||
|         _: Self::OutboundOpenInfo, | ||||
|         request_id : Self::OutboundOpenInfo, | ||||
|     ) { | ||||
|         self.dial_negotiated -= 1; | ||||
| 
 | ||||
|         if self.dial_negotiated == 0 && self.dial_queue.is_empty() { | ||||
|         if self.dial_negotiated == 0 && self.dial_queue.is_empty() && self.waiting_substreams.is_empty() { | ||||
|             self.keep_alive = KeepAlive::Until(Instant::now() + self.inactive_timeout); | ||||
|         } | ||||
|         else  { | ||||
|             self.keep_alive = KeepAlive::Yes; | ||||
|         } | ||||
| 
 | ||||
|         self.events_out.push(out.into()); | ||||
|         self.events_out.push(RPCEvent::Response(request_id, out)); | ||||
|     } | ||||
| 
 | ||||
|     // Note: If the substream has closed due to inactivity, or the substream is in the
 | ||||
|     // wrong state a response will fail silently.
 | ||||
|     #[inline] | ||||
|     fn inject_event(&mut self, event: Self::InEvent) { | ||||
|         self.send_request(event); | ||||
|     fn inject_event(&mut self, rpc_event: Self::InEvent) { | ||||
|         match rpc_event { | ||||
|             RPCEvent::Request(rpc_id, req) => self.send_request(rpc_id, req), | ||||
|             RPCEvent::Response(rpc_id, res) => { | ||||
|                 // check if the stream matching the response still exists
 | ||||
|                 if let Some(mut waiting_stream) = self.waiting_substreams.get_mut(&rpc_id) { | ||||
|                         // only send one response per stream. This must be in the waiting state.
 | ||||
|                     if let SubstreamState::WaitingResponse {substream, .. } = waiting_stream { | ||||
|                     waiting_stream = SubstreamState::PendingWrite(upgrade::write_one(substream, res)); | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     #[inline] | ||||
| @ -198,6 +215,26 @@ impl<TSubstream> ProtocolsHandler | ||||
|             return Err(err); | ||||
|         } | ||||
| 
 | ||||
|         // prioritise sending responses for waiting substreams
 | ||||
|         self.waiting_substreams.retain(|_k, mut waiting_stream| { | ||||
|             match waiting_stream  => { | ||||
|                 SubstreamState::PendingWrite(write_one) => { | ||||
|                     match write_one.poll() => { | ||||
|                         Ok(Async::Ready(_socket)) => false, | ||||
|                         Ok(Async::NotReady()) => true, | ||||
|                         Err(_e) => { 
 | ||||
|                             //TODO: Add logging
 | ||||
|                             // throw away streams that error
 | ||||
|                             false 
 | ||||
|                          } | ||||
|                     } | ||||
|                 }, | ||||
|                 SubstreamState::WaitingResponse { timeout, .. } => { 
 | ||||
|                     if Instant::now() > timeout { false} else { true } | ||||
|                 } | ||||
|             } | ||||
|         }); | ||||
| 
 | ||||
|         if !self.events_out.is_empty() { | ||||
|             return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( | ||||
|                 self.events_out.remove(0), | ||||
| @ -206,20 +243,21 @@ impl<TSubstream> ProtocolsHandler | ||||
|             self.events_out.shrink_to_fit(); | ||||
|         } | ||||
| 
 | ||||
|         // establish outbound substreams
 | ||||
|         if !self.dial_queue.is_empty() { | ||||
|             if self.dial_negotiated < self.max_dial_negotiated { | ||||
|                 self.dial_negotiated += 1; | ||||
|                 let (request_id, req) = self.dial_queue.remove(0); | ||||
|                 return Ok(Async::Ready( | ||||
|                     ProtocolsHandlerEvent::OutboundSubstreamRequest { | ||||
|                         protocol: SubstreamProtocol::new(self.dial_queue.remove(0)), | ||||
|                         info: (), | ||||
|                         protocol: SubstreamProtocol::new(req), | ||||
|                         info: request_id, | ||||
|                     }, | ||||
|                 )); | ||||
|             } | ||||
|         } else { | ||||
|             self.dial_queue.shrink_to_fit(); | ||||
|         } | ||||
| 
 | ||||
|         Ok(Async::NotReady) | ||||
|     } | ||||
| } | ||||
|  | ||||
| @ -7,6 +7,7 @@ use types::{Epoch, Hash256, Slot}; | ||||
| #[derive(Debug, Clone)] | ||||
| pub enum RPCResponse { | ||||
|     Hello(HelloMessage), | ||||
|     Goodbye, // empty value - required for protocol handler
 | ||||
|     BeaconBlockRoots(BeaconBlockRootsResponse), | ||||
|     BeaconBlockHeaders(BeaconBlockHeadersResponse), | ||||
|     BeaconBlockBodies(BeaconBlockBodiesResponse), | ||||
|  | ||||
| @ -1,6 +1,6 @@ | ||||
| /// The Ethereum 2.0 Wire Protocol
 | ||||
| ///
 | ||||
| /// This protocol is a purpose built ethereum 2.0 libp2p protocol. It's role is to facilitate
 | ||||
| /// This protocol is a purpose built Ethereum 2.0 libp2p protocol. It's role is to facilitate
 | ||||
| /// direct peer-to-peer communication primarily for sending/receiving chain information for
 | ||||
| /// syncing.
 | ||||
| ///
 | ||||
| @ -67,7 +67,7 @@ impl<TSubstream> NetworkBehaviour for Rpc<TSubstream> | ||||
| where | ||||
|     TSubstream: AsyncRead + AsyncWrite, | ||||
| { | ||||
|     type ProtocolsHandler = OneShotHandler<TSubstream, RPCProtocol, RPCEvent, HandlerEvent>; | ||||
|     type ProtocolsHandler = RPCHandler<TSubstream>; | ||||
|     type OutEvent = RPCMessage; | ||||
| 
 | ||||
|     fn new_handler(&mut self) -> Self::ProtocolsHandler { | ||||
|  | ||||
| @ -10,6 +10,8 @@ use tokio::io::{AsyncRead, AsyncWrite}; | ||||
| const MAX_RPC_SIZE: usize = 4_194_304; // 4M
 | ||||
| /// The protocol prefix the RPC protocol id.
 | ||||
| const PROTOCOL_PREFIX: &str = "/eth/serenity/rpc/"; | ||||
| /// The number of seconds to wait for a response before the stream is terminated.
 | ||||
| const RESPONSE_TIMEOUT: u64 = 10; | ||||
| 
 | ||||
| /// Implementation of the `ConnectionUpgrade` for the rpc protocol.
 | ||||
| #[derive(Debug, Clone)] | ||||
| @ -58,6 +60,7 @@ where | ||||
|         upgrade::read_respond(socket, MAX_RPC_SIZE, (), |socket, packet, ()| { | ||||
|             Ok((socket, decode_request(packet, protocol)?)) | ||||
|         }) | ||||
|         .timeout(Duration::from_secs(RESPONSE_TIMEOUT)) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| @ -135,17 +138,6 @@ impl UpgradeInfo for RPCRequest { | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| //  GOODBYE RPC has it's own upgrade as it doesn't expect a response
 | ||||
| impl UpgradeInfo for Goodbye { | ||||
|     type Info = RawProtocolId; | ||||
|     type InfoIter = iter::Once<Self::Info>; | ||||
| 
 | ||||
|     // add further protocols as we support more encodings/versions
 | ||||
|     fn protocol_info(&self) -> Self::InfoIter { | ||||
|         iter::once(ProtocolId::new("goodbye", 1, "ssz").into()) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| /// Implements the encoding per supported protocol for RPCRequest.
 | ||||
| impl RPCRequest { | ||||
|     pub fn supported_protocols(&self) -> Vec<RawProtocolId> { | ||||
| @ -221,24 +213,7 @@ where | ||||
|         upgrade::request_response(socket, bytes, MAX_RPC_SIZE, protocol, |packet, protocol| { | ||||
|             Ok(decode_response(packet, protocol)?) | ||||
|         }) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl<TSocket> OutboundUpgrade<TSocket> for Goodbye | ||||
| where | ||||
|     TSocket: AsyncWrite, | ||||
| { | ||||
|     type Output = (); | ||||
|     type Error = io::Error; | ||||
|     type Future = upgrade::WriteOne<upgrade::Negotiated<TSocket>>; | ||||
| 
 | ||||
|     fn upgrade_outbound( | ||||
|         self, | ||||
|         socket: upgrade::Negotiated<TSocket>, | ||||
|         protocol: Self::Info, | ||||
|     ) -> Self::Future { | ||||
|         let bytes = self.as_ssz_bytes(); | ||||
|         upgrade::write_one(socket, bytes) | ||||
|         .timeout(Duration::from_secs(RESPONSE_TIMEOUT)) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user