From d72c026d32002146e26131abd7611d31e157238b Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Sun, 4 Oct 2020 21:59:20 +0000 Subject: [PATCH] Use Drop impl to send worker idle message (#1718) ## Issue Addressed NA ## Proposed Changes Uses a `Drop` implementation to help ensure that `BeaconProcessor` workers are freed. This will help prevent against regression, if someone happens to add an early return and it will also help in the case of a panic. ## Additional Info NA --- .../network/src/beacon_processor/mod.rs | 51 +++++++++++++++---- 1 file changed, 42 insertions(+), 9 deletions(-) diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index c62bbdeb3..18983b620 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -674,7 +674,16 @@ impl BeaconProcessor { /// Spawns a blocking worker thread to process some `Work`. /// /// Sends an message on `idle_tx` when the work is complete and the task is stopping. - fn spawn_worker(&mut self, mut idle_tx: mpsc::Sender<()>, work: Work) { + fn spawn_worker(&mut self, idle_tx: mpsc::Sender<()>, work: Work) { + // Wrap the `idle_tx` in a struct that will fire the idle message whenever it is dropped. + // + // This helps ensure that the worker is always freed in the case of an early exit or panic. + // As such, this instantiation should happen as early in the function as possible. + let send_idle_on_drop = SendOnDrop { + tx: idle_tx, + log: self.log.clone(), + }; + let work_id = work.str_id(); let worker_timer = metrics::start_timer_vec(&metrics::BEACON_PROCESSOR_WORKER_TIME, &[work_id]); @@ -804,16 +813,40 @@ impl BeaconProcessor { "worker" => worker_id, ); - idle_tx.try_send(()).unwrap_or_else(|e| { - crit!( - log, - "Unable to free worker"; - "msg" => "failed to send idle_tx message", - "error" => e.to_string() - ) - }); + // This explicit `drop` is used to remind the programmer that this variable must + // not be dropped until the worker is complete. Dropping it early will cause the + // worker to be marked as "free" and cause an over-spawning of workers. + drop(send_idle_on_drop); }, WORKER_TASK_NAME, ); } } + +/// This struct will send a message on `self.tx` when it is dropped. An error will be logged on +/// `self.log` if the send fails (this happens when the node is shutting down). +/// +/// ## Purpose +/// +/// This is useful for ensuring that a worker-freed message is still sent if a worker panics. +/// +/// The Rust docs for `Drop` state that `Drop` is called during an unwind in a panic: +/// +/// https://doc.rust-lang.org/std/ops/trait.Drop.html#panics +pub struct SendOnDrop { + tx: mpsc::Sender<()>, + log: Logger, +} + +impl Drop for SendOnDrop { + fn drop(&mut self) { + if let Err(e) = self.tx.try_send(()) { + warn!( + self.log, + "Unable to free worker"; + "msg" => "did not free worker, shutdown may be underway", + "error" => e.to_string() + ) + } + } +}