diff --git a/manager.go b/manager.go index f125cdafd..f4408b6e0 100644 --- a/manager.go +++ b/manager.go @@ -152,8 +152,10 @@ func (m *Manager) AddWorker(ctx context.Context, w Worker) error { } m.sched.newWorkers <- &workerHandle{ - w: w, - info: info, + w: w, + info: info, + preparing: &activeResources{}, + active: &activeResources{}, } return nil } @@ -171,84 +173,6 @@ func (m *Manager) ReadPieceFromSealedSector(context.Context, abi.SectorID, ffiwr panic("implement me") } -/*func (m *Manager) getWorkersByPaths(task sealtasks.TaskType, inPaths []stores.StorageInfo) ([]WorkerID, map[WorkerID]stores.StorageInfo) { - m.workersLk.Lock() - defer m.workersLk.Unlock() - - var workers []WorkerID - paths := map[WorkerID]stores.StorageInfo{} - - for i, worker := range m.workers { - tt, err := worker.w.TaskTypes(context.TODO()) - if err != nil { - log.Errorf("error getting supported worker task types: %+v", err) - continue - } - if _, ok := tt[task]; !ok { - log.Debugf("dropping worker %d; task %s not supported (supports %v)", i, task, tt) - continue - } - - phs, err := worker.w.Paths(context.TODO()) - if err != nil { - log.Errorf("error getting worker paths: %+v", err) - continue - } - - // check if the worker has access to the path we selected - var st *stores.StorageInfo - for _, p := range phs { - for _, meta := range inPaths { - if p.ID == meta.ID { - if st != nil && st.Weight > p.Weight { - continue - } - - p := meta // copy - st = &p - } - } - } - if st == nil { - log.Debugf("skipping worker %d; doesn't have any of %v", i, inPaths) - log.Debugf("skipping worker %d; only has %v", i, phs) - continue - } - - paths[i] = *st - workers = append(workers, i) - } - - return workers, paths -} - -func (m *Manager) getWorker(ctx context.Context, taskType sealtasks.TaskType, accept []WorkerID) (Worker, func(), error) { - ret := make(chan workerResponse) - - select { - case m.schedule <- &workerRequest{ - taskType: taskType, - accept: accept, - - ctx: ctx.Done(), - ret: ret, - }: - case <-m.closing: - return nil, nil, xerrors.New("closing") - case <-ctx.Done(): - return nil, nil, ctx.Err() - } - - select { - case resp := <-ret: - return resp.worker, resp.done, resp.err - case <-m.closing: - return nil, nil, xerrors.New("closing") - case <-ctx.Done(): - return nil, nil, ctx.Err() - } -}*/ - func schedNop(context.Context, Worker) error { return nil } diff --git a/sched.go b/sched.go index 1a2efb5e7..78f2a6664 100644 --- a/sched.go +++ b/sched.go @@ -109,15 +109,22 @@ func (r *workerRequest) respond(err error) { } } +type activeResources struct { + memUsedMin uint64 + memUsedMax uint64 + gpuUsed bool + cpuUse uint64 + + cond *sync.Cond +} + type workerHandle struct { w Worker info storiface.WorkerInfo - memUsedMin uint64 - memUsedMax uint64 - gpuUsed bool - cpuUse uint64 // 0 - free; 1+ - singlecore things + preparing *activeResources + active *activeResources } func (sh *scheduler) runSched() { @@ -198,12 +205,8 @@ func (sh *scheduler) maybeSchedRequest(req *workerRequest) (bool, error) { } tried++ - canDo, err := sh.canHandleRequest(wid, worker, req) - if err != nil { - return false, err - } - - if !canDo { + needRes := ResourceTable[req.taskType][sh.spt] + if !canHandleRequest(needRes, sh.spt, wid, worker.info.Resources, worker.preparing) { continue } @@ -240,99 +243,120 @@ func (sh *scheduler) maybeSchedRequest(req *workerRequest) (bool, error) { func (sh *scheduler) assignWorker(wid WorkerID, w *workerHandle, req *workerRequest) error { needRes := ResourceTable[req.taskType][sh.spt] - w.gpuUsed = needRes.CanGPU - if needRes.MultiThread() { - w.cpuUse += w.info.Resources.CPUs - } else { - w.cpuUse++ - } - - w.memUsedMin += needRes.MinMemory - w.memUsedMax += needRes.MaxMemory + w.preparing.add(w.info.Resources, needRes) go func() { - var err error - - defer func() { - sh.workersLk.Lock() - - if needRes.CanGPU { - w.gpuUsed = false - } - - if needRes.MultiThread() { - w.cpuUse -= w.info.Resources.CPUs - } else { - w.cpuUse-- - } - - w.memUsedMin -= needRes.MinMemory - w.memUsedMax -= needRes.MaxMemory + err := req.prepare(req.ctx, w.w) + sh.workersLk.Lock() + err = w.active.withResources(sh.spt, wid, w.info.Resources, needRes, &sh.workersLk, func() error { + w.preparing.free(w.info.Resources, needRes) sh.workersLk.Unlock() + defer sh.workersLk.Lock() // we MUST return locked from this function select { case sh.workerFree <- wid: case <-sh.closing: } - }() - err = req.prepare(req.ctx, w.w) - if err == nil { err = req.work(req.ctx, w.w) - } - select { - case req.ret <- workerResponse{err: err}: - case <-req.ctx.Done(): - log.Warnf("request got cancelled before we could respond") - case <-sh.closing: - log.Warnf("scheduler closed while sending response") - } + select { + case req.ret <- workerResponse{err: err}: + case <-req.ctx.Done(): + log.Warnf("request got cancelled before we could respond") + case <-sh.closing: + log.Warnf("scheduler closed while sending response") + } + + return nil + }) + + sh.workersLk.Unlock() }() return nil } -func (sh *scheduler) canHandleRequest(wid WorkerID, w *workerHandle, req *workerRequest) (bool, error) { - needRes, ok := ResourceTable[req.taskType][sh.spt] - if !ok { - return false, xerrors.Errorf("canHandleRequest: missing ResourceTable entry for %s/%d", req.taskType, sh.spt) +func (a *activeResources) withResources(spt abi.RegisteredProof, id WorkerID, wr storiface.WorkerResources, r Resources, locker sync.Locker, cb func() error) error { + for !canHandleRequest(r, spt, id, wr, a) { + if a.cond == nil { + a.cond = sync.NewCond(locker) + } + a.cond.Wait() } - res := w.info.Resources + a.add(wr, r) + + err := cb() + + a.free(wr, r) + if a.cond != nil { + a.cond.Broadcast() + } + + return err +} + +func (a *activeResources) add(wr storiface.WorkerResources, r Resources) { + a.gpuUsed = r.CanGPU + if r.MultiThread() { + a.cpuUse += wr.CPUs + } else { + a.cpuUse += uint64(r.Threads) + } + + a.memUsedMin += r.MinMemory + a.memUsedMax += r.MaxMemory +} + +func (a *activeResources) free(wr storiface.WorkerResources, r Resources) { + if r.CanGPU { + a.gpuUsed = false + } + if r.MultiThread() { + a.cpuUse -= wr.CPUs + } else { + a.cpuUse -= uint64(r.Threads) + } + + a.memUsedMin -= r.MinMemory + a.memUsedMax -= r.MaxMemory +} + +func canHandleRequest(needRes Resources, spt abi.RegisteredProof, wid WorkerID, res storiface.WorkerResources, active *activeResources) bool { // TODO: dedupe needRes.BaseMinMemory per task type (don't add if that task is already running) - minNeedMem := res.MemReserved + w.memUsedMin + needRes.MinMemory + needRes.BaseMinMemory + minNeedMem := res.MemReserved + active.memUsedMin + needRes.MinMemory + needRes.BaseMinMemory if minNeedMem > res.MemPhysical { log.Debugf("sched: not scheduling on worker %d; not enough physical memory - need: %dM, have %dM", wid, minNeedMem/mib, res.MemPhysical/mib) - return false, nil + return false } - maxNeedMem := res.MemReserved + w.memUsedMax + needRes.MaxMemory + needRes.BaseMinMemory - if sh.spt == abi.RegisteredProof_StackedDRG32GiBSeal { + maxNeedMem := res.MemReserved + active.memUsedMax + needRes.MaxMemory + needRes.BaseMinMemory + if spt == abi.RegisteredProof_StackedDRG32GiBSeal { maxNeedMem += MaxCachingOverhead } if maxNeedMem > res.MemSwap+res.MemPhysical { log.Debugf("sched: not scheduling on worker %d; not enough virtual memory - need: %dM, have %dM", wid, maxNeedMem/mib, (res.MemSwap+res.MemPhysical)/mib) - return false, nil + return false } if needRes.MultiThread() { - if w.cpuUse > 0 { - log.Debugf("sched: not scheduling on worker %d; multicore process needs %d threads, %d in use, target %d", wid, res.CPUs, w.cpuUse, res.CPUs) - return false, nil + if active.cpuUse > 0 { + log.Debugf("sched: not scheduling on worker %d; multicore process needs %d threads, %d in use, target %d", wid, res.CPUs, active.cpuUse, res.CPUs) + return false } } if len(res.GPUs) > 0 && needRes.CanGPU { - if w.gpuUsed { + if active.gpuUsed { log.Debugf("sched: not scheduling on worker %d; GPU in use", wid) - return false, nil + return false } } - return true, nil + return true } func (sh *scheduler) schedNewWorker(w *workerHandle) { diff --git a/selector_alloc.go b/selector_alloc.go new file mode 100644 index 000000000..1ceab0ecb --- /dev/null +++ b/selector_alloc.go @@ -0,0 +1,59 @@ +package sectorstorage + +import ( + "context" + + "golang.org/x/xerrors" + + "github.com/filecoin-project/sector-storage/sealtasks" + "github.com/filecoin-project/sector-storage/stores" +) + +type allocSelector struct { + best []stores.StorageInfo +} + +func newAllocSelector(ctx context.Context, index stores.SectorIndex, alloc stores.SectorFileType) (*allocSelector, error) { + best, err := index.StorageBestAlloc(ctx, alloc, true) + if err != nil { + return nil, err + } + + return &allocSelector{ + best: best, + }, nil +} + +func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, whnd *workerHandle) (bool, error) { + tasks, err := whnd.w.TaskTypes(ctx) + if err != nil { + return false, xerrors.Errorf("getting supported worker task types: %w", err) + } + if _, supported := tasks[task]; !supported { + return false, nil + } + + paths, err := whnd.w.Paths(ctx) + if err != nil { + return false, xerrors.Errorf("getting worker paths: %w", err) + } + + have := map[stores.ID]struct{}{} + for _, path := range paths { + have[path.ID] = struct{}{} + } + + for _, info := range s.best { + if _, ok := have[info.ID]; ok { + return true, nil + } + } + + return false, nil +} + +func (s *allocSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *workerHandle) (bool, error) { + return a.info.Hostname > b.info.Hostname, nil // TODO: Better strategy +} + +var _ WorkerSelector = &allocSelector{} diff --git a/selector_existing.go b/selector_existing.go new file mode 100644 index 000000000..eccdefbf2 --- /dev/null +++ b/selector_existing.go @@ -0,0 +1,60 @@ +package sectorstorage + +import ( + "context" + + "golang.org/x/xerrors" + + "github.com/filecoin-project/sector-storage/sealtasks" + "github.com/filecoin-project/sector-storage/stores" + "github.com/filecoin-project/specs-actors/actors/abi" +) + +type existingSelector struct { + best []stores.StorageInfo +} + +func newExistingSelector(ctx context.Context, index stores.SectorIndex, sector abi.SectorID, alloc stores.SectorFileType, allowFetch bool) (*existingSelector, error) { + best, err := index.StorageFindSector(ctx, sector, alloc, allowFetch) + if err != nil { + return nil, err + } + + return &existingSelector{ + best: best, + }, nil +} + +func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, whnd *workerHandle) (bool, error) { + tasks, err := whnd.w.TaskTypes(ctx) + if err != nil { + return false, xerrors.Errorf("getting supported worker task types: %w", err) + } + if _, supported := tasks[task]; !supported { + return false, nil + } + + paths, err := whnd.w.Paths(ctx) + if err != nil { + return false, xerrors.Errorf("getting worker paths: %w", err) + } + + have := map[stores.ID]struct{}{} + for _, path := range paths { + have[path.ID] = struct{}{} + } + + for _, info := range s.best { + if _, ok := have[info.ID]; ok { + return true, nil + } + } + + return false, nil +} + +func (s *existingSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *workerHandle) (bool, error) { + return a.info.Hostname > b.info.Hostname, nil // TODO: Better strategy +} + +var _ WorkerSelector = &existingSelector{} diff --git a/selector_task.go b/selector_task.go new file mode 100644 index 000000000..f1f4b7770 --- /dev/null +++ b/selector_task.go @@ -0,0 +1,46 @@ +package sectorstorage + +import ( + "context" + + "golang.org/x/xerrors" + + "github.com/filecoin-project/sector-storage/sealtasks" + "github.com/filecoin-project/sector-storage/stores" +) + +type taskSelector struct { + best []stores.StorageInfo +} + +func newTaskSelector() *taskSelector { + return &taskSelector{} +} + +func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, whnd *workerHandle) (bool, error) { + tasks, err := whnd.w.TaskTypes(ctx) + if err != nil { + return false, xerrors.Errorf("getting supported worker task types: %w", err) + } + _, supported := tasks[task] + + return supported, nil +} + +func (s *taskSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *workerHandle) (bool, error) { + atasks, err := a.w.TaskTypes(ctx) + if err != nil { + return false, xerrors.Errorf("getting supported worker task types: %w", err) + } + btasks, err := b.w.TaskTypes(ctx) + if err != nil { + return false, xerrors.Errorf("getting supported worker task types: %w", err) + } + if len(atasks) != len(btasks) { + return len(atasks) < len(btasks), nil // prefer workers which can do less + } + + return a.info.Hostname > a.info.Hostname, nil // TODO: Better fallback strategy +} + +var _ WorkerSelector = &allocSelector{} diff --git a/stats.go b/stats.go index 84e6c5bb3..dbbee07f3 100644 --- a/stats.go +++ b/stats.go @@ -11,10 +11,10 @@ func (m *Manager) WorkerStats() map[uint64]storiface.WorkerStats { for id, handle := range m.sched.workers { out[uint64(id)] = storiface.WorkerStats{ Info: handle.info, - MemUsedMin: handle.memUsedMin, - MemUsedMax: handle.memUsedMax, - GpuUsed: handle.gpuUsed, - CpuUse: handle.cpuUse, + MemUsedMin: handle.active.memUsedMin, + MemUsedMax: handle.active.memUsedMax, + GpuUsed: handle.active.gpuUsed, + CpuUse: handle.active.cpuUse, } }