Mitigate too many outgoing connections (#1469)
Limit simultaneous outgoing connection attempts to a reasonable cap as an extra layer of protection. Also shift the keep-alive logic of the RPC handler into `connection_keep_alive` itself, so it is derived on demand instead of updated by hand. I think in rare cases this could also make shutting down a connection a bit faster.
This commit is contained in:
parent
ec84183e05
commit
1a67d15701
@@ -116,9 +116,6 @@ where
     /// Maximum number of concurrent outbound substreams being opened. Value is never modified.
     max_dial_negotiated: u32,
 
-    /// Value to return from `connection_keep_alive`.
-    keep_alive: KeepAlive,
-
     /// State of the handler.
     state: HandlerState,
 
@@ -243,7 +240,6 @@ where
            current_outbound_substream_id: SubstreamId(0),
            state: HandlerState::Active,
            max_dial_negotiated: 8,
-           keep_alive: KeepAlive::Yes,
            outbound_io_error_retries: 0,
            log: log.clone(),
        }
@@ -287,7 +283,6 @@ where
                TInstant::now() + Duration::from_secs(SHUTDOWN_TIMEOUT_SECS as u64),
            ));
        }
-       self.update_keep_alive();
    }
 
    /// Opens an outbound substream with a request.
@@ -295,7 +290,6 @@ where
        match self.state {
            HandlerState::Active => {
                self.dial_queue.push((id, req));
-               self.update_keep_alive();
            }
            _ => {
                self.pending_errors.push(HandlerErr::Outbound {
@@ -338,43 +332,6 @@ where
        }
        inbound_info.pending_items.push(response);
    }
 
-   /// Updates the `KeepAlive` returned by `connection_keep_alive`.
-   ///
-   /// The handler stays alive as long as there are inbound/outbound substreams established and no
-   /// items dialing/to be dialed. Otherwise it is given a grace period of inactivity of
-   /// `self.inactive_timeout`.
-   fn update_keep_alive(&mut self) {
-       // Check that we don't have outbound items pending for dialing, nor dialing, nor
-       // established. Also check that there are no established inbound substreams.
-       // Errors and events need to be reported back, so check those too.
-       let should_shutdown = match self.state {
-           HandlerState::ShuttingDown(_) => {
-               self.dial_queue.is_empty()
-                   && self.outbound_substreams.is_empty()
-                   && self.inbound_substreams.is_empty()
-                   && self.pending_errors.is_empty()
-                   && self.events_out.is_empty()
-                   && self.dial_negotiated == 0
-           }
-           HandlerState::Deactivated => {
-               // Regardless of events, the timeout has expired. Force the disconnect.
-               true
-           }
-           _ => false,
-       };
-
-       match self.keep_alive {
-           KeepAlive::Yes if should_shutdown => self.keep_alive = KeepAlive::No,
-           KeepAlive::Yes => {} // We continue being active
-           KeepAlive::Until(_) if should_shutdown => self.keep_alive = KeepAlive::No, // Already deemed inactive
-           KeepAlive::Until(_) => {
-               // No longer idle
-               self.keep_alive = KeepAlive::Yes;
-           }
-           KeepAlive::No => {} // currently not used
-       }
-   }
 }
 
 impl<TSpec> ProtocolsHandler for RPCHandler<TSpec>
@@ -427,8 +384,6 @@ where
        self.events_out
            .push(RPCReceived::Request(self.current_inbound_substream_id, req));
        self.current_inbound_substream_id.0 += 1;
-
-       self.update_keep_alive();
    }
 
    fn inject_fully_negotiated_outbound(
@@ -486,8 +441,6 @@ where
            }
            self.current_outbound_substream_id.0 += 1;
        }
-
-       self.update_keep_alive();
    }
 
    fn inject_event(&mut self, rpc_event: Self::InEvent) {
@@ -515,7 +468,6 @@ where
 
                // This dialing is now considered failed
                self.dial_negotiated -= 1;
-               self.update_keep_alive();
 
                self.outbound_io_error_retries = 0;
                // map the error
@@ -548,7 +500,29 @@ where
    }
 
    fn connection_keep_alive(&self) -> KeepAlive {
-       self.keep_alive
+       // Check that we don't have outbound items pending for dialing, nor dialing, nor
+       // established. Also check that there are no established inbound substreams.
+       // Errors and events need to be reported back, so check those too.
+       let should_shutdown = match self.state {
+           HandlerState::ShuttingDown(_) => {
+               self.dial_queue.is_empty()
+                   && self.outbound_substreams.is_empty()
+                   && self.inbound_substreams.is_empty()
+                   && self.pending_errors.is_empty()
+                   && self.events_out.is_empty()
+                   && self.dial_negotiated == 0
+           }
+           HandlerState::Deactivated => {
+               // Regardless of events, the timeout has expired. Force the disconnect.
+               true
+           }
+           _ => false,
+       };
+       if should_shutdown {
+           KeepAlive::No
+       } else {
+           KeepAlive::Yes
+       }
    }
 
    fn poll(
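The keep-alive is now derived on demand from the handler's live state instead of being cached in a field that every mutation site had to refresh via `update_keep_alive()`. A minimal, self-contained sketch of the same shape (all names below are illustrative stand-ins, not the Lighthouse types):

// Minimal sketch of deriving keep-alive from state instead of caching it.
// All names here are illustrative stand-ins, not the Lighthouse types.

#[derive(Debug, PartialEq)]
enum KeepAlive {
    Yes,
    No,
}

enum HandlerState {
    Active,
    ShuttingDown,
    Deactivated,
}

struct Handler {
    state: HandlerState,
    dial_queue: Vec<u64>,
    outbound_substreams: usize,
    inbound_substreams: usize,
    pending_errors: usize,
    events_out: usize,
    dial_negotiated: u32,
}

impl Handler {
    // A pure function of the current state: there is no cached field that
    // every mutation site has to remember to refresh.
    fn connection_keep_alive(&self) -> KeepAlive {
        let should_shutdown = match self.state {
            HandlerState::ShuttingDown => {
                self.dial_queue.is_empty()
                    && self.outbound_substreams == 0
                    && self.inbound_substreams == 0
                    && self.pending_errors == 0
                    && self.events_out == 0
                    && self.dial_negotiated == 0
            }
            // Shutdown timeout expired: force the disconnect regardless.
            HandlerState::Deactivated => true,
            HandlerState::Active => false,
        };
        if should_shutdown { KeepAlive::No } else { KeepAlive::Yes }
    }
}

fn main() {
    let handler = Handler {
        state: HandlerState::ShuttingDown,
        dial_queue: Vec::new(),
        outbound_substreams: 0,
        inbound_substreams: 0,
        pending_errors: 0,
        events_out: 0,
        dial_negotiated: 0,
    };
    // Everything is drained, so the connection may now close.
    assert_eq!(handler.connection_keep_alive(), KeepAlive::No);
}

With this shape, forgetting to refresh a cached value after touching the dial queue or the substream maps is no longer a possible bug; the answer is always computed from the current fields. That is why every `self.update_keep_alive()` call site below simply disappears.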
@@ -624,8 +598,6 @@ where
                if let Some(OutboundInfo { proto, req_id, .. }) =
                    self.outbound_substreams.remove(outbound_id.get_ref())
                {
-                   self.update_keep_alive();
-
                    let outbound_err = HandlerErr::Outbound {
                        id: req_id,
                        proto,
@@ -724,7 +696,6 @@ where
            self.inbound_substreams.remove(&inbound_id);
        }
 
-       self.update_keep_alive();
        // drive outbound streams that need to be processed
        for outbound_id in self.outbound_substreams.keys().copied().collect::<Vec<_>>() {
            // get the state and mark it as poisoned
@@ -813,7 +784,6 @@ where
                        let request_id = entry.get().req_id;
                        self.outbound_substreams_delay.remove(delay_key);
                        entry.remove_entry();
-                       self.update_keep_alive();
                        // notify the application error
                        if request.expected_responses() > 1 {
                            // return an end of stream result
@@ -844,7 +814,6 @@ where
                            error: e,
                        };
                        entry.remove_entry();
-                       self.update_keep_alive();
                        return Poll::Ready(ProtocolsHandlerEvent::Custom(Err(outbound_err)));
                    }
                },
@@ -857,7 +826,6 @@ where
                    let request_id = entry.get().req_id;
                    self.outbound_substreams_delay.remove(delay_key);
                    entry.remove_entry();
-                   self.update_keep_alive();
 
                    // report the stream termination to the user
                    //
@@ -894,7 +862,6 @@ where
            self.dial_negotiated += 1;
            let (id, req) = self.dial_queue.remove(0);
            self.dial_queue.shrink_to_fit();
-           self.update_keep_alive();
            return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
                protocol: SubstreamProtocol::new(req.clone()),
                info: (id, req),
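The poll loop above only pulls a request off `dial_queue` after bumping `dial_negotiated`, and the constructor caps `max_dial_negotiated` at 8. Assuming the unchanged surrounding code gates the dequeue on `dial_negotiated < max_dial_negotiated` (as the field's doc comment suggests), the mechanism looks roughly like this sketch (illustrative names, not the handler's code):

// Sketch of draining a dial queue under a concurrency cap.
struct DialQueue {
    queue: Vec<u64>,          // queued request ids
    dial_negotiated: u32,     // substreams currently being negotiated
    max_dial_negotiated: u32, // the cap (the handler above uses 8)
}

impl DialQueue {
    // Start at most one new dial per call, and only while under the cap.
    // Completed or failed dials must call `finish` to release a slot.
    fn try_start_dial(&mut self) -> Option<u64> {
        if self.queue.is_empty() || self.dial_negotiated >= self.max_dial_negotiated {
            return None;
        }
        self.dial_negotiated += 1;
        Some(self.queue.remove(0))
    }

    fn finish(&mut self) {
        self.dial_negotiated -= 1;
    }
}

fn main() {
    let mut dialer = DialQueue {
        queue: (0..10).collect(),
        dial_negotiated: 0,
        max_dial_negotiated: 8,
    };
    // Only 8 of the 10 queued dials may be in flight at once.
    while dialer.try_start_dial().is_some() {}
    assert_eq!(dialer.dial_negotiated, 8);
    assert_eq!(dialer.queue.len(), 2);
    // Releasing a slot lets the next queued dial proceed.
    dialer.finish();
    assert!(dialer.try_start_dial().is_some());
}

This caps per-connection substream dials; the swarm-level connection limit in the next hunk is the second, coarser layer of protection.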
@@ -102,6 +102,7 @@ impl<TSpec: EthSpec> Service<TSpec> {
            .notify_handler_buffer_size(std::num::NonZeroUsize::new(32).expect("Not zero"))
            .connection_event_buffer_size(64)
            .incoming_connection_limit(10)
+           .outgoing_connection_limit(config.target_peers * 2)
            .peer_connection_limit(MAX_CONNECTIONS_PER_PEER)
            .executor(Box::new(Executor(executor)))
            .build()
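With the extra builder call, at most `config.target_peers * 2` outgoing connection attempts can be in flight at once; the actual enforcement lives inside libp2p's swarm. A conceptual sketch of what such a budget does, with hypothetical names and an assumed target of 50 peers:

// Conceptual sketch of a swarm-level outgoing-connection limit; the real
// enforcement is libp2p's, configured by the builder call above.
struct ConnectionBudget {
    outgoing: usize,
    outgoing_limit: usize,
}

impl ConnectionBudget {
    fn new(target_peers: usize) -> Self {
        ConnectionBudget {
            outgoing: 0,
            // Mirrors `.outgoing_connection_limit(config.target_peers * 2)`.
            outgoing_limit: target_peers * 2,
        }
    }

    fn try_connect(&mut self) -> Result<(), &'static str> {
        if self.outgoing >= self.outgoing_limit {
            return Err("outgoing connection limit reached");
        }
        self.outgoing += 1;
        Ok(())
    }
}

fn main() {
    // Assumed config value, for illustration only.
    let mut budget = ConnectionBudget::new(50);
    for _ in 0..100 {
        budget.try_connect().unwrap();
    }
    // The 101st simultaneous outgoing attempt is refused.
    assert!(budget.try_connect().is_err());
}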