Merge pull request #24 from filecoin-project/feat/worker-closing

sched: Handle closing workers better
This commit is contained in:
Łukasz Magiera 2020-05-01 20:11:53 +02:00 committed by GitHub
commit e4a9a16161
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 150 additions and 7 deletions

View File

@ -211,6 +211,10 @@ func (l *LocalWorker) Info(context.Context) (storiface.WorkerInfo, error) {
}, nil }, nil
} }
// Closing hands back a channel that callers can watch for worker shutdown.
//
// The channel is created fresh on every call and only its receive side is
// returned, so nothing can ever send on it or close it: a receive from the
// result blocks indefinitely. The ctx argument is not consulted and the
// returned error is always nil.
func (l *LocalWorker) Closing(ctx context.Context) (<-chan struct{}, error) {
	neverFires := make(chan struct{})
	return neverFires, nil
}
func (l *LocalWorker) Close() error { func (l *LocalWorker) Close() error {
return nil return nil
} }

View File

@ -37,6 +37,9 @@ type Worker interface {
Info(context.Context) (storiface.WorkerInfo, error) Info(context.Context) (storiface.WorkerInfo, error)
// returns channel signalling worker shutdown
Closing(context.Context) (<-chan struct{}, error)
Close() error Close() error
} }

View File

@ -33,6 +33,10 @@ type scheduler struct {
workers map[WorkerID]*workerHandle workers map[WorkerID]*workerHandle
newWorkers chan *workerHandle newWorkers chan *workerHandle
watchClosing chan WorkerID
workerClosing chan WorkerID
schedule chan *workerRequest schedule chan *workerRequest
workerFree chan WorkerID workerFree chan WorkerID
closing chan struct{} closing chan struct{}
@ -48,6 +52,10 @@ func newScheduler(spt abi.RegisteredProof) *scheduler {
workers: map[WorkerID]*workerHandle{}, workers: map[WorkerID]*workerHandle{},
newWorkers: make(chan *workerHandle), newWorkers: make(chan *workerHandle),
watchClosing: make(chan WorkerID),
workerClosing: make(chan WorkerID),
schedule: make(chan *workerRequest), schedule: make(chan *workerRequest),
workerFree: make(chan WorkerID), workerFree: make(chan WorkerID),
closing: make(chan struct{}), closing: make(chan struct{}),
@ -128,10 +136,14 @@ type workerHandle struct {
} }
func (sh *scheduler) runSched() { func (sh *scheduler) runSched() {
go sh.runWorkerWatcher()
for { for {
select { select {
case w := <-sh.newWorkers: case w := <-sh.newWorkers:
sh.schedNewWorker(w) sh.schedNewWorker(w)
case wid := <-sh.workerClosing:
sh.schedDropWorker(wid)
case req := <-sh.schedule: case req := <-sh.schedule:
scheduled, err := sh.maybeSchedRequest(req) scheduled, err := sh.maybeSchedRequest(req)
if err != nil { if err != nil {
@ -153,10 +165,18 @@ func (sh *scheduler) runSched() {
} }
func (sh *scheduler) onWorkerFreed(wid WorkerID) { func (sh *scheduler) onWorkerFreed(wid WorkerID) {
sh.workersLk.Lock()
w, ok := sh.workers[wid]
sh.workersLk.Unlock()
if !ok {
log.Warnf("onWorkerFreed on invalid worker %d", wid)
return
}
for e := sh.schedQueue.Front(); e != nil; e = e.Next() { for e := sh.schedQueue.Front(); e != nil; e = e.Next() {
req := e.Value.(*workerRequest) req := e.Value.(*workerRequest)
ok, err := req.sel.Ok(req.ctx, req.taskType, sh.workers[wid]) ok, err := req.sel.Ok(req.ctx, req.taskType, w)
if err != nil { if err != nil {
log.Errorf("onWorkerFreed req.sel.Ok error: %+v", err) log.Errorf("onWorkerFreed req.sel.Ok error: %+v", err)
continue continue
@ -374,7 +394,7 @@ func canHandleRequest(needRes Resources, spt abi.RegisteredProof, wid WorkerID,
return false return false
} }
} else { } else {
if active.cpuUse + uint64(needRes.Threads) > res.CPUs { if active.cpuUse+uint64(needRes.Threads) > res.CPUs {
log.Debugf("sched: not scheduling on worker %d; not enough threads, need %d, %d in use, target %d", wid, needRes.Threads, active.cpuUse, res.CPUs) log.Debugf("sched: not scheduling on worker %d; not enough threads, need %d, %d in use, target %d", wid, needRes.Threads, active.cpuUse, res.CPUs)
return false return false
} }
@ -396,12 +416,12 @@ func (a *activeResources) utilization(wr storiface.WorkerResources) float64 {
cpu := float64(a.cpuUse) / float64(wr.CPUs) cpu := float64(a.cpuUse) / float64(wr.CPUs)
max = cpu max = cpu
memMin := float64(a.memUsedMin + wr.MemReserved) / float64(wr.MemPhysical) memMin := float64(a.memUsedMin+wr.MemReserved) / float64(wr.MemPhysical)
if memMin > max { if memMin > max {
max = memMin max = memMin
} }
memMax := float64(a.memUsedMax + wr.MemReserved) / float64(wr.MemPhysical + wr.MemSwap) memMax := float64(a.memUsedMax+wr.MemReserved) / float64(wr.MemPhysical+wr.MemSwap)
if memMax > max { if memMax > max {
max = memMax max = memMax
} }
@ -411,11 +431,34 @@ func (a *activeResources) utilization(wr storiface.WorkerResources) float64 {
func (sh *scheduler) schedNewWorker(w *workerHandle) { func (sh *scheduler) schedNewWorker(w *workerHandle) {
sh.workersLk.Lock() sh.workersLk.Lock()
defer sh.workersLk.Unlock()
id := sh.nextWorker id := sh.nextWorker
sh.workers[id] = w sh.workers[id] = w
sh.nextWorker++ sh.nextWorker++
sh.workersLk.Unlock()
select {
case sh.watchClosing <- id:
case <-sh.closing:
return
}
sh.onWorkerFreed(id)
}
func (sh *scheduler) schedDropWorker(wid WorkerID) {
sh.workersLk.Lock()
defer sh.workersLk.Unlock()
w := sh.workers[wid]
delete(sh.workers, wid)
go func() {
if err := w.w.Close(); err != nil {
log.Warnf("closing worker %d: %+v", err)
}
}()
} }
func (sh *scheduler) schedClose() { func (sh *scheduler) schedClose() {

93
sched_watch.go Normal file
View File

@ -0,0 +1,93 @@
package sectorstorage
import (
"context"
"reflect"
)
// runWorkerWatcher multiplexes the closing channels of all watched workers
// and reports each worker that goes away on sh.workerClosing.
//
// It blocks in a reflect.Select over a dynamic case list:
//
//	case 0  - sh.closing: the watcher returns.
//	case 1  - sh.watchClosing: a worker id to start watching.
//	case 2+ - one slot per watched worker's closing channel.
//
// When a worker's closing channel fires (a value or a close), its slot is
// vacated and the id is forwarded on sh.workerClosing. If asking a worker for
// its closing channel fails, the id is forwarded immediately. Ids that are not
// present in sh.workers are logged and skipped. The loop runs until sh.closing
// fires or sh.watchClosing is closed.
func (sh *scheduler) runWorkerWatcher() {
	ctx, cancel := context.WithCancel(context.TODO())
	defer cancel()

	cases := []reflect.SelectCase{
		{
			Dir:  reflect.SelectRecv,
			Chan: reflect.ValueOf(sh.closing),
		},
		{
			Dir:  reflect.SelectRecv,
			Chan: reflect.ValueOf(sh.watchClosing),
		},
	}

	// Maps a slot index in cases to the worker whose channel occupies it.
	caseToWorker := map[int]WorkerID{}

	for {
		n, rv, ok := reflect.Select(cases)

		switch {
		case n == 0: // sh.closing
			return
		case n == 1: // sh.watchClosing
			if !ok {
				log.Errorf("watchClosing channel closed")
				return
			}

			wid, ok := rv.Interface().(WorkerID)
			if !ok {
				panic("got a non-WorkerID message")
			}

			// Only the map lookup needs the lock. Closing takes a context
			// and may block, so it is called after the lock is released.
			sh.workersLk.Lock()
			handle, found := sh.workers[wid]
			sh.workersLk.Unlock()

			if !found || handle == nil {
				// Nothing registered under this id: nothing to watch.
				log.Errorf("watchClosing: unknown worker %d", wid)
				continue
			}

			workerClosing, err := handle.w.Closing(ctx)
			if err != nil {
				log.Errorf("getting worker closing channel: %+v", err)
				select {
				case sh.workerClosing <- wid:
				case <-sh.closing:
					return
				}

				continue
			}

			// Reuse a vacated slot if there is one, otherwise grow the list.
			// A vacated slot has a zero Chan, which reflect.Select ignores.
			toSet := -1
			for i, sc := range cases {
				if !sc.Chan.IsValid() {
					toSet = i
					break
				}
			}
			watch := reflect.SelectCase{
				Dir:  reflect.SelectRecv,
				Chan: reflect.ValueOf(workerClosing),
			}
			if toSet == -1 {
				toSet = len(cases)
				cases = append(cases, watch)
			} else {
				cases[toSet] = watch
			}

			caseToWorker[toSet] = wid
		default:
			wid := caseToWorker[n]

			// Vacate the slot so the fired (possibly closed) channel is not
			// selected again on the next iteration.
			delete(caseToWorker, n)
			cases[n] = reflect.SelectCase{Dir: reflect.SelectRecv}

			log.Warnf("worker %d dropped", wid)
			select {
			case sh.workerClosing <- wid:
			case <-sh.closing:
				return
			}
		}
	}
}

View File

@ -27,7 +27,7 @@ type Remote struct {
index SectorIndex index SectorIndex
auth http.Header auth http.Header
fetchLk sync.Mutex fetchLk sync.Mutex
fetching map[abi.SectorID]chan struct{} fetching map[abi.SectorID]chan struct{}
} }
@ -121,7 +121,7 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType
return "", "", "", nil, xerrors.Errorf("failed to acquire sector %v from remote(%d): not found", s, fileType) return "", "", "", nil, xerrors.Errorf("failed to acquire sector %v from remote(%d): not found", s, fileType)
} }
sort.Slice(si, func(i, j int) bool { sort.Slice(si, func(i, j int) bool {
return si[i].Weight < si[j].Weight return si[i].Weight < si[j].Weight
}) })