lotus/extern/sector-storage/sched_watch.go

101 lines
1.8 KiB
Go
Raw Normal View History

2020-05-01 18:00:17 +00:00
package sectorstorage
import (
"context"
"reflect"
)
func (sh *scheduler) runWorkerWatcher() {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
nilch := reflect.ValueOf(new(chan struct{})).Elem()
cases := []reflect.SelectCase{
{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(sh.closing),
},
{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(sh.watchClosing),
},
}
caseToWorker := map[int]WorkerID{}
for {
n, rv, ok := reflect.Select(cases)
switch {
case n == 0: // sh.closing
return
case n == 1: // sh.watchClosing
if !ok {
log.Errorf("watchClosing channel closed")
return
}
wid, ok := rv.Interface().(WorkerID)
if !ok {
panic("got a non-WorkerID message")
}
sh.workersLk.Lock()
workerClosing, err := sh.workers[wid].w.Closing(ctx)
sh.workersLk.Unlock()
if err != nil {
log.Errorf("getting worker closing channel: %+v", err)
select {
case sh.workerClosing <- wid:
case <-sh.closing:
return
}
continue
}
toSet := -1
for i, sc := range cases {
if sc.Chan == nilch {
toSet = i
break
}
}
if toSet == -1 {
toSet = len(cases)
cases = append(cases, reflect.SelectCase{})
}
cases[toSet] = reflect.SelectCase{
2020-05-01 18:04:21 +00:00
Dir: reflect.SelectRecv,
2020-05-01 18:00:17 +00:00
Chan: reflect.ValueOf(workerClosing),
}
caseToWorker[toSet] = wid
default:
2020-07-17 10:59:12 +00:00
wid, found := caseToWorker[n]
if !found {
log.Errorf("worker ID not found for case %d", n)
continue
}
2020-05-01 18:00:17 +00:00
delete(caseToWorker, n)
cases[n] = reflect.SelectCase{
2020-05-01 18:04:21 +00:00
Dir: reflect.SelectRecv,
2020-05-01 18:00:17 +00:00
Chan: nilch,
}
log.Warnf("worker %d dropped", wid)
// send in a goroutine to avoid a deadlock between workerClosing / watchClosing
go func() {
select {
case sh.workerClosing <- wid:
case <-sh.closing:
return
}
}()
2020-05-01 18:00:17 +00:00
}
}
}