Don't kill SSE stream if channel fills up (#4500)

## Issue Addressed

Closes #4245

## Proposed Changes

- If an SSE channel fills up, send a comment instead of terminating the stream.
- Add a CLI flag for scaling up the SSE buffer: `--http-sse-capacity-multiplier N`.

## Additional Info

~~Blocked on #4462. I haven't rebased on that PR yet for initial testing, because it still needs some more work to handle long-running HTTP threads.~~

- [x] Add CLI flag tests.
This commit is contained in:
Michael Sproul 2023-08-17 02:37:29 +00:00
parent 59c24bcd2d
commit 7251a93c5e
7 changed files with 67 additions and 21 deletions

View File

@ -21,8 +21,11 @@ pub struct ServerSentEventHandler<T: EthSpec> {
} }
impl<T: EthSpec> ServerSentEventHandler<T> { impl<T: EthSpec> ServerSentEventHandler<T> {
pub fn new(log: Logger) -> Self { pub fn new(log: Logger, capacity_multiplier: usize) -> Self {
Self::new_with_capacity(log, DEFAULT_CHANNEL_CAPACITY) Self::new_with_capacity(
log,
capacity_multiplier.saturating_mul(DEFAULT_CHANNEL_CAPACITY),
)
} }
pub fn new_with_capacity(log: Logger, capacity: usize) -> Self { pub fn new_with_capacity(log: Logger, capacity: usize) -> Self {

View File

@ -157,7 +157,10 @@ where
let context = runtime_context.service_context("beacon".into()); let context = runtime_context.service_context("beacon".into());
let spec = chain_spec.ok_or("beacon_chain_start_method requires a chain spec")?; let spec = chain_spec.ok_or("beacon_chain_start_method requires a chain spec")?;
let event_handler = if self.http_api_config.enabled { let event_handler = if self.http_api_config.enabled {
Some(ServerSentEventHandler::new(context.log().clone())) Some(ServerSentEventHandler::new(
context.log().clone(),
self.http_api_config.sse_capacity_multiplier,
))
} else { } else {
None None
}; };

View File

@ -65,7 +65,10 @@ use tokio::sync::{
mpsc::{Sender, UnboundedSender}, mpsc::{Sender, UnboundedSender},
oneshot, oneshot,
}; };
use tokio_stream::{wrappers::BroadcastStream, StreamExt}; use tokio_stream::{
wrappers::{errors::BroadcastStreamRecvError, BroadcastStream},
StreamExt,
};
use types::{ use types::{
Attestation, AttestationData, AttestationShufflingId, AttesterSlashing, BeaconStateError, Attestation, AttestationData, AttestationShufflingId, AttesterSlashing, BeaconStateError,
BlindedPayload, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, FullPayload, BlindedPayload, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, FullPayload,
@ -132,6 +135,7 @@ pub struct Config {
pub allow_sync_stalled: bool, pub allow_sync_stalled: bool,
pub spec_fork_name: Option<ForkName>, pub spec_fork_name: Option<ForkName>,
pub data_dir: PathBuf, pub data_dir: PathBuf,
pub sse_capacity_multiplier: usize,
pub enable_beacon_processor: bool, pub enable_beacon_processor: bool,
} }
@ -146,6 +150,7 @@ impl Default for Config {
allow_sync_stalled: false, allow_sync_stalled: false,
spec_fork_name: None, spec_fork_name: None,
data_dir: PathBuf::from(DEFAULT_ROOT_DIR), data_dir: PathBuf::from(DEFAULT_ROOT_DIR),
sse_capacity_multiplier: 1,
enable_beacon_processor: true, enable_beacon_processor: true,
} }
} }
@ -4348,22 +4353,29 @@ pub fn serve<T: BeaconChainTypes>(
} }
}; };
receivers.push(BroadcastStream::new(receiver).map(|msg| { receivers.push(
BroadcastStream::new(receiver)
.map(|msg| {
match msg { match msg {
Ok(data) => Event::default() Ok(data) => Event::default()
.event(data.topic_name()) .event(data.topic_name())
.json_data(data) .json_data(data)
.map_err(|e| { .unwrap_or_else(|e| {
warp_utils::reject::server_sent_event_error(format!( Event::default()
"{:?}", .comment(format!("error - bad json: {e:?}"))
e
))
}), }),
Err(e) => Err(warp_utils::reject::server_sent_event_error( // Do not terminate the stream if the channel fills
format!("{:?}", e), // up. Just drop some messages and send a comment to
)), // the client.
Err(BroadcastStreamRecvError::Lagged(n)) => {
Event::default().comment(format!(
"error - dropped {n} messages"
))
} }
})); }
})
.map(Ok::<_, std::convert::Infallible>),
);
} }
} else { } else {
return Err(warp_utils::reject::custom_server_error( return Err(warp_utils::reject::custom_server_error(
@ -4373,7 +4385,7 @@ pub fn serve<T: BeaconChainTypes>(
let s = futures::stream::select_all(receivers); let s = futures::stream::select_all(receivers);
Ok::<_, warp::Rejection>(warp::sse::reply(warp::sse::keep_alive().stream(s))) Ok(warp::sse::reply(warp::sse::keep_alive().stream(s)))
}) })
}, },
); );

