package sectorstorage

import (
	"context"
	"reflect"
)

// runWorkerWatcher multiplexes, in a single loop, over:
//
//   - sh.closing: any receive on it (value or close) makes the watcher return;
//   - sh.watchClosing: each received WorkerID is registered for watching by
//     asking sh.workers[wid].w.Closing(ctx) for a channel and adding that
//     channel to the select set;
//   - every registered per-worker channel: when one fires (value or close),
//     the worker is reported on sh.workerClosing and its select slot is
//     released for reuse.
//
// The set of watched channels is only known at run time, so the loop uses
// reflect.Select instead of a static select statement. The method returns when
// sh.closing fires or when sh.watchClosing is closed. It panics if a value
// received from sh.watchClosing is not a WorkerID.
func (sh *scheduler) runWorkerWatcher() {
	ctx, cancel := context.WithCancel(context.TODO())
	defer cancel()

	// Slots 0 and 1 are the fixed control channels; per-worker slots start here.
	const firstWorkerCase = 2

	// A receive case whose Chan is the zero Value is ignored by reflect.Select,
	// so it serves as a placeholder for released slots. This keeps indices
	// stable (caseToWorker is keyed by index) without shrinking the slice.
	inert := reflect.SelectCase{Dir: reflect.SelectRecv}

	cases := []reflect.SelectCase{
		{
			Dir:  reflect.SelectRecv,
			Chan: reflect.ValueOf(sh.closing),
		},
		{
			Dir:  reflect.SelectRecv,
			Chan: reflect.ValueOf(sh.watchClosing),
		},
	}

	// caseToWorker maps a per-worker slot index in cases to the worker it
	// watches. A slot at index >= firstWorkerCase is free iff it has no entry.
	caseToWorker := map[int]WorkerID{}

	for {
		n, rv, ok := reflect.Select(cases)

		switch n {
		case 0: // sh.closing
			return

		case 1: // sh.watchClosing
			if !ok {
				log.Errorf("watchClosing channel closed")
				return
			}

			wid, isWorkerID := rv.Interface().(WorkerID)
			if !isWorkerID {
				panic("got a non-WorkerID message")
			}

			// NOTE(review): Closing is invoked while workersLk is held, and
			// sh.workers[wid] is dereferenced without a presence check. If
			// Closing can block, or wid may already have been removed, this
			// deserves a follow-up — confirm against the workers container
			// type and the Worker implementation.
			sh.workersLk.Lock()
			workerClosing, err := sh.workers[wid].w.Closing(ctx)
			sh.workersLk.Unlock()
			if err != nil {
				log.Errorf("getting worker closing channel: %+v", err)
				// The worker cannot be watched; report it as closing right away.
				select {
				case sh.workerClosing <- wid:
				case <-sh.closing:
					return
				}

				continue
			}

			// Reuse a released slot if one exists, otherwise grow the slice.
			// Free slots are detected through caseToWorker rather than by
			// comparing reflect.Values with ==, which the reflect docs advise
			// against.
			toSet := -1
			for i := firstWorkerCase; i < len(cases); i++ {
				if _, used := caseToWorker[i]; !used {
					toSet = i
					break
				}
			}
			if toSet == -1 {
				toSet = len(cases)
				cases = append(cases, inert)
			}

			cases[toSet] = reflect.SelectCase{
				Dir:  reflect.SelectRecv,
				Chan: reflect.ValueOf(workerClosing),
			}
			caseToWorker[toSet] = wid

		default: // a per-worker closing channel fired
			wid, found := caseToWorker[n]

			// Always park the slot first: if the channel was closed and stayed
			// in the select set, reflect.Select would return it immediately on
			// every iteration and the loop would spin.
			cases[n] = inert

			if !found {
				log.Errorf("worker ID not found for case %d", n)
				continue
			}
			delete(caseToWorker, n)

			log.Warnf("worker %d dropped", wid)
			// send in a goroutine to avoid a deadlock between workerClosing / watchClosing
			go func() {
				select {
				case sh.workerClosing <- wid:
				case <-sh.closing:
					return
				}
			}()
		}
	}
}