Use Drop impl to send worker idle message (#1718)
## Issue Addressed NA ## Proposed Changes Uses a `Drop` implementation to help ensure that `BeaconProcessor` workers are freed. This will help prevent against regression, if someone happens to add an early return and it will also help in the case of a panic. ## Additional Info NA
This commit is contained in:
parent
32338bcafa
commit
d72c026d32
@ -674,7 +674,16 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
|||||||
/// Spawns a blocking worker thread to process some `Work`.
|
/// Spawns a blocking worker thread to process some `Work`.
|
||||||
///
|
///
|
||||||
/// Sends an message on `idle_tx` when the work is complete and the task is stopping.
|
/// Sends an message on `idle_tx` when the work is complete and the task is stopping.
|
||||||
fn spawn_worker(&mut self, mut idle_tx: mpsc::Sender<()>, work: Work<T::EthSpec>) {
|
fn spawn_worker(&mut self, idle_tx: mpsc::Sender<()>, work: Work<T::EthSpec>) {
|
||||||
|
// Wrap the `idle_tx` in a struct that will fire the idle message whenever it is dropped.
|
||||||
|
//
|
||||||
|
// This helps ensure that the worker is always freed in the case of an early exit or panic.
|
||||||
|
// As such, this instantiation should happen as early in the function as possible.
|
||||||
|
let send_idle_on_drop = SendOnDrop {
|
||||||
|
tx: idle_tx,
|
||||||
|
log: self.log.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
let work_id = work.str_id();
|
let work_id = work.str_id();
|
||||||
let worker_timer =
|
let worker_timer =
|
||||||
metrics::start_timer_vec(&metrics::BEACON_PROCESSOR_WORKER_TIME, &[work_id]);
|
metrics::start_timer_vec(&metrics::BEACON_PROCESSOR_WORKER_TIME, &[work_id]);
|
||||||
@ -804,16 +813,40 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
|||||||
"worker" => worker_id,
|
"worker" => worker_id,
|
||||||
);
|
);
|
||||||
|
|
||||||
idle_tx.try_send(()).unwrap_or_else(|e| {
|
// This explicit `drop` is used to remind the programmer that this variable must
|
||||||
crit!(
|
// not be dropped until the worker is complete. Dropping it early will cause the
|
||||||
log,
|
// worker to be marked as "free" and cause an over-spawning of workers.
|
||||||
"Unable to free worker";
|
drop(send_idle_on_drop);
|
||||||
"msg" => "failed to send idle_tx message",
|
|
||||||
"error" => e.to_string()
|
|
||||||
)
|
|
||||||
});
|
|
||||||
},
|
},
|
||||||
WORKER_TASK_NAME,
|
WORKER_TASK_NAME,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// This struct will send a message on `self.tx` when it is dropped. An error will be logged on
|
||||||
|
/// `self.log` if the send fails (this happens when the node is shutting down).
|
||||||
|
///
|
||||||
|
/// ## Purpose
|
||||||
|
///
|
||||||
|
/// This is useful for ensuring that a worker-freed message is still sent if a worker panics.
|
||||||
|
///
|
||||||
|
/// The Rust docs for `Drop` state that `Drop` is called during an unwind in a panic:
|
||||||
|
///
|
||||||
|
/// https://doc.rust-lang.org/std/ops/trait.Drop.html#panics
|
||||||
|
pub struct SendOnDrop {
|
||||||
|
tx: mpsc::Sender<()>,
|
||||||
|
log: Logger,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for SendOnDrop {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
if let Err(e) = self.tx.try_send(()) {
|
||||||
|
warn!(
|
||||||
|
self.log,
|
||||||
|
"Unable to free worker";
|
||||||
|
"msg" => "did not free worker, shutdown may be underway",
|
||||||
|
"error" => e.to_string()
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user