View File

@ -225,6 +225,7 @@ pub async fn create_api_server_on_port<T: BeaconChainTypes>(
allow_sync_stalled: false, allow_sync_stalled: false,
data_dir: std::path::PathBuf::from(DEFAULT_ROOT_DIR), data_dir: std::path::PathBuf::from(DEFAULT_ROOT_DIR),
spec_fork_name: None, spec_fork_name: None,
sse_capacity_multiplier: 1,
enable_beacon_processor: true, enable_beacon_processor: true,
}, },
chain: Some(chain), chain: Some(chain),

View File

@ -382,6 +382,15 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
stalled. This is useful for very small testnets. TESTING ONLY. DO NOT USE ON \ stalled. This is useful for very small testnets. TESTING ONLY. DO NOT USE ON \
MAINNET.") MAINNET.")
) )
.arg(
Arg::with_name("http-sse-capacity-multiplier")
.long("http-sse-capacity-multiplier")
.takes_value(true)
.default_value("1")
.value_name("N")
.help("Multiplier to apply to the length of HTTP server-sent-event (SSE) channels. \
Increasing this value can prevent messages from being dropped.")
)
.arg( .arg(
Arg::with_name("http-enable-beacon-processor") Arg::with_name("http-enable-beacon-processor")
.long("http-enable-beacon-processor") .long("http-enable-beacon-processor")

View File

@ -149,6 +149,9 @@ pub fn get_config<E: EthSpec>(
client_config.http_api.allow_sync_stalled = true; client_config.http_api.allow_sync_stalled = true;
} }
client_config.http_api.sse_capacity_multiplier =
parse_required(cli_args, "http-sse-capacity-multiplier")?;
client_config.http_api.enable_beacon_processor = client_config.http_api.enable_beacon_processor =
parse_required(cli_args, "http-enable-beacon-processor")?; parse_required(cli_args, "http-enable-beacon-processor")?;

View File

@ -2349,3 +2349,18 @@ fn beacon_processor_zero_workers() {
.flag("beacon-processor-max-workers", Some("0")) .flag("beacon-processor-max-workers", Some("0"))
.run_with_zero_port(); .run_with_zero_port();
} }
#[test]
fn http_sse_capacity_multiplier_default() {
CommandLineTest::new()
.run_with_zero_port()
.with_config(|config| assert_eq!(config.http_api.sse_capacity_multiplier, 1));
}
#[test]
fn http_sse_capacity_multiplier_override() {
CommandLineTest::new()
.flag("http-sse-capacity-multiplier", Some("10"))
.run_with_zero_port()
.with_config(|config| assert_eq!(config.http_api.sse_capacity_multiplier, 10));
}