Use Drop impl to send worker idle message (#1718)
## Issue Addressed NA ## Proposed Changes Uses a `Drop` implementation to help ensure that `BeaconProcessor` workers are freed. This will help prevent against regression, if someone happens to add an early return and it will also help in the case of a panic. ## Additional Info NA
This commit is contained in:
		
							parent
							
								
									47c921f326
								
							
						
					
					
						commit
						e7eb99cb5e
					
				| @ -674,7 +674,16 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> { | |||||||
|     /// Spawns a blocking worker thread to process some `Work`.
 |     /// Spawns a blocking worker thread to process some `Work`.
 | ||||||
|     ///
 |     ///
 | ||||||
|     /// Sends an message on `idle_tx` when the work is complete and the task is stopping.
 |     /// Sends an message on `idle_tx` when the work is complete and the task is stopping.
 | ||||||
|     fn spawn_worker(&mut self, mut idle_tx: mpsc::Sender<()>, work: Work<T::EthSpec>) { |     fn spawn_worker(&mut self, idle_tx: mpsc::Sender<()>, work: Work<T::EthSpec>) { | ||||||
|  |         // Wrap the `idle_tx` in a struct that will fire the idle message whenever it is dropped.
 | ||||||
|  |         //
 | ||||||
|  |         // This helps ensure that the worker is always freed in the case of an early exit or panic.
 | ||||||
|  |         // As such, this instantiation should happen as early in the function as possible.
 | ||||||
|  |         let send_idle_on_drop = SendOnDrop { | ||||||
|  |             tx: idle_tx, | ||||||
|  |             log: self.log.clone(), | ||||||
|  |         }; | ||||||
|  | 
 | ||||||
|         let work_id = work.str_id(); |         let work_id = work.str_id(); | ||||||
|         let worker_timer = |         let worker_timer = | ||||||
|             metrics::start_timer_vec(&metrics::BEACON_PROCESSOR_WORKER_TIME, &[work_id]); |             metrics::start_timer_vec(&metrics::BEACON_PROCESSOR_WORKER_TIME, &[work_id]); | ||||||
| @ -804,16 +813,40 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> { | |||||||
|                     "worker" => worker_id, |                     "worker" => worker_id, | ||||||
|                 ); |                 ); | ||||||
| 
 | 
 | ||||||
|                 idle_tx.try_send(()).unwrap_or_else(|e| { |                 // This explicit `drop` is used to remind the programmer that this variable must
 | ||||||
|                     crit!( |                 // not be dropped until the worker is complete. Dropping it early will cause the
 | ||||||
|                         log, |                 // worker to be marked as "free" and cause an over-spawning of workers.
 | ||||||
|                         "Unable to free worker"; |                 drop(send_idle_on_drop); | ||||||
|                         "msg" => "failed to send idle_tx message", |  | ||||||
|                         "error" => e.to_string() |  | ||||||
|                     ) |  | ||||||
|                 }); |  | ||||||
|             }, |             }, | ||||||
|             WORKER_TASK_NAME, |             WORKER_TASK_NAME, | ||||||
|         ); |         ); | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | /// This struct will send a message on `self.tx` when it is dropped. An error will be logged on
 | ||||||
|  | /// `self.log` if the send fails (this happens when the node is shutting down).
 | ||||||
|  | ///
 | ||||||
|  | /// ## Purpose
 | ||||||
|  | ///
 | ||||||
|  | /// This is useful for ensuring that a worker-freed message is still sent if a worker panics.
 | ||||||
|  | ///
 | ||||||
|  | /// The Rust docs for `Drop` state that `Drop` is called during an unwind in a panic:
 | ||||||
|  | ///
 | ||||||
|  | /// https://doc.rust-lang.org/std/ops/trait.Drop.html#panics
 | ||||||
|  | pub struct SendOnDrop { | ||||||
|  |     tx: mpsc::Sender<()>, | ||||||
|  |     log: Logger, | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | impl Drop for SendOnDrop { | ||||||
|  |     fn drop(&mut self) { | ||||||
|  |         if let Err(e) = self.tx.try_send(()) { | ||||||
|  |             warn!( | ||||||
|  |                 self.log, | ||||||
|  |                 "Unable to free worker"; | ||||||
|  |                 "msg" => "did not free worker, shutdown may be underway", | ||||||
|  |                 "error" => e.to_string() | ||||||
|  |             ) | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user