Patch for concurrent iterator & others (onto v1.11.6) #386

Closed
roysc wants to merge 1565 commits from v1.11.6-statediff-v5 into master
Showing only changes of commit 6ed812db13 - Show all commits

View File

@ -159,27 +159,23 @@ func (sq *servingQueue) newTask(peer *clientPeer, maxTime uint64, priority int64
// run tokens from the token channel and allow the corresponding tasks to run // run tokens from the token channel and allow the corresponding tasks to run
// without entering the priority queue. // without entering the priority queue.
func (sq *servingQueue) threadController() { func (sq *servingQueue) threadController() {
defer sq.wg.Done()
for { for {
token := make(runToken) token := make(runToken)
select { select {
case best := <-sq.queueBestCh: case best := <-sq.queueBestCh:
best.tokenCh <- token best.tokenCh <- token
case <-sq.stopThreadCh: case <-sq.stopThreadCh:
sq.wg.Done()
return return
case <-sq.quit: case <-sq.quit:
sq.wg.Done()
return return
} }
<-token
select { select {
case <-sq.stopThreadCh: case <-sq.stopThreadCh:
sq.wg.Done()
return return
case <-sq.quit: case <-sq.quit:
sq.wg.Done()
return return
default: case <-token:
} }
} }
} }
@ -298,6 +294,7 @@ func (sq *servingQueue) addTask(task *servingTask) {
// and always tries to send the highest priority task to queueBestCh. Successfully sent // and always tries to send the highest priority task to queueBestCh. Successfully sent
// tasks are removed from the queue. // tasks are removed from the queue.
func (sq *servingQueue) queueLoop() { func (sq *servingQueue) queueLoop() {
defer sq.wg.Done()
for { for {
if sq.best != nil { if sq.best != nil {
expTime := sq.best.expTime expTime := sq.best.expTime
@ -316,7 +313,6 @@ func (sq *servingQueue) queueLoop() {
sq.best, _ = sq.queue.PopItem().(*servingTask) sq.best, _ = sq.queue.PopItem().(*servingTask)
} }
case <-sq.quit: case <-sq.quit:
sq.wg.Done()
return return
} }
} else { } else {
@ -324,7 +320,6 @@ func (sq *servingQueue) queueLoop() {
case task := <-sq.queueAddCh: case task := <-sq.queueAddCh:
sq.addTask(task) sq.addTask(task)
case <-sq.quit: case <-sq.quit:
sq.wg.Done()
return return
} }
} }
@ -335,6 +330,7 @@ func (sq *servingQueue) queueLoop() {
// of active thread controller goroutines. // of active thread controller goroutines.
func (sq *servingQueue) threadCountLoop() { func (sq *servingQueue) threadCountLoop() {
var threadCountTarget int var threadCountTarget int
defer sq.wg.Done()
for { for {
for threadCountTarget > sq.threadCount { for threadCountTarget > sq.threadCount {
sq.wg.Add(1) sq.wg.Add(1)
@ -347,14 +343,12 @@ func (sq *servingQueue) threadCountLoop() {
case sq.stopThreadCh <- struct{}{}: case sq.stopThreadCh <- struct{}{}:
sq.threadCount-- sq.threadCount--
case <-sq.quit: case <-sq.quit:
sq.wg.Done()
return return
} }
} else { } else {
select { select {
case threadCountTarget = <-sq.setThreadsCh: case threadCountTarget = <-sq.setThreadsCh:
case <-sq.quit: case <-sq.quit:
sq.wg.Done()
return return
} }
} }