diff --git a/les/servingqueue.go b/les/servingqueue.go index 16e064cb3..10c7e6f48 100644 --- a/les/servingqueue.go +++ b/les/servingqueue.go @@ -159,27 +159,23 @@ func (sq *servingQueue) newTask(peer *clientPeer, maxTime uint64, priority int64 // run tokens from the token channel and allow the corresponding tasks to run // without entering the priority queue. func (sq *servingQueue) threadController() { + defer sq.wg.Done() for { token := make(runToken) select { case best := <-sq.queueBestCh: best.tokenCh <- token case <-sq.stopThreadCh: - sq.wg.Done() return case <-sq.quit: - sq.wg.Done() return } - <-token select { case <-sq.stopThreadCh: - sq.wg.Done() return case <-sq.quit: - sq.wg.Done() return - default: + case <-token: } } } @@ -298,6 +294,7 @@ func (sq *servingQueue) addTask(task *servingTask) { // and always tries to send the highest priority task to queueBestCh. Successfully sent // tasks are removed from the queue. func (sq *servingQueue) queueLoop() { + defer sq.wg.Done() for { if sq.best != nil { expTime := sq.best.expTime @@ -316,7 +313,6 @@ func (sq *servingQueue) queueLoop() { sq.best, _ = sq.queue.PopItem().(*servingTask) } case <-sq.quit: - sq.wg.Done() return } } else { @@ -324,7 +320,6 @@ func (sq *servingQueue) queueLoop() { case task := <-sq.queueAddCh: sq.addTask(task) case <-sq.quit: - sq.wg.Done() return } } @@ -335,6 +330,7 @@ func (sq *servingQueue) queueLoop() { // of active thread controller goroutines. func (sq *servingQueue) threadCountLoop() { var threadCountTarget int + defer sq.wg.Done() for { for threadCountTarget > sq.threadCount { sq.wg.Add(1) @@ -347,14 +343,12 @@ func (sq *servingQueue) threadCountLoop() { case sq.stopThreadCh <- struct{}{}: sq.threadCount-- case <-sq.quit: - sq.wg.Done() return } } else { select { case threadCountTarget = <-sq.setThreadsCh: case <-sq.quit: - sq.wg.Done() return } }