lotus/storage/sector/sched_watch.go
2020-08-16 11:09:58 +01:00

101 lines
1.8 KiB
Go

package sector
import (
"context"
"reflect"
)
func (sh *scheduler) runWorkerWatcher() {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
nilch := reflect.ValueOf(new(chan struct{})).Elem()
cases := []reflect.SelectCase{
{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(sh.closing),
},
{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(sh.watchClosing),
},
}
caseToWorker := map[int]WorkerID{}
for {
n, rv, ok := reflect.Select(cases)
switch {
case n == 0: // sh.closing
return
case n == 1: // sh.watchClosing
if !ok {
log.Errorf("watchClosing channel closed")
return
}
wid, ok := rv.Interface().(WorkerID)
if !ok {
panic("got a non-WorkerID message")
}
sh.workersLk.Lock()
workerClosing, err := sh.workers[wid].w.Closing(ctx)
sh.workersLk.Unlock()
if err != nil {
log.Errorf("getting worker closing channel: %+v", err)
select {
case sh.workerClosing <- wid:
case <-sh.closing:
return
}
continue
}
toSet := -1
for i, sc := range cases {
if sc.Chan == nilch {
toSet = i
break
}
}
if toSet == -1 {
toSet = len(cases)
cases = append(cases, reflect.SelectCase{})
}
cases[toSet] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(workerClosing),
}
caseToWorker[toSet] = wid
default:
wid, found := caseToWorker[n]
if !found {
log.Errorf("worker ID not found for case %d", n)
continue
}
delete(caseToWorker, n)
cases[n] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: nilch,
}
log.Warnf("worker %d dropped", wid)
// send in a goroutine to avoid a deadlock between workerClosing / watchClosing
go func() {
select {
case sh.workerClosing <- wid:
case <-sh.closing:
return
}
}()
}
}
}