package sectorstorage

import (
	"container/heap"
	"context"
	"sort"
	"sync"

	"github.com/hashicorp/go-multierror"
	"golang.org/x/xerrors"

	"github.com/filecoin-project/specs-actors/actors/abi"

	"github.com/filecoin-project/sector-storage/sealtasks"
	"github.com/filecoin-project/sector-storage/storiface"
)

const mib = 1 << 20

type WorkerAction func(ctx context.Context, w Worker) error

type WorkerSelector interface {
	Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredProof, a *workerHandle) (bool, error) // true if worker is acceptable for performing a task

	Cmp(ctx context.Context, task sealtasks.TaskType, a, b *workerHandle) (bool, error) // true if a is preferred over b
}

// scheduler matches sealing tasks to workers. Requests are assigned to a
// suitable worker immediately when possible, or parked in schedQueue until a
// worker frees up.
type scheduler struct {
	spt abi.RegisteredProof

	workersLk  sync.Mutex
	nextWorker WorkerID
	workers    map[WorkerID]*workerHandle

	newWorkers chan *workerHandle

	watchClosing  chan WorkerID
	workerClosing chan WorkerID

	schedule   chan *workerRequest
	workerFree chan WorkerID
	closing    chan struct{}

	schedQueue *requestQueue
}

func newScheduler(spt abi.RegisteredProof) *scheduler {
	return &scheduler{
		spt: spt,

		nextWorker: 0,
		workers:    map[WorkerID]*workerHandle{},

		newWorkers:    make(chan *workerHandle),
		watchClosing:  make(chan WorkerID),
		workerClosing: make(chan WorkerID),

		schedule:   make(chan *workerRequest),
		workerFree: make(chan WorkerID),
		closing:    make(chan struct{}),

		schedQueue: &requestQueue{},
	}
}

// Schedule submits a task for execution. The request is handed to the
// scheduling loop, which runs prepare and then work on a selected worker; the
// call blocks until the task completes, the context is cancelled, or the
// scheduler shuts down.
func (sh *scheduler) Schedule(ctx context.Context, sector abi.SectorID, taskType sealtasks.TaskType, sel WorkerSelector, prepare WorkerAction, work WorkerAction) error {
	ret := make(chan workerResponse)

	select {
	case sh.schedule <- &workerRequest{
		sector:   sector,
		taskType: taskType,
		sel:      sel,

		prepare: prepare,
		work:    work,

		ret: ret,
		ctx: ctx,
	}:
	case <-sh.closing:
		return xerrors.New("closing")
	case <-ctx.Done():
		return ctx.Err()
	}

	select {
	case resp := <-ret:
		return resp.err
	case <-sh.closing:
		return xerrors.New("closing")
	case <-ctx.Done():
		return ctx.Err()
	}
}

type workerRequest struct {
	sector   abi.SectorID
	taskType sealtasks.TaskType
	sel      WorkerSelector

	prepare WorkerAction
	work    WorkerAction

	index int // The index of the item in the heap.

	ret chan<- workerResponse
	ctx context.Context
}
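// The selector below is only an illustrative sketch of the WorkerSelector
// contract and is not part of the original file; the package's real selectors
// (task- and allocation-based) live elsewhere. It accepts every worker and
// prefers the one whose busiest resource is least utilized.
type leastUtilizedSelector struct{}

func (s *leastUtilizedSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredProof, a *workerHandle) (bool, error) {
	// A real selector would check the worker's supported task types and
	// available storage here; this sketch treats every worker as acceptable.
	return true, nil
}

func (s *leastUtilizedSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *workerHandle) (bool, error) {
	// Prefer the worker whose active resources are less utilized.
	return a.active.utilization(a.info.Resources) < b.active.utilization(b.info.Resources), nil
}

var _ WorkerSelector = &leastUtilizedSelector{}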
type workerResponse struct {
	err error
}

func (r *workerRequest) respond(err error) {
	select {
	case r.ret <- workerResponse{err: err}:
	case <-r.ctx.Done():
		log.Warnf("request got cancelled before we could respond")
	}
}

// activeResources tracks the resources currently reserved on a worker.
type activeResources struct {
	memUsedMin uint64
	memUsedMax uint64
	gpuUsed    bool
	cpuUse     uint64

	cond *sync.Cond
}

type workerHandle struct {
	w Worker

	info storiface.WorkerInfo

	preparing *activeResources
	active    *activeResources
}

// runSched is the main scheduling loop. It owns worker registration and
// removal, accepts new requests, and retries queued requests whenever a
// worker frees up.
func (sh *scheduler) runSched() {
	go sh.runWorkerWatcher()

	for {
		select {
		case w := <-sh.newWorkers:
			sh.schedNewWorker(w)
		case wid := <-sh.workerClosing:
			sh.schedDropWorker(wid)
		case req := <-sh.schedule:
			scheduled, err := sh.maybeSchedRequest(req)
			if err != nil {
				req.respond(err)
				continue
			}
			if scheduled {
				continue
			}

			heap.Push(sh.schedQueue, req)
		case wid := <-sh.workerFree:
			sh.onWorkerFreed(wid)
		case <-sh.closing:
			sh.schedClose()
			return
		}
	}
}

// onWorkerFreed retries queued requests which the freed worker is able to handle.
func (sh *scheduler) onWorkerFreed(wid WorkerID) {
	sh.workersLk.Lock()
	w, ok := sh.workers[wid]
	sh.workersLk.Unlock()
	if !ok {
		log.Warnf("onWorkerFreed on invalid worker %d", wid)
		return
	}

	for i := 0; i < sh.schedQueue.Len(); i++ {
		req := (*sh.schedQueue)[i]

		ok, err := req.sel.Ok(req.ctx, req.taskType, sh.spt, w)
		if err != nil {
			log.Errorf("onWorkerFreed req.sel.Ok error: %+v", err)
			continue
		}

		if !ok {
			continue
		}

		scheduled, err := sh.maybeSchedRequest(req)
		if err != nil {
			req.respond(err)
			continue
		}

		if scheduled {
			heap.Remove(sh.schedQueue, i)
			i--
			continue
		}
	}
}

// maybeSchedRequest tries to assign req to an acceptable worker right away.
// It returns false (with no error) when the request should wait in the queue.
func (sh *scheduler) maybeSchedRequest(req *workerRequest) (bool, error) {
	sh.workersLk.Lock()
	defer sh.workersLk.Unlock()

	tried := 0
	var acceptable []WorkerID

	needRes := ResourceTable[req.taskType][sh.spt]

	for wid, worker := range sh.workers {
		ok, err := req.sel.Ok(req.ctx, req.taskType, sh.spt, worker)
		if err != nil {
			return false, err
		}

		if !ok {
			continue
		}
		tried++

		if !canHandleRequest(needRes, sh.spt, wid, worker.info.Resources, worker.preparing) {
			continue
		}

		acceptable = append(acceptable, wid)
	}

	if len(acceptable) > 0 {
		{
			var serr error

			sort.SliceStable(acceptable, func(i, j int) bool {
				r, err := req.sel.Cmp(req.ctx, req.taskType, sh.workers[acceptable[i]], sh.workers[acceptable[j]])
				if err != nil {
					serr = multierror.Append(serr, err)
				}
				return r
			})

			if serr != nil {
				return false, xerrors.Errorf("error(s) selecting best worker: %w", serr)
			}
		}

		return true, sh.assignWorker(acceptable[0], sh.workers[acceptable[0]], req)
	}

	if tried == 0 {
		return false, xerrors.New("maybeSchedRequest didn't find any good workers")
	}

	return false, nil // put in waiting queue
}

// assignWorker reserves resources for req on the selected worker and runs the
// prepare and work actions asynchronously, reporting the result on req.ret.
func (sh *scheduler) assignWorker(wid WorkerID, w *workerHandle, req *workerRequest) error {
	needRes := ResourceTable[req.taskType][sh.spt]

	w.preparing.add(w.info.Resources, needRes)

	go func() {
		err := req.prepare(req.ctx, w.w)
		sh.workersLk.Lock()

		if err != nil {
			w.preparing.free(w.info.Resources, needRes)
			sh.workersLk.Unlock()

			select {
			case sh.workerFree <- wid:
			case <-sh.closing:
				log.Warnf("scheduler closed while releasing worker (prepare error: %+v)", err)
			}

			select {
			case req.ret <- workerResponse{err: err}:
			case <-req.ctx.Done():
				log.Warnf("request got cancelled before we could respond (prepare error: %+v)", err)
			case <-sh.closing:
				log.Warnf("scheduler closed while sending response (prepare error: %+v)", err)
			}
			return
		}

		err = w.active.withResources(sh.spt, wid, w.info.Resources, needRes, &sh.workersLk, func() error {
			w.preparing.free(w.info.Resources, needRes)
			sh.workersLk.Unlock()
			defer sh.workersLk.Lock() // we MUST return locked from this function
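			// Tell the scheduler that the preparing reservation has been
			// released so queued requests can be retried on this worker; the
			// task's resources stay reserved in w.active while the work
			// action runs below.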
			select {
			case sh.workerFree <- wid:
			case <-sh.closing:
			}

			err = req.work(req.ctx, w.w)

			select {
			case req.ret <- workerResponse{err: err}:
			case <-req.ctx.Done():
				log.Warnf("request got cancelled before we could respond")
			case <-sh.closing:
				log.Warnf("scheduler closed while sending response")
			}

			return nil
		})

		sh.workersLk.Unlock()

		// This error should always be nil, since nothing is setting it, but just to be safe:
		if err != nil {
			log.Errorf("error executing worker (withResources): %+v", err)
		}
	}()

	return nil
}

// withResources blocks until the worker has enough free capacity for r,
// reserves it for the duration of cb, and releases it again afterwards,
// waking any goroutines waiting on the same resources.
func (a *activeResources) withResources(spt abi.RegisteredProof, id WorkerID, wr storiface.WorkerResources, r Resources, locker sync.Locker, cb func() error) error {
	for !canHandleRequest(r, spt, id, wr, a) {
		if a.cond == nil {
			a.cond = sync.NewCond(locker)
		}
		a.cond.Wait()
	}

	a.add(wr, r)

	err := cb()

	a.free(wr, r)
	if a.cond != nil {
		a.cond.Broadcast()
	}

	return err
}

func (a *activeResources) add(wr storiface.WorkerResources, r Resources) {
	a.gpuUsed = r.CanGPU
	if r.MultiThread() {
		a.cpuUse += wr.CPUs
	} else {
		a.cpuUse += uint64(r.Threads)
	}

	a.memUsedMin += r.MinMemory
	a.memUsedMax += r.MaxMemory
}

func (a *activeResources) free(wr storiface.WorkerResources, r Resources) {
	if r.CanGPU {
		a.gpuUsed = false
	}
	if r.MultiThread() {
		a.cpuUse -= wr.CPUs
	} else {
		a.cpuUse -= uint64(r.Threads)
	}

	a.memUsedMin -= r.MinMemory
	a.memUsedMax -= r.MaxMemory
}

// canHandleRequest checks whether the worker described by res has enough free
// memory, CPU threads and GPU capacity to start a task needing needRes on top
// of the resources tracked in active.
func canHandleRequest(needRes Resources, spt abi.RegisteredProof, wid WorkerID, res storiface.WorkerResources, active *activeResources) bool {
	// TODO: dedupe needRes.BaseMinMemory per task type (don't add if that task is already running)
	minNeedMem := res.MemReserved + active.memUsedMin + needRes.MinMemory + needRes.BaseMinMemory
	if minNeedMem > res.MemPhysical {
		log.Debugf("sched: not scheduling on worker %d; not enough physical memory - need: %dM, have %dM", wid, minNeedMem/mib, res.MemPhysical/mib)
		return false
	}

	maxNeedMem := res.MemReserved + active.memUsedMax + needRes.MaxMemory + needRes.BaseMinMemory
	if spt == abi.RegisteredProof_StackedDRG32GiBSeal {
		maxNeedMem += MaxCachingOverhead
	}
	if spt == abi.RegisteredProof_StackedDRG64GiBSeal {
		maxNeedMem += MaxCachingOverhead * 2 // assume roughly twice the caching overhead for 64GiB sectors
	}
	if maxNeedMem > res.MemSwap+res.MemPhysical {
		log.Debugf("sched: not scheduling on worker %d; not enough virtual memory - need: %dM, have %dM", wid, maxNeedMem/mib, (res.MemSwap+res.MemPhysical)/mib)
		return false
	}

	if needRes.MultiThread() {
		if active.cpuUse > 0 {
			log.Debugf("sched: not scheduling on worker %d; multicore process needs %d threads, %d in use, target %d", wid, res.CPUs, active.cpuUse, res.CPUs)
			return false
		}
	} else {
		if active.cpuUse+uint64(needRes.Threads) > res.CPUs {
			log.Debugf("sched: not scheduling on worker %d; not enough threads, need %d, %d in use, target %d", wid, needRes.Threads, active.cpuUse, res.CPUs)
			return false
		}
	}

	if len(res.GPUs) > 0 && needRes.CanGPU {
		if active.gpuUsed {
			log.Debugf("sched: not scheduling on worker %d; GPU in use", wid)
			return false
		}
	}

	return true
}

// utilization returns how heavily the worker's scarcest resource is used: the
// maximum of its CPU and memory utilization ratios.
func (a *activeResources) utilization(wr storiface.WorkerResources) float64 {
	var max float64

	cpu := float64(a.cpuUse) / float64(wr.CPUs)
	max = cpu

	memMin := float64(a.memUsedMin+wr.MemReserved) / float64(wr.MemPhysical)
	if memMin > max {
		max = memMin
	}

	memMax := float64(a.memUsedMax+wr.MemReserved) / float64(wr.MemPhysical+wr.MemSwap)
	if memMax > max {
		max = memMax
	}

	return max
}

func (sh *scheduler) schedNewWorker(w *workerHandle) {
	sh.workersLk.Lock()

	id := sh.nextWorker
	sh.workers[id] = w
	sh.nextWorker++

	sh.workersLk.Unlock()

	select {
	case sh.watchClosing <- id:
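		// registered with the closing watcher, which will report the worker
		// on workerClosing when it goes away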
	case <-sh.closing:
		return
	}

	sh.onWorkerFreed(id)
}

func (sh *scheduler) schedDropWorker(wid WorkerID) {
	sh.workersLk.Lock()
	defer sh.workersLk.Unlock()

	w := sh.workers[wid]
	delete(sh.workers, wid)

	go func() {
		if err := w.w.Close(); err != nil {
			log.Warnf("closing worker %d: %+v", wid, err)
		}
	}()
}

func (sh *scheduler) schedClose() {
	sh.workersLk.Lock()
	defer sh.workersLk.Unlock()

	for i, w := range sh.workers {
		if err := w.w.Close(); err != nil {
			log.Errorf("closing worker %d: %+v", i, err)
		}
	}
}

func (sh *scheduler) Close() error {
	close(sh.closing)
	return nil
}
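// The function below is an illustrative sketch, not part of the original
// scheduler: it shows how a caller might submit a two-phase task through
// Schedule. The selector argument can be any WorkerSelector implementation;
// the prepare/work closures are placeholders for real sealing calls.
func exampleSchedulePreCommit1(ctx context.Context, sh *scheduler, sector abi.SectorID, sel WorkerSelector) error {
	return sh.Schedule(ctx, sector, sealtasks.TTPreCommit1, sel,
		func(ctx context.Context, w Worker) error {
			// prepare phase: e.g. fetch the sector's data to the worker
			return nil
		},
		func(ctx context.Context, w Worker) error {
			// work phase: run the actual PreCommit1 computation on the worker
			return nil
		})
}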