From 9ac19cb14bf0f311242aac814adc16f9abb74c1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 18 May 2022 15:47:08 +0200 Subject: [PATCH 1/9] feat: sealing: Put scheduler assign logic behind an interface --- extern/sector-storage/manager.go | 2 +- extern/sector-storage/request_queue.go | 24 +- extern/sector-storage/request_queue_test.go | 30 +- extern/sector-storage/sched.go | 384 ++++--------------- extern/sector-storage/sched_assigner_util.go | 254 ++++++++++++ extern/sector-storage/sched_post.go | 28 +- extern/sector-storage/sched_resources.go | 20 +- extern/sector-storage/sched_test.go | 66 ++-- extern/sector-storage/sched_worker.go | 144 +++---- extern/sector-storage/selector_alloc.go | 6 +- extern/sector-storage/selector_existing.go | 6 +- extern/sector-storage/selector_task.go | 6 +- extern/sector-storage/stats.go | 16 +- 13 files changed, 506 insertions(+), 480 deletions(-) create mode 100644 extern/sector-storage/sched_assigner_util.go diff --git a/extern/sector-storage/manager.go b/extern/sector-storage/manager.go index 4b52f9a1d..f75bdd107 100644 --- a/extern/sector-storage/manager.go +++ b/extern/sector-storage/manager.go @@ -62,7 +62,7 @@ type Manager struct { remoteHnd *stores.FetchHandler index stores.SectorIndex - sched *scheduler + sched *Scheduler windowPoStSched *poStScheduler winningPoStSched *poStScheduler diff --git a/extern/sector-storage/request_queue.go b/extern/sector-storage/request_queue.go index 925c44fa8..87a057923 100644 --- a/extern/sector-storage/request_queue.go +++ b/extern/sector-storage/request_queue.go @@ -2,34 +2,34 @@ package sectorstorage import "sort" -type requestQueue []*workerRequest +type RequestQueue []*WorkerRequest -func (q requestQueue) Len() int { return len(q) } +func (q RequestQueue) Len() int { return len(q) } -func (q requestQueue) Less(i, j int) bool { - oneMuchLess, muchLess := q[i].taskType.MuchLess(q[j].taskType) +func (q RequestQueue) Less(i, j int) bool { + oneMuchLess, muchLess := q[i].TaskType.MuchLess(q[j].TaskType) if oneMuchLess { return muchLess } - if q[i].priority != q[j].priority { - return q[i].priority > q[j].priority + if q[i].Priority != q[j].Priority { + return q[i].Priority > q[j].Priority } - if q[i].taskType != q[j].taskType { - return q[i].taskType.Less(q[j].taskType) + if q[i].TaskType != q[j].TaskType { + return q[i].TaskType.Less(q[j].TaskType) } - return q[i].sector.ID.Number < q[j].sector.ID.Number // optimize minerActor.NewSectors bitfield + return q[i].Sector.ID.Number < q[j].Sector.ID.Number // optimize minerActor.NewSectors bitfield } -func (q requestQueue) Swap(i, j int) { +func (q RequestQueue) Swap(i, j int) { q[i], q[j] = q[j], q[i] q[i].index = i q[j].index = j } -func (q *requestQueue) Push(x *workerRequest) { +func (q *RequestQueue) Push(x *WorkerRequest) { n := len(*q) item := x item.index = n @@ -37,7 +37,7 @@ func (q *requestQueue) Push(x *workerRequest) { sort.Sort(q) } -func (q *requestQueue) Remove(i int) *workerRequest { +func (q *RequestQueue) Remove(i int) *WorkerRequest { old := *q n := len(old) item := old[i] diff --git a/extern/sector-storage/request_queue_test.go b/extern/sector-storage/request_queue_test.go index ed802f30a..c95548c43 100644 --- a/extern/sector-storage/request_queue_test.go +++ b/extern/sector-storage/request_queue_test.go @@ -8,13 +8,13 @@ import ( ) func TestRequestQueue(t *testing.T) { - rq := &requestQueue{} + rq := &RequestQueue{} - rq.Push(&workerRequest{taskType: sealtasks.TTAddPiece}) - rq.Push(&workerRequest{taskType: 
sealtasks.TTPreCommit1}) - rq.Push(&workerRequest{taskType: sealtasks.TTPreCommit2}) - rq.Push(&workerRequest{taskType: sealtasks.TTPreCommit1}) - rq.Push(&workerRequest{taskType: sealtasks.TTAddPiece}) + rq.Push(&WorkerRequest{TaskType: sealtasks.TTAddPiece}) + rq.Push(&WorkerRequest{TaskType: sealtasks.TTPreCommit1}) + rq.Push(&WorkerRequest{TaskType: sealtasks.TTPreCommit2}) + rq.Push(&WorkerRequest{TaskType: sealtasks.TTPreCommit1}) + rq.Push(&WorkerRequest{TaskType: sealtasks.TTAddPiece}) dump := func(s string) { fmt.Println("---") @@ -22,7 +22,7 @@ func TestRequestQueue(t *testing.T) { for sqi := 0; sqi < rq.Len(); sqi++ { task := (*rq)[sqi] - fmt.Println(sqi, task.taskType) + fmt.Println(sqi, task.TaskType) } } @@ -32,31 +32,31 @@ func TestRequestQueue(t *testing.T) { dump("pop 1") - if pt.taskType != sealtasks.TTPreCommit2 { - t.Error("expected precommit2, got", pt.taskType) + if pt.TaskType != sealtasks.TTPreCommit2 { + t.Error("expected precommit2, got", pt.TaskType) } pt = rq.Remove(0) dump("pop 2") - if pt.taskType != sealtasks.TTPreCommit1 { - t.Error("expected precommit1, got", pt.taskType) + if pt.TaskType != sealtasks.TTPreCommit1 { + t.Error("expected precommit1, got", pt.TaskType) } pt = rq.Remove(1) dump("pop 3") - if pt.taskType != sealtasks.TTAddPiece { - t.Error("expected addpiece, got", pt.taskType) + if pt.TaskType != sealtasks.TTAddPiece { + t.Error("expected addpiece, got", pt.TaskType) } pt = rq.Remove(0) dump("pop 4") - if pt.taskType != sealtasks.TTPreCommit1 { - t.Error("expected precommit1, got", pt.taskType) + if pt.TaskType != sealtasks.TTPreCommit1 { + t.Error("expected precommit1, got", pt.TaskType) } } diff --git a/extern/sector-storage/sched.go b/extern/sector-storage/sched.go index eb2d17c59..8d190698d 100644 --- a/extern/sector-storage/sched.go +++ b/extern/sector-storage/sched.go @@ -2,9 +2,6 @@ package sectorstorage import ( "context" - "math" - "math/rand" - "sort" "sync" "time" @@ -47,23 +44,26 @@ const mib = 1 << 20 type WorkerAction func(ctx context.Context, w Worker) error type WorkerSelector interface { - Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *workerHandle) (bool, error) // true if worker is acceptable for performing a task + Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *WorkerHandle) (bool, error) // true if worker is acceptable for performing a task - Cmp(ctx context.Context, task sealtasks.TaskType, a, b *workerHandle) (bool, error) // true if a is preferred over b + Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) // true if a is preferred over b } -type scheduler struct { - workersLk sync.RWMutex - workers map[storiface.WorkerID]*workerHandle +type Scheduler struct { + assigner Assigner - schedule chan *workerRequest - windowRequests chan *schedWindowRequest + workersLk sync.RWMutex + + Workers map[storiface.WorkerID]*WorkerHandle + + schedule chan *WorkerRequest + windowRequests chan *SchedWindowRequest workerChange chan struct{} // worker added / changed/freed resources workerDisable chan workerDisableReq // owned by the sh.runSched goroutine - schedQueue *requestQueue - openWindows []*schedWindowRequest + SchedQueue *RequestQueue + OpenWindows []*SchedWindowRequest workTracker *workTracker @@ -74,24 +74,24 @@ type scheduler struct { testSync chan struct{} // used for testing } -type workerHandle struct { +type WorkerHandle struct { workerRpc Worker tasksCache map[sealtasks.TaskType]struct{} tasksUpdate time.Time tasksLk 
sync.Mutex - info storiface.WorkerInfo + Info storiface.WorkerInfo - preparing *activeResources // use with workerHandle.lk - active *activeResources // use with workerHandle.lk + preparing *activeResources // use with WorkerHandle.lk + active *activeResources // use with WorkerHandle.lk lk sync.Mutex // can be taken inside sched.workersLk.RLock wndLk sync.Mutex // can be taken inside sched.workersLk.RLock - activeWindows []*schedWindow + activeWindows []*SchedWindow - enabled bool + Enabled bool // for sync manager goroutine closing cleanupStarted bool @@ -99,19 +99,19 @@ type workerHandle struct { closingMgr chan struct{} } -type schedWindowRequest struct { - worker storiface.WorkerID +type SchedWindowRequest struct { + Worker storiface.WorkerID - done chan *schedWindow + Done chan *SchedWindow } -type schedWindow struct { - allocated activeResources - todo []*workerRequest +type SchedWindow struct { + Allocated activeResources + Todo []*WorkerRequest } type workerDisableReq struct { - activeWindows []*schedWindow + activeWindows []*SchedWindow wid storiface.WorkerID done func() } @@ -126,11 +126,11 @@ type activeResources struct { waiting int } -type workerRequest struct { - sector storage.SectorRef - taskType sealtasks.TaskType - priority int // larger values more important - sel WorkerSelector +type WorkerRequest struct { + Sector storage.SectorRef + TaskType sealtasks.TaskType + Priority int // larger values more important + Sel WorkerSelector prepare WorkerAction work WorkerAction @@ -139,25 +139,27 @@ type workerRequest struct { index int // The index of the item in the heap. - indexHeap int + IndexHeap int ret chan<- workerResponse - ctx context.Context + Ctx context.Context } type workerResponse struct { err error } -func newScheduler() *scheduler { - return &scheduler{ - workers: map[storiface.WorkerID]*workerHandle{}, +func newScheduler() *Scheduler { + return &Scheduler{ + assigner: &AssignerUtil{}, - schedule: make(chan *workerRequest), - windowRequests: make(chan *schedWindowRequest, 20), + Workers: map[storiface.WorkerID]*WorkerHandle{}, + + schedule: make(chan *WorkerRequest), + windowRequests: make(chan *SchedWindowRequest, 20), workerChange: make(chan struct{}, 20), workerDisable: make(chan workerDisableReq), - schedQueue: &requestQueue{}, + SchedQueue: &RequestQueue{}, workTracker: &workTracker{ done: map[storiface.CallID]struct{}{}, @@ -172,15 +174,15 @@ func newScheduler() *scheduler { } } -func (sh *scheduler) Schedule(ctx context.Context, sector storage.SectorRef, taskType sealtasks.TaskType, sel WorkerSelector, prepare WorkerAction, work WorkerAction) error { +func (sh *Scheduler) Schedule(ctx context.Context, sector storage.SectorRef, taskType sealtasks.TaskType, sel WorkerSelector, prepare WorkerAction, work WorkerAction) error { ret := make(chan workerResponse) select { - case sh.schedule <- &workerRequest{ - sector: sector, - taskType: taskType, - priority: getPriority(ctx), - sel: sel, + case sh.schedule <- &WorkerRequest{ + Sector: sector, + TaskType: taskType, + Priority: getPriority(ctx), + Sel: sel, prepare: prepare, work: work, @@ -188,7 +190,7 @@ func (sh *scheduler) Schedule(ctx context.Context, sector storage.SectorRef, tas start: time.Now(), ret: ret, - ctx: ctx, + Ctx: ctx, }: case <-sh.closing: return xerrors.New("closing") @@ -206,10 +208,10 @@ func (sh *scheduler) Schedule(ctx context.Context, sector storage.SectorRef, tas } } -func (r *workerRequest) respond(err error) { +func (r *WorkerRequest) respond(err error) { select { case r.ret <- 
workerResponse{err: err}: - case <-r.ctx.Done(): + case <-r.Ctx.Done(): log.Warnf("request got cancelled before we could respond") } } @@ -225,7 +227,7 @@ type SchedDiagInfo struct { OpenWindows []string } -func (sh *scheduler) runSched() { +func (sh *Scheduler) runSched() { defer close(sh.closed) iw := time.After(InitWait) @@ -242,14 +244,14 @@ func (sh *scheduler) runSched() { toDisable = append(toDisable, dreq) doSched = true case req := <-sh.schedule: - sh.schedQueue.Push(req) + sh.SchedQueue.Push(req) doSched = true if sh.testSync != nil { sh.testSync <- struct{}{} } case req := <-sh.windowRequests: - sh.openWindows = append(sh.openWindows, req) + sh.OpenWindows = append(sh.OpenWindows, req) doSched = true case ireq := <-sh.info: ireq(sh.diag()) @@ -273,12 +275,12 @@ func (sh *scheduler) runSched() { case dreq := <-sh.workerDisable: toDisable = append(toDisable, dreq) case req := <-sh.schedule: - sh.schedQueue.Push(req) + sh.SchedQueue.Push(req) if sh.testSync != nil { sh.testSync <- struct{}{} } case req := <-sh.windowRequests: - sh.openWindows = append(sh.openWindows, req) + sh.OpenWindows = append(sh.OpenWindows, req) default: break loop } @@ -286,21 +288,21 @@ func (sh *scheduler) runSched() { for _, req := range toDisable { for _, window := range req.activeWindows { - for _, request := range window.todo { - sh.schedQueue.Push(request) + for _, request := range window.Todo { + sh.SchedQueue.Push(request) } } - openWindows := make([]*schedWindowRequest, 0, len(sh.openWindows)) - for _, window := range sh.openWindows { - if window.worker != req.wid { + openWindows := make([]*SchedWindowRequest, 0, len(sh.OpenWindows)) + for _, window := range sh.OpenWindows { + if window.Worker != req.wid { openWindows = append(openWindows, window) } } - sh.openWindows = openWindows + sh.OpenWindows = openWindows sh.workersLk.Lock() - sh.workers[req.wid].enabled = false + sh.Workers[req.wid].Enabled = false sh.workersLk.Unlock() req.done() @@ -312,281 +314,51 @@ func (sh *scheduler) runSched() { } } -func (sh *scheduler) diag() SchedDiagInfo { +func (sh *Scheduler) diag() SchedDiagInfo { var out SchedDiagInfo - for sqi := 0; sqi < sh.schedQueue.Len(); sqi++ { - task := (*sh.schedQueue)[sqi] + for sqi := 0; sqi < sh.SchedQueue.Len(); sqi++ { + task := (*sh.SchedQueue)[sqi] out.Requests = append(out.Requests, SchedDiagRequestInfo{ - Sector: task.sector.ID, - TaskType: task.taskType, - Priority: task.priority, + Sector: task.Sector.ID, + TaskType: task.TaskType, + Priority: task.Priority, }) } sh.workersLk.RLock() defer sh.workersLk.RUnlock() - for _, window := range sh.openWindows { - out.OpenWindows = append(out.OpenWindows, uuid.UUID(window.worker).String()) + for _, window := range sh.OpenWindows { + out.OpenWindows = append(out.OpenWindows, uuid.UUID(window.Worker).String()) } return out } -func (sh *scheduler) trySched() { - /* - This assigns tasks to workers based on: - - Task priority (achieved by handling sh.schedQueue in order, since it's already sorted by priority) - - Worker resource availability - - Task-specified worker preference (acceptableWindows array below sorted by this preference) - - Window request age - - 1. For each task in the schedQueue find windows which can handle them - 1.1. Create list of windows capable of handling a task - 1.2. Sort windows according to task selector preferences - 2. Going through schedQueue again, assign task to first acceptable window - with resources available - 3. 
Submit windows with scheduled tasks to workers - - */ +type Assigner interface { + TrySched(sh *Scheduler) +} +func (sh *Scheduler) trySched() { sh.workersLk.RLock() defer sh.workersLk.RUnlock() - windowsLen := len(sh.openWindows) - queueLen := sh.schedQueue.Len() - - log.Debugf("SCHED %d queued; %d open windows", queueLen, windowsLen) - - if windowsLen == 0 || queueLen == 0 { - // nothing to schedule on - return - } - - windows := make([]schedWindow, windowsLen) - acceptableWindows := make([][]int, queueLen) // QueueIndex -> []OpenWindowIndex - - // Step 1 - throttle := make(chan struct{}, windowsLen) - - var wg sync.WaitGroup - wg.Add(queueLen) - for i := 0; i < queueLen; i++ { - throttle <- struct{}{} - - go func(sqi int) { - defer wg.Done() - defer func() { - <-throttle - }() - - task := (*sh.schedQueue)[sqi] - - task.indexHeap = sqi - for wnd, windowRequest := range sh.openWindows { - worker, ok := sh.workers[windowRequest.worker] - if !ok { - log.Errorf("worker referenced by windowRequest not found (worker: %s)", windowRequest.worker) - // TODO: How to move forward here? - continue - } - - if !worker.enabled { - log.Debugw("skipping disabled worker", "worker", windowRequest.worker) - continue - } - - needRes := worker.info.Resources.ResourceSpec(task.sector.ProofType, task.taskType) - - // TODO: allow bigger windows - if !windows[wnd].allocated.canHandleRequest(needRes, windowRequest.worker, "schedAcceptable", worker.info) { - continue - } - - rpcCtx, cancel := context.WithTimeout(task.ctx, SelectorTimeout) - ok, err := task.sel.Ok(rpcCtx, task.taskType, task.sector.ProofType, worker) - cancel() - if err != nil { - log.Errorf("trySched(1) req.sel.Ok error: %+v", err) - continue - } - - if !ok { - continue - } - - acceptableWindows[sqi] = append(acceptableWindows[sqi], wnd) - } - - if len(acceptableWindows[sqi]) == 0 { - return - } - - // Pick best worker (shuffle in case some workers are equally as good) - rand.Shuffle(len(acceptableWindows[sqi]), func(i, j int) { - acceptableWindows[sqi][i], acceptableWindows[sqi][j] = acceptableWindows[sqi][j], acceptableWindows[sqi][i] // nolint:scopelint - }) - sort.SliceStable(acceptableWindows[sqi], func(i, j int) bool { - wii := sh.openWindows[acceptableWindows[sqi][i]].worker // nolint:scopelint - wji := sh.openWindows[acceptableWindows[sqi][j]].worker // nolint:scopelint - - if wii == wji { - // for the same worker prefer older windows - return acceptableWindows[sqi][i] < acceptableWindows[sqi][j] // nolint:scopelint - } - - wi := sh.workers[wii] - wj := sh.workers[wji] - - rpcCtx, cancel := context.WithTimeout(task.ctx, SelectorTimeout) - defer cancel() - - r, err := task.sel.Cmp(rpcCtx, task.taskType, wi, wj) - if err != nil { - log.Errorf("selecting best worker: %s", err) - } - return r - }) - }(i) - } - - wg.Wait() - - log.Debugf("SCHED windows: %+v", windows) - log.Debugf("SCHED Acceptable win: %+v", acceptableWindows) - - // Step 2 - scheduled := 0 - rmQueue := make([]int, 0, queueLen) - workerUtil := map[storiface.WorkerID]float64{} - - for sqi := 0; sqi < queueLen; sqi++ { - task := (*sh.schedQueue)[sqi] - - selectedWindow := -1 - var needRes storiface.Resources - var info storiface.WorkerInfo - var bestWid storiface.WorkerID - bestUtilization := math.MaxFloat64 // smaller = better - - for i, wnd := range acceptableWindows[task.indexHeap] { - wid := sh.openWindows[wnd].worker - w := sh.workers[wid] - - res := info.Resources.ResourceSpec(task.sector.ProofType, task.taskType) - - log.Debugf("SCHED try assign sqi:%d sector %d to window 
%d (awi:%d)", sqi, task.sector.ID.Number, wnd, i) - - // TODO: allow bigger windows - if !windows[wnd].allocated.canHandleRequest(needRes, wid, "schedAssign", info) { - continue - } - - wu, found := workerUtil[wid] - if !found { - wu = w.utilization() - workerUtil[wid] = wu - } - if wu >= bestUtilization { - // acceptable worker list is initially sorted by utilization, and the initially-best workers - // will be assigned tasks first. This means that if we find a worker which isn't better, it - // probably means that the other workers aren't better either. - // - // utilization - // ^ - // | / - // | \ / - // | \ / - // | * - // #--------> acceptableWindow index - // - // * -> we're here - break - } - - info = w.info - needRes = res - bestWid = wid - selectedWindow = wnd - bestUtilization = wu - } - - if selectedWindow < 0 { - // all windows full - continue - } - - log.Debugw("SCHED ASSIGNED", - "sqi", sqi, - "sector", task.sector.ID.Number, - "task", task.taskType, - "window", selectedWindow, - "worker", bestWid, - "utilization", bestUtilization) - - workerUtil[bestWid] += windows[selectedWindow].allocated.add(info.Resources, needRes) - windows[selectedWindow].todo = append(windows[selectedWindow].todo, task) - - rmQueue = append(rmQueue, sqi) - scheduled++ - } - - if len(rmQueue) > 0 { - for i := len(rmQueue) - 1; i >= 0; i-- { - sh.schedQueue.Remove(rmQueue[i]) - } - } - - // Step 3 - - if scheduled == 0 { - return - } - - scheduledWindows := map[int]struct{}{} - for wnd, window := range windows { - if len(window.todo) == 0 { - // Nothing scheduled here, keep the window open - continue - } - - scheduledWindows[wnd] = struct{}{} - - window := window // copy - select { - case sh.openWindows[wnd].done <- &window: - default: - log.Error("expected sh.openWindows[wnd].done to be buffered") - } - } - - // Rewrite sh.openWindows array, removing scheduled windows - newOpenWindows := make([]*schedWindowRequest, 0, windowsLen-len(scheduledWindows)) - for wnd, window := range sh.openWindows { - if _, scheduled := scheduledWindows[wnd]; scheduled { - // keep unscheduled windows open - continue - } - - newOpenWindows = append(newOpenWindows, window) - } - - sh.openWindows = newOpenWindows + sh.assigner.TrySched(sh) } -func (sh *scheduler) schedClose() { +func (sh *Scheduler) schedClose() { sh.workersLk.Lock() defer sh.workersLk.Unlock() log.Debugf("closing scheduler") - for i, w := range sh.workers { + for i, w := range sh.Workers { sh.workerCleanup(i, w) } } -func (sh *scheduler) Info(ctx context.Context) (interface{}, error) { +func (sh *Scheduler) Info(ctx context.Context) (interface{}, error) { ch := make(chan interface{}, 1) sh.info <- func(res interface{}) { @@ -601,7 +373,7 @@ func (sh *scheduler) Info(ctx context.Context) (interface{}, error) { } } -func (sh *scheduler) Close(ctx context.Context) error { +func (sh *Scheduler) Close(ctx context.Context) error { close(sh.closing) select { case <-sh.closed: diff --git a/extern/sector-storage/sched_assigner_util.go b/extern/sector-storage/sched_assigner_util.go new file mode 100644 index 000000000..a75ceb363 --- /dev/null +++ b/extern/sector-storage/sched_assigner_util.go @@ -0,0 +1,254 @@ +package sectorstorage + +import ( + "context" + "math" + "math/rand" + "sort" + "sync" + + "github.com/filecoin-project/lotus/extern/sector-storage/storiface" +) + +// AssignerUtil is a task assigner assigning tasks to workers with lowest utilization +type AssignerUtil struct{} + +var _ Assigner = &AssignerUtil{} + +func (a *AssignerUtil) TrySched(sh 
*Scheduler) { + /* + This assigns tasks to workers based on: + - Task priority (achieved by handling sh.SchedQueue in order, since it's already sorted by priority) + - Worker resource availability + - Task-specified worker preference (acceptableWindows array below sorted by this preference) + - Window request age + + 1. For each task in the SchedQueue find windows which can handle them + 1.1. Create list of windows capable of handling a task + 1.2. Sort windows according to task selector preferences + 2. Going through SchedQueue again, assign task to first acceptable window + with resources available + 3. Submit windows with scheduled tasks to workers + + */ + + windowsLen := len(sh.OpenWindows) + queueLen := sh.SchedQueue.Len() + + log.Debugf("SCHED %d queued; %d open windows", queueLen, windowsLen) + + if windowsLen == 0 || queueLen == 0 { + // nothing to schedule on + return + } + + windows := make([]SchedWindow, windowsLen) + acceptableWindows := make([][]int, queueLen) // QueueIndex -> []OpenWindowIndex + + // Step 1 + throttle := make(chan struct{}, windowsLen) + + var wg sync.WaitGroup + wg.Add(queueLen) + for i := 0; i < queueLen; i++ { + throttle <- struct{}{} + + go func(sqi int) { + defer wg.Done() + defer func() { + <-throttle + }() + + task := (*sh.SchedQueue)[sqi] + + task.IndexHeap = sqi + for wnd, windowRequest := range sh.OpenWindows { + worker, ok := sh.Workers[windowRequest.Worker] + if !ok { + log.Errorf("worker referenced by windowRequest not found (worker: %s)", windowRequest.Worker) + // TODO: How to move forward here? + continue + } + + if !worker.Enabled { + log.Debugw("skipping disabled worker", "worker", windowRequest.Worker) + continue + } + + needRes := worker.Info.Resources.ResourceSpec(task.Sector.ProofType, task.TaskType) + + // TODO: allow bigger windows + if !windows[wnd].Allocated.CanHandleRequest(needRes, windowRequest.Worker, "schedAcceptable", worker.Info) { + continue + } + + rpcCtx, cancel := context.WithTimeout(task.Ctx, SelectorTimeout) + ok, err := task.Sel.Ok(rpcCtx, task.TaskType, task.Sector.ProofType, worker) + cancel() + if err != nil { + log.Errorf("trySched(1) req.Sel.Ok error: %+v", err) + continue + } + + if !ok { + continue + } + + acceptableWindows[sqi] = append(acceptableWindows[sqi], wnd) + } + + if len(acceptableWindows[sqi]) == 0 { + return + } + + // Pick best worker (shuffle in case some workers are equally as good) + rand.Shuffle(len(acceptableWindows[sqi]), func(i, j int) { + acceptableWindows[sqi][i], acceptableWindows[sqi][j] = acceptableWindows[sqi][j], acceptableWindows[sqi][i] // nolint:scopelint + }) + sort.SliceStable(acceptableWindows[sqi], func(i, j int) bool { + wii := sh.OpenWindows[acceptableWindows[sqi][i]].Worker // nolint:scopelint + wji := sh.OpenWindows[acceptableWindows[sqi][j]].Worker // nolint:scopelint + + if wii == wji { + // for the same worker prefer older windows + return acceptableWindows[sqi][i] < acceptableWindows[sqi][j] // nolint:scopelint + } + + wi := sh.Workers[wii] + wj := sh.Workers[wji] + + rpcCtx, cancel := context.WithTimeout(task.Ctx, SelectorTimeout) + defer cancel() + + r, err := task.Sel.Cmp(rpcCtx, task.TaskType, wi, wj) + if err != nil { + log.Errorf("selecting best worker: %s", err) + } + return r + }) + }(i) + } + + wg.Wait() + + log.Debugf("SCHED windows: %+v", windows) + log.Debugf("SCHED Acceptable win: %+v", acceptableWindows) + + // Step 2 + scheduled := 0 + rmQueue := make([]int, 0, queueLen) + workerUtil := map[storiface.WorkerID]float64{} + + for sqi := 0; sqi < queueLen; 
sqi++ { + task := (*sh.SchedQueue)[sqi] + + selectedWindow := -1 + var needRes storiface.Resources + var info storiface.WorkerInfo + var bestWid storiface.WorkerID + bestUtilization := math.MaxFloat64 // smaller = better + + for i, wnd := range acceptableWindows[task.IndexHeap] { + wid := sh.OpenWindows[wnd].Worker + w := sh.Workers[wid] + + res := info.Resources.ResourceSpec(task.Sector.ProofType, task.TaskType) + + log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.Sector.ID.Number, wnd, i) + + // TODO: allow bigger windows + if !windows[wnd].Allocated.CanHandleRequest(needRes, wid, "schedAssign", info) { + continue + } + + wu, found := workerUtil[wid] + if !found { + wu = w.Utilization() + workerUtil[wid] = wu + } + if wu >= bestUtilization { + // acceptable worker list is initially sorted by utilization, and the initially-best workers + // will be assigned tasks first. This means that if we find a worker which isn't better, it + // probably means that the other workers aren't better either. + // + // utilization + // ^ + // | / + // | \ / + // | \ / + // | * + // #--------> acceptableWindow index + // + // * -> we're here + break + } + + info = w.Info + needRes = res + bestWid = wid + selectedWindow = wnd + bestUtilization = wu + } + + if selectedWindow < 0 { + // all windows full + continue + } + + log.Debugw("SCHED ASSIGNED", + "sqi", sqi, + "sector", task.Sector.ID.Number, + "task", task.TaskType, + "window", selectedWindow, + "worker", bestWid, + "utilization", bestUtilization) + + workerUtil[bestWid] += windows[selectedWindow].Allocated.Add(info.Resources, needRes) + windows[selectedWindow].Todo = append(windows[selectedWindow].Todo, task) + + rmQueue = append(rmQueue, sqi) + scheduled++ + } + + if len(rmQueue) > 0 { + for i := len(rmQueue) - 1; i >= 0; i-- { + sh.SchedQueue.Remove(rmQueue[i]) + } + } + + // Step 3 + + if scheduled == 0 { + return + } + + scheduledWindows := map[int]struct{}{} + for wnd, window := range windows { + if len(window.Todo) == 0 { + // Nothing scheduled here, keep the window open + continue + } + + scheduledWindows[wnd] = struct{}{} + + window := window // copy + select { + case sh.OpenWindows[wnd].Done <- &window: + default: + log.Error("expected sh.OpenWindows[wnd].Done to be buffered") + } + } + + // Rewrite sh.OpenWindows array, removing scheduled windows + newOpenWindows := make([]*SchedWindowRequest, 0, windowsLen-len(scheduledWindows)) + for wnd, window := range sh.OpenWindows { + if _, scheduled := scheduledWindows[wnd]; scheduled { + // keep unscheduled windows open + continue + } + + newOpenWindows = append(newOpenWindows, window) + } + + sh.OpenWindows = newOpenWindows +} diff --git a/extern/sector-storage/sched_post.go b/extern/sector-storage/sched_post.go index 58d79fc86..3da13f79f 100644 --- a/extern/sector-storage/sched_post.go +++ b/extern/sector-storage/sched_post.go @@ -17,7 +17,7 @@ import ( type poStScheduler struct { lk sync.RWMutex - workers map[storiface.WorkerID]*workerHandle + workers map[storiface.WorkerID]*WorkerHandle cond *sync.Cond postType sealtasks.TaskType @@ -25,14 +25,14 @@ type poStScheduler struct { func newPoStScheduler(t sealtasks.TaskType) *poStScheduler { ps := &poStScheduler{ - workers: map[storiface.WorkerID]*workerHandle{}, + workers: map[storiface.WorkerID]*WorkerHandle{}, postType: t, } ps.cond = sync.NewCond(&ps.lk) return ps } -func (ps *poStScheduler) MaybeAddWorker(wid storiface.WorkerID, tasks map[sealtasks.TaskType]struct{}, w *workerHandle) bool { +func (ps *poStScheduler) 
MaybeAddWorker(wid storiface.WorkerID, tasks map[sealtasks.TaskType]struct{}, w *WorkerHandle) bool { if _, ok := tasks[ps.postType]; !ok { return false } @@ -49,10 +49,10 @@ func (ps *poStScheduler) MaybeAddWorker(wid storiface.WorkerID, tasks map[sealta return true } -func (ps *poStScheduler) delWorker(wid storiface.WorkerID) *workerHandle { +func (ps *poStScheduler) delWorker(wid storiface.WorkerID) *WorkerHandle { ps.lk.Lock() defer ps.lk.Unlock() - var w *workerHandle = nil + var w *WorkerHandle = nil if wh, ok := ps.workers[wid]; ok { w = wh delete(ps.workers, wid) @@ -68,7 +68,7 @@ func (ps *poStScheduler) CanSched(ctx context.Context) bool { } for _, w := range ps.workers { - if w.enabled { + if w.Enabled { return true } } @@ -105,7 +105,7 @@ func (ps *poStScheduler) Schedule(ctx context.Context, primary bool, spt abi.Reg selected := candidates[0] worker := ps.workers[selected.id] - return worker.active.withResources(selected.id, worker.info, selected.res, &ps.lk, func() error { + return worker.active.withResources(selected.id, worker.Info, selected.res, &ps.lk, func() error { ps.lk.Unlock() defer ps.lk.Lock() @@ -122,9 +122,9 @@ func (ps *poStScheduler) readyWorkers(spt abi.RegisteredSealProof) (bool, []cand var accepts []candidateWorker //if the gpus of the worker are insufficient or it's disabled, it cannot be scheduled for wid, wr := range ps.workers { - needRes := wr.info.Resources.ResourceSpec(spt, ps.postType) + needRes := wr.Info.Resources.ResourceSpec(spt, ps.postType) - if !wr.active.canHandleRequest(needRes, wid, "post-readyWorkers", wr.info) { + if !wr.active.CanHandleRequest(needRes, wid, "post-readyWorkers", wr.Info) { continue } @@ -145,16 +145,16 @@ func (ps *poStScheduler) readyWorkers(spt abi.RegisteredSealProof) (bool, []cand func (ps *poStScheduler) disable(wid storiface.WorkerID) { ps.lk.Lock() defer ps.lk.Unlock() - ps.workers[wid].enabled = false + ps.workers[wid].Enabled = false } func (ps *poStScheduler) enable(wid storiface.WorkerID) { ps.lk.Lock() defer ps.lk.Unlock() - ps.workers[wid].enabled = true + ps.workers[wid].Enabled = true } -func (ps *poStScheduler) watch(wid storiface.WorkerID, worker *workerHandle) { +func (ps *poStScheduler) watch(wid storiface.WorkerID, worker *WorkerHandle) { heartbeatTimer := time.NewTicker(stores.HeartbeatInterval) defer heartbeatTimer.Stop() @@ -197,7 +197,7 @@ func (ps *poStScheduler) watch(wid storiface.WorkerID, worker *workerHandle) { } } -func (ps *poStScheduler) workerCleanup(wid storiface.WorkerID, w *workerHandle) { +func (ps *poStScheduler) workerCleanup(wid storiface.WorkerID, w *WorkerHandle) { select { case <-w.closingMgr: default: @@ -223,7 +223,7 @@ func (ps *poStScheduler) schedClose() { } } -func (ps *poStScheduler) WorkerStats(ctx context.Context, cb func(ctx context.Context, wid storiface.WorkerID, worker *workerHandle)) { +func (ps *poStScheduler) WorkerStats(ctx context.Context, cb func(ctx context.Context, wid storiface.WorkerID, worker *WorkerHandle)) { ps.lk.RLock() defer ps.lk.RUnlock() for id, w := range ps.workers { diff --git a/extern/sector-storage/sched_resources.go b/extern/sector-storage/sched_resources.go index 458ca981d..0a840e726 100644 --- a/extern/sector-storage/sched_resources.go +++ b/extern/sector-storage/sched_resources.go @@ -10,7 +10,7 @@ import ( ) func (a *activeResources) withResources(id storiface.WorkerID, wr storiface.WorkerInfo, r storiface.Resources, locker sync.Locker, cb func() error) error { - for !a.canHandleRequest(r, id, "withResources", wr) { + for 
!a.CanHandleRequest(r, id, "withResources", wr) { if a.cond == nil { a.cond = sync.NewCond(locker) } @@ -19,7 +19,7 @@ func (a *activeResources) withResources(id storiface.WorkerID, wr storiface.Work a.waiting-- } - a.add(wr.Resources, r) + a.Add(wr.Resources, r) err := cb() @@ -34,7 +34,7 @@ func (a *activeResources) hasWorkWaiting() bool { } // add task resources to activeResources and return utilization difference -func (a *activeResources) add(wr storiface.WorkerResources, r storiface.Resources) float64 { +func (a *activeResources) Add(wr storiface.WorkerResources, r storiface.Resources) float64 { startUtil := a.utilization(wr) if r.GPUUtilization > 0 { @@ -60,9 +60,9 @@ func (a *activeResources) free(wr storiface.WorkerResources, r storiface.Resourc } } -// canHandleRequest evaluates if the worker has enough available resources to +// CanHandleRequest evaluates if the worker has enough available resources to // handle the request. -func (a *activeResources) canHandleRequest(needRes storiface.Resources, wid storiface.WorkerID, caller string, info storiface.WorkerInfo) bool { +func (a *activeResources) CanHandleRequest(needRes storiface.Resources, wid storiface.WorkerID, caller string, info storiface.WorkerInfo) bool { if info.IgnoreResources { // shortcircuit; if this worker is ignoring resources, it can always handle the request. return true @@ -145,14 +145,14 @@ func (a *activeResources) utilization(wr storiface.WorkerResources) float64 { return max } -func (wh *workerHandle) utilization() float64 { +func (wh *WorkerHandle) Utilization() float64 { wh.lk.Lock() - u := wh.active.utilization(wh.info.Resources) - u += wh.preparing.utilization(wh.info.Resources) + u := wh.active.utilization(wh.Info.Resources) + u += wh.preparing.utilization(wh.Info.Resources) wh.lk.Unlock() wh.wndLk.Lock() for _, window := range wh.activeWindows { - u += window.allocated.utilization(wh.info.Resources) + u += window.Allocated.utilization(wh.Info.Resources) } wh.wndLk.Unlock() @@ -161,7 +161,7 @@ func (wh *workerHandle) utilization() float64 { var tasksCacheTimeout = 30 * time.Second -func (wh *workerHandle) TaskTypes(ctx context.Context) (t map[sealtasks.TaskType]struct{}, err error) { +func (wh *WorkerHandle) TaskTypes(ctx context.Context) (t map[sealtasks.TaskType]struct{}, err error) { wh.tasksLk.Lock() defer wh.tasksLk.Unlock() diff --git a/extern/sector-storage/sched_test.go b/extern/sector-storage/sched_test.go index 77e674793..046b32d1a 100644 --- a/extern/sector-storage/sched_test.go +++ b/extern/sector-storage/sched_test.go @@ -183,7 +183,7 @@ func (s *schedTestWorker) Close() error { var _ Worker = &schedTestWorker{} -func addTestWorker(t *testing.T, sched *scheduler, index *stores.Index, name string, taskTypes map[sealtasks.TaskType]struct{}, resources storiface.WorkerResources, ignoreResources bool) { +func addTestWorker(t *testing.T, sched *Scheduler, index *stores.Index, name string, taskTypes map[sealtasks.TaskType]struct{}, resources storiface.WorkerResources, ignoreResources bool) { w := &schedTestWorker{ name: name, taskTypes: taskTypes, @@ -259,13 +259,13 @@ func TestSched(t *testing.T) { wg sync.WaitGroup } - type task func(*testing.T, *scheduler, *stores.Index, *runMeta) + type task func(*testing.T, *Scheduler, *stores.Index, *runMeta) sched := func(taskName, expectWorker string, sid abi.SectorNumber, taskType sealtasks.TaskType) task { _, _, l, _ := runtime.Caller(1) _, _, l2, _ := runtime.Caller(2) - return func(t *testing.T, sched *scheduler, index *stores.Index, rm *runMeta) { + 
return func(t *testing.T, sched *Scheduler, index *stores.Index, rm *runMeta) { done := make(chan struct{}) rm.done[taskName] = done @@ -314,7 +314,7 @@ func TestSched(t *testing.T) { taskStarted := func(name string) task { _, _, l, _ := runtime.Caller(1) _, _, l2, _ := runtime.Caller(2) - return func(t *testing.T, sched *scheduler, index *stores.Index, rm *runMeta) { + return func(t *testing.T, sched *Scheduler, index *stores.Index, rm *runMeta) { select { case rm.done[name] <- struct{}{}: case <-ctx.Done(): @@ -326,7 +326,7 @@ func TestSched(t *testing.T) { taskDone := func(name string) task { _, _, l, _ := runtime.Caller(1) _, _, l2, _ := runtime.Caller(2) - return func(t *testing.T, sched *scheduler, index *stores.Index, rm *runMeta) { + return func(t *testing.T, sched *Scheduler, index *stores.Index, rm *runMeta) { select { case rm.done[name] <- struct{}{}: case <-ctx.Done(): @@ -339,7 +339,7 @@ func TestSched(t *testing.T) { taskNotScheduled := func(name string) task { _, _, l, _ := runtime.Caller(1) _, _, l2, _ := runtime.Caller(2) - return func(t *testing.T, sched *scheduler, index *stores.Index, rm *runMeta) { + return func(t *testing.T, sched *Scheduler, index *stores.Index, rm *runMeta) { select { case rm.done[name] <- struct{}{}: t.Fatal("not expected", l, l2) @@ -378,7 +378,7 @@ func TestSched(t *testing.T) { } multTask := func(tasks ...task) task { - return func(t *testing.T, s *scheduler, index *stores.Index, meta *runMeta) { + return func(t *testing.T, s *Scheduler, index *stores.Index, meta *runMeta) { for _, tsk := range tasks { tsk(t, s, index, meta) } @@ -492,7 +492,7 @@ func TestSched(t *testing.T) { } diag := func() task { - return func(t *testing.T, s *scheduler, index *stores.Index, meta *runMeta) { + return func(t *testing.T, s *Scheduler, index *stores.Index, meta *runMeta) { time.Sleep(20 * time.Millisecond) for _, request := range s.diag().Requests { log.Infof("!!! 
sDIAG: sid(%d) task(%s)", request.Sector.Number, request.TaskType) @@ -582,12 +582,12 @@ func TestSched(t *testing.T) { type slowishSelector bool -func (s slowishSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *workerHandle) (bool, error) { +func (s slowishSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *WorkerHandle) (bool, error) { time.Sleep(200 * time.Microsecond) return bool(s), nil } -func (s slowishSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *workerHandle) (bool, error) { +func (s slowishSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) { time.Sleep(100 * time.Microsecond) return true, nil } @@ -605,9 +605,9 @@ func BenchmarkTrySched(b *testing.B) { b.StopTimer() sched := newScheduler() - sched.workers[storiface.WorkerID{}] = &workerHandle{ + sched.Workers[storiface.WorkerID{}] = &WorkerHandle{ workerRpc: nil, - info: storiface.WorkerInfo{ + Info: storiface.WorkerInfo{ Hostname: "t", Resources: decentWorkerResources, }, @@ -616,17 +616,17 @@ func BenchmarkTrySched(b *testing.B) { } for i := 0; i < windows; i++ { - sched.openWindows = append(sched.openWindows, &schedWindowRequest{ - worker: storiface.WorkerID{}, - done: make(chan *schedWindow, 1000), + sched.OpenWindows = append(sched.OpenWindows, &SchedWindowRequest{ + Worker: storiface.WorkerID{}, + Done: make(chan *SchedWindow, 1000), }) } for i := 0; i < queue; i++ { - sched.schedQueue.Push(&workerRequest{ - taskType: sealtasks.TTCommit2, - sel: slowishSelector(true), - ctx: ctx, + sched.SchedQueue.Push(&WorkerRequest{ + TaskType: sealtasks.TTCommit2, + Sel: slowishSelector(true), + Ctx: ctx, }) } @@ -644,26 +644,26 @@ func BenchmarkTrySched(b *testing.B) { } func TestWindowCompact(t *testing.T) { - sh := scheduler{} + sh := Scheduler{} spt := abi.RegisteredSealProof_StackedDrg32GiBV1 test := func(start [][]sealtasks.TaskType, expect [][]sealtasks.TaskType) func(t *testing.T) { return func(t *testing.T) { - wh := &workerHandle{ - info: storiface.WorkerInfo{ + wh := &WorkerHandle{ + Info: storiface.WorkerInfo{ Resources: decentWorkerResources, }, } for _, windowTasks := range start { - window := &schedWindow{} + window := &SchedWindow{} for _, task := range windowTasks { - window.todo = append(window.todo, &workerRequest{ - taskType: task, - sector: storage.SectorRef{ProofType: spt}, + window.Todo = append(window.Todo, &WorkerRequest{ + TaskType: task, + Sector: storage.SectorRef{ProofType: spt}, }) - window.allocated.add(wh.info.Resources, storiface.ResourceTable[task][spt]) + window.Allocated.Add(wh.Info.Resources, storiface.ResourceTable[task][spt]) } wh.activeWindows = append(wh.activeWindows, window) @@ -681,14 +681,14 @@ func TestWindowCompact(t *testing.T) { var expectRes activeResources for ti, task := range tasks { - require.Equal(t, task, wh.activeWindows[wi].todo[ti].taskType, "%d, %d", wi, ti) - expectRes.add(wh.info.Resources, storiface.ResourceTable[task][spt]) + require.Equal(t, task, wh.activeWindows[wi].Todo[ti].TaskType, "%d, %d", wi, ti) + expectRes.Add(wh.Info.Resources, storiface.ResourceTable[task][spt]) } - require.Equal(t, expectRes.cpuUse, wh.activeWindows[wi].allocated.cpuUse, "%d", wi) - require.Equal(t, expectRes.gpuUsed, wh.activeWindows[wi].allocated.gpuUsed, "%d", wi) - require.Equal(t, expectRes.memUsedMin, wh.activeWindows[wi].allocated.memUsedMin, "%d", wi) - require.Equal(t, expectRes.memUsedMax, wh.activeWindows[wi].allocated.memUsedMax, "%d", wi) + 
require.Equal(t, expectRes.cpuUse, wh.activeWindows[wi].Allocated.cpuUse, "%d", wi) + require.Equal(t, expectRes.gpuUsed, wh.activeWindows[wi].Allocated.gpuUsed, "%d", wi) + require.Equal(t, expectRes.memUsedMin, wh.activeWindows[wi].Allocated.memUsedMin, "%d", wi) + require.Equal(t, expectRes.memUsedMax, wh.activeWindows[wi].Allocated.memUsedMax, "%d", wi) } } diff --git a/extern/sector-storage/sched_worker.go b/extern/sector-storage/sched_worker.go index f0a85ea3f..1bc437d4a 100644 --- a/extern/sector-storage/sched_worker.go +++ b/extern/sector-storage/sched_worker.go @@ -12,31 +12,31 @@ import ( ) type schedWorker struct { - sched *scheduler - worker *workerHandle + sched *Scheduler + worker *WorkerHandle wid storiface.WorkerID heartbeatTimer *time.Ticker - scheduledWindows chan *schedWindow + scheduledWindows chan *SchedWindow taskDone chan struct{} windowsRequested int } -func newWorkerHandle(ctx context.Context, w Worker) (*workerHandle, error) { +func newWorkerHandle(ctx context.Context, w Worker) (*WorkerHandle, error) { info, err := w.Info(ctx) if err != nil { return nil, xerrors.Errorf("getting worker info: %w", err) } - worker := &workerHandle{ + worker := &WorkerHandle{ workerRpc: w, - info: info, + Info: info, preparing: &activeResources{}, active: &activeResources{}, - enabled: true, + Enabled: true, closingMgr: make(chan struct{}), closedMgr: make(chan struct{}), @@ -46,9 +46,9 @@ func newWorkerHandle(ctx context.Context, w Worker) (*workerHandle, error) { } // context only used for startup -func (sh *scheduler) runWorker(ctx context.Context, wid storiface.WorkerID, worker *workerHandle) error { +func (sh *Scheduler) runWorker(ctx context.Context, wid storiface.WorkerID, worker *WorkerHandle) error { sh.workersLk.Lock() - _, exist := sh.workers[wid] + _, exist := sh.Workers[wid] if exist { log.Warnw("duplicated worker added", "id", wid) @@ -57,7 +57,7 @@ func (sh *scheduler) runWorker(ctx context.Context, wid storiface.WorkerID, work return nil } - sh.workers[wid] = worker + sh.Workers[wid] = worker sh.workersLk.Unlock() sw := &schedWorker{ @@ -67,7 +67,7 @@ func (sh *scheduler) runWorker(ctx context.Context, wid storiface.WorkerID, work wid: wid, heartbeatTimer: time.NewTicker(stores.HeartbeatInterval), - scheduledWindows: make(chan *schedWindow, SchedWindows), + scheduledWindows: make(chan *SchedWindow, SchedWindows), taskDone: make(chan struct{}, 1), windowsRequested: 0, @@ -94,7 +94,7 @@ func (sw *schedWorker) handleWorker() { } sched.workersLk.Lock() - delete(sched.workers, sw.wid) + delete(sched.Workers, sw.wid) sched.workersLk.Unlock() }() @@ -103,7 +103,7 @@ func (sw *schedWorker) handleWorker() { for { { sched.workersLk.Lock() - enabled := worker.enabled + enabled := worker.Enabled sched.workersLk.Unlock() // ask for more windows if we need them (non-blocking) @@ -124,8 +124,8 @@ func (sw *schedWorker) handleWorker() { // session looks good { sched.workersLk.Lock() - enabled := worker.enabled - worker.enabled = true + enabled := worker.Enabled + worker.Enabled = true sched.workersLk.Unlock() if !enabled { @@ -248,9 +248,9 @@ func (sw *schedWorker) checkSession(ctx context.Context) bool { func (sw *schedWorker) requestWindows() bool { for ; sw.windowsRequested < SchedWindows; sw.windowsRequested++ { select { - case sw.sched.windowRequests <- &schedWindowRequest{ - worker: sw.wid, - done: sw.scheduledWindows, + case sw.sched.windowRequests <- &SchedWindowRequest{ + Worker: sw.wid, + Done: sw.scheduledWindows, }: case <-sw.sched.closing: return false @@ -290,21 +290,21 
@@ func (sw *schedWorker) workerCompactWindows() { lower := worker.activeWindows[wi] var moved []int - for ti, todo := range window.todo { - needRes := worker.info.Resources.ResourceSpec(todo.sector.ProofType, todo.taskType) - if !lower.allocated.canHandleRequest(needRes, sw.wid, "compactWindows", worker.info) { + for ti, todo := range window.Todo { + needRes := worker.Info.Resources.ResourceSpec(todo.Sector.ProofType, todo.TaskType) + if !lower.Allocated.CanHandleRequest(needRes, sw.wid, "compactWindows", worker.Info) { continue } moved = append(moved, ti) - lower.todo = append(lower.todo, todo) - lower.allocated.add(worker.info.Resources, needRes) - window.allocated.free(worker.info.Resources, needRes) + lower.Todo = append(lower.Todo, todo) + lower.Allocated.Add(worker.Info.Resources, needRes) + window.Allocated.free(worker.Info.Resources, needRes) } if len(moved) > 0 { - newTodo := make([]*workerRequest, 0, len(window.todo)-len(moved)) - for i, t := range window.todo { + newTodo := make([]*WorkerRequest, 0, len(window.Todo)-len(moved)) + for i, t := range window.Todo { if len(moved) > 0 && moved[0] == i { moved = moved[1:] continue @@ -312,16 +312,16 @@ func (sw *schedWorker) workerCompactWindows() { newTodo = append(newTodo, t) } - window.todo = newTodo + window.Todo = newTodo } } } var compacted int - var newWindows []*schedWindow + var newWindows []*SchedWindow for _, window := range worker.activeWindows { - if len(window.todo) == 0 { + if len(window.Todo) == 0 { compacted++ continue } @@ -347,13 +347,13 @@ assignLoop: firstWindow := worker.activeWindows[0] // process tasks within a window, preferring tasks at lower indexes - for len(firstWindow.todo) > 0 { + for len(firstWindow.Todo) > 0 { tidx := -1 worker.lk.Lock() - for t, todo := range firstWindow.todo { - needRes := worker.info.Resources.ResourceSpec(todo.sector.ProofType, todo.taskType) - if worker.preparing.canHandleRequest(needRes, sw.wid, "startPreparing", worker.info) { + for t, todo := range firstWindow.Todo { + needRes := worker.Info.Resources.ResourceSpec(todo.Sector.ProofType, todo.TaskType) + if worker.preparing.CanHandleRequest(needRes, sw.wid, "startPreparing", worker.Info) { tidx = t break } @@ -364,9 +364,9 @@ assignLoop: break assignLoop } - todo := firstWindow.todo[tidx] + todo := firstWindow.Todo[tidx] - log.Debugf("assign worker sector %d", todo.sector.ID.Number) + log.Debugf("assign worker sector %d", todo.Sector.ID.Number) err := sw.startProcessingTask(todo) if err != nil { @@ -375,9 +375,9 @@ assignLoop: } // Note: we're not freeing window.allocated resources here very much on purpose - copy(firstWindow.todo[tidx:], firstWindow.todo[tidx+1:]) - firstWindow.todo[len(firstWindow.todo)-1] = nil - firstWindow.todo = firstWindow.todo[:len(firstWindow.todo)-1] + copy(firstWindow.Todo[tidx:], firstWindow.Todo[tidx+1:]) + firstWindow.Todo[len(firstWindow.Todo)-1] = nil + firstWindow.Todo = firstWindow.Todo[:len(firstWindow.Todo)-1] } copy(worker.activeWindows, worker.activeWindows[1:]) @@ -405,16 +405,16 @@ assignLoop: firstWindow := worker.activeWindows[0] // process tasks within a window, preferring tasks at lower indexes - for len(firstWindow.todo) > 0 { + for len(firstWindow.Todo) > 0 { tidx := -1 - for t, todo := range firstWindow.todo { - if todo.taskType != sealtasks.TTCommit1 && todo.taskType != sealtasks.TTCommit2 { // todo put in task + for t, todo := range firstWindow.Todo { + if todo.TaskType != sealtasks.TTCommit1 && todo.TaskType != sealtasks.TTCommit2 { // todo put in task continue } - needRes := 
storiface.ResourceTable[todo.taskType][todo.sector.ProofType] - if worker.active.canHandleRequest(needRes, sw.wid, "startPreparing", worker.info) { + needRes := storiface.ResourceTable[todo.TaskType][todo.Sector.ProofType] + if worker.active.CanHandleRequest(needRes, sw.wid, "startPreparing", worker.Info) { tidx = t break } @@ -424,9 +424,9 @@ assignLoop: break assignLoop } - todo := firstWindow.todo[tidx] + todo := firstWindow.Todo[tidx] - log.Debugf("assign worker sector %d (ready)", todo.sector.ID.Number) + log.Debugf("assign worker sector %d (ready)", todo.Sector.ID.Number) err := sw.startProcessingReadyTask(todo) if err != nil { @@ -435,9 +435,9 @@ assignLoop: } // Note: we're not freeing window.allocated resources here very much on purpose - copy(firstWindow.todo[tidx:], firstWindow.todo[tidx+1:]) - firstWindow.todo[len(firstWindow.todo)-1] = nil - firstWindow.todo = firstWindow.todo[:len(firstWindow.todo)-1] + copy(firstWindow.Todo[tidx:], firstWindow.Todo[tidx+1:]) + firstWindow.Todo[len(firstWindow.Todo)-1] = nil + firstWindow.Todo = firstWindow.Todo[:len(firstWindow.Todo)-1] } copy(worker.activeWindows, worker.activeWindows[1:]) @@ -448,24 +448,24 @@ assignLoop: } } -func (sw *schedWorker) startProcessingTask(req *workerRequest) error { +func (sw *schedWorker) startProcessingTask(req *WorkerRequest) error { w, sh := sw.worker, sw.sched - needRes := w.info.Resources.ResourceSpec(req.sector.ProofType, req.taskType) + needRes := w.Info.Resources.ResourceSpec(req.Sector.ProofType, req.TaskType) w.lk.Lock() - w.preparing.add(w.info.Resources, needRes) + w.preparing.Add(w.Info.Resources, needRes) w.lk.Unlock() go func() { // first run the prepare step (e.g. fetching sector data from other worker) - tw := sh.workTracker.worker(sw.wid, w.info, w.workerRpc) + tw := sh.workTracker.worker(sw.wid, w.Info, w.workerRpc) tw.start() - err := req.prepare(req.ctx, tw) + err := req.prepare(req.Ctx, tw) w.lk.Lock() if err != nil { - w.preparing.free(w.info.Resources, needRes) + w.preparing.free(w.Info.Resources, needRes) w.lk.Unlock() select { @@ -477,7 +477,7 @@ func (sw *schedWorker) startProcessingTask(req *workerRequest) error { select { case req.ret <- workerResponse{err: err}: - case <-req.ctx.Done(): + case <-req.Ctx.Done(): log.Warnf("request got cancelled before we could respond (prepare error: %+v)", err) case <-sh.closing: log.Warnf("scheduler closed while sending response (prepare error: %+v)", err) @@ -485,17 +485,17 @@ func (sw *schedWorker) startProcessingTask(req *workerRequest) error { return } - tw = sh.workTracker.worker(sw.wid, w.info, w.workerRpc) + tw = sh.workTracker.worker(sw.wid, w.Info, w.workerRpc) // start tracking work first early in case we need to wait for resources werr := make(chan error, 1) go func() { - werr <- req.work(req.ctx, tw) + werr <- req.work(req.Ctx, tw) }() // wait (if needed) for resources in the 'active' window - err = w.active.withResources(sw.wid, w.info, needRes, &w.lk, func() error { - w.preparing.free(w.info.Resources, needRes) + err = w.active.withResources(sw.wid, w.Info, needRes, &w.lk, func() error { + w.preparing.free(w.Info.Resources, needRes) w.lk.Unlock() defer w.lk.Lock() // we MUST return locked from this function @@ -511,7 +511,7 @@ func (sw *schedWorker) startProcessingTask(req *workerRequest) error { select { case req.ret <- workerResponse{err: err}: - case <-req.ctx.Done(): + case <-req.Ctx.Done(): log.Warnf("request got cancelled before we could respond") case <-sh.closing: log.Warnf("scheduler closed while sending response") @@ 
-531,22 +531,22 @@ func (sw *schedWorker) startProcessingTask(req *workerRequest) error { return nil } -func (sw *schedWorker) startProcessingReadyTask(req *workerRequest) error { +func (sw *schedWorker) startProcessingReadyTask(req *WorkerRequest) error { w, sh := sw.worker, sw.sched - needRes := w.info.Resources.ResourceSpec(req.sector.ProofType, req.taskType) + needRes := w.Info.Resources.ResourceSpec(req.Sector.ProofType, req.TaskType) - w.active.add(w.info.Resources, needRes) + w.active.Add(w.Info.Resources, needRes) go func() { // Do the work! - tw := sh.workTracker.worker(sw.wid, w.info, w.workerRpc) + tw := sh.workTracker.worker(sw.wid, w.Info, w.workerRpc) tw.start() - err := req.work(req.ctx, tw) + err := req.work(req.Ctx, tw) select { case req.ret <- workerResponse{err: err}: - case <-req.ctx.Done(): + case <-req.Ctx.Done(): log.Warnf("request got cancelled before we could respond") case <-sh.closing: log.Warnf("scheduler closed while sending response") @@ -554,7 +554,7 @@ func (sw *schedWorker) startProcessingReadyTask(req *workerRequest) error { w.lk.Lock() - w.active.free(w.info.Resources, needRes) + w.active.free(w.Info.Resources, needRes) select { case sw.taskDone <- struct{}{}: @@ -574,7 +574,7 @@ func (sw *schedWorker) startProcessingReadyTask(req *workerRequest) error { return nil } -func (sh *scheduler) workerCleanup(wid storiface.WorkerID, w *workerHandle) { +func (sh *Scheduler) workerCleanup(wid storiface.WorkerID, w *WorkerHandle) { select { case <-w.closingMgr: default: @@ -592,13 +592,13 @@ func (sh *scheduler) workerCleanup(wid storiface.WorkerID, w *workerHandle) { if !w.cleanupStarted { w.cleanupStarted = true - newWindows := make([]*schedWindowRequest, 0, len(sh.openWindows)) - for _, window := range sh.openWindows { - if window.worker != wid { + newWindows := make([]*SchedWindowRequest, 0, len(sh.OpenWindows)) + for _, window := range sh.OpenWindows { + if window.Worker != wid { newWindows = append(newWindows, window) } } - sh.openWindows = newWindows + sh.OpenWindows = newWindows log.Debugf("worker %s dropped", wid) } diff --git a/extern/sector-storage/selector_alloc.go b/extern/sector-storage/selector_alloc.go index fe6a11e1e..d2a49f3ce 100644 --- a/extern/sector-storage/selector_alloc.go +++ b/extern/sector-storage/selector_alloc.go @@ -26,7 +26,7 @@ func newAllocSelector(index stores.SectorIndex, alloc storiface.SectorFileType, } } -func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *workerHandle) (bool, error) { +func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, error) { tasks, err := whnd.TaskTypes(ctx) if err != nil { return false, xerrors.Errorf("getting supported worker task types: %w", err) @@ -64,8 +64,8 @@ func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi return false, nil } -func (s *allocSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *workerHandle) (bool, error) { - return a.utilization() < b.utilization(), nil +func (s *allocSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) { + return a.Utilization() < b.Utilization(), nil } var _ WorkerSelector = &allocSelector{} diff --git a/extern/sector-storage/selector_existing.go b/extern/sector-storage/selector_existing.go index b84991b5c..0703f22ed 100644 --- a/extern/sector-storage/selector_existing.go +++ b/extern/sector-storage/selector_existing.go @@ -28,7 +28,7 @@ func 
newExistingSelector(index stores.SectorIndex, sector abi.SectorID, alloc st } } -func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *workerHandle) (bool, error) { +func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, error) { tasks, err := whnd.TaskTypes(ctx) if err != nil { return false, xerrors.Errorf("getting supported worker task types: %w", err) @@ -66,8 +66,8 @@ func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt return false, nil } -func (s *existingSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *workerHandle) (bool, error) { - return a.utilization() < b.utilization(), nil +func (s *existingSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) { + return a.Utilization() < b.Utilization(), nil } var _ WorkerSelector = &existingSelector{} diff --git a/extern/sector-storage/selector_task.go b/extern/sector-storage/selector_task.go index 94bcb4419..1be2c3677 100644 --- a/extern/sector-storage/selector_task.go +++ b/extern/sector-storage/selector_task.go @@ -19,7 +19,7 @@ func newTaskSelector() *taskSelector { return &taskSelector{} } -func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *workerHandle) (bool, error) { +func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, error) { tasks, err := whnd.TaskTypes(ctx) if err != nil { return false, xerrors.Errorf("getting supported worker task types: %w", err) @@ -29,7 +29,7 @@ func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi. 
return supported, nil } -func (s *taskSelector) Cmp(ctx context.Context, _ sealtasks.TaskType, a, b *workerHandle) (bool, error) { +func (s *taskSelector) Cmp(ctx context.Context, _ sealtasks.TaskType, a, b *WorkerHandle) (bool, error) { atasks, err := a.TaskTypes(ctx) if err != nil { return false, xerrors.Errorf("getting supported worker task types: %w", err) @@ -43,7 +43,7 @@ func (s *taskSelector) Cmp(ctx context.Context, _ sealtasks.TaskType, a, b *work return len(atasks) < len(btasks), nil // prefer workers which can do less } - return a.utilization() < b.utilization(), nil + return a.Utilization() < b.Utilization(), nil } var _ WorkerSelector = &taskSelector{} diff --git a/extern/sector-storage/stats.go b/extern/sector-storage/stats.go index 9b374f328..67a99710b 100644 --- a/extern/sector-storage/stats.go +++ b/extern/sector-storage/stats.go @@ -15,7 +15,7 @@ func (m *Manager) WorkerStats(ctx context.Context) map[uuid.UUID]storiface.Worke out := map[uuid.UUID]storiface.WorkerStats{} - cb := func(ctx context.Context, id storiface.WorkerID, handle *workerHandle) { + cb := func(ctx context.Context, id storiface.WorkerID, handle *WorkerHandle) { handle.lk.Lock() ctx, cancel := context.WithTimeout(ctx, 3*time.Second) @@ -32,9 +32,9 @@ func (m *Manager) WorkerStats(ctx context.Context) map[uuid.UUID]storiface.Worke } out[uuid.UUID(id)] = storiface.WorkerStats{ - Info: handle.info, + Info: handle.Info, Tasks: taskList, - Enabled: handle.enabled, + Enabled: handle.Enabled, MemUsedMin: handle.active.memUsedMin, MemUsedMax: handle.active.memUsedMax, GpuUsed: handle.active.gpuUsed, @@ -43,7 +43,7 @@ func (m *Manager) WorkerStats(ctx context.Context) map[uuid.UUID]storiface.Worke handle.lk.Unlock() } - for id, handle := range m.sched.workers { + for id, handle := range m.sched.Workers { cb(ctx, id, handle) } @@ -72,14 +72,14 @@ func (m *Manager) WorkerJobs() map[uuid.UUID][]storiface.WorkerJob { m.sched.workersLk.RLock() - for id, handle := range m.sched.workers { + for id, handle := range m.sched.Workers { handle.wndLk.Lock() for wi, window := range handle.activeWindows { - for _, request := range window.todo { + for _, request := range window.Todo { out[uuid.UUID(id)] = append(out[uuid.UUID(id)], storiface.WorkerJob{ ID: storiface.UndefCall, - Sector: request.sector.ID, - Task: request.taskType, + Sector: request.Sector.ID, + Task: request.TaskType, RunWait: wi + 2, Start: request.start, }) From 588b8ecbcaaf3a1469e2834465d5715f5cdc8860 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 18 May 2022 16:11:07 +0200 Subject: [PATCH 2/9] sched: Separate WindowSelector func --- extern/sector-storage/sched.go | 2 +- .../sector-storage/sched_assigner_common.go | 176 ++++++++++++++++++ extern/sector-storage/sched_assigner_util.go | 167 +---------------- 3 files changed, 183 insertions(+), 162 deletions(-) create mode 100644 extern/sector-storage/sched_assigner_common.go diff --git a/extern/sector-storage/sched.go b/extern/sector-storage/sched.go index 8d190698d..a964e04bb 100644 --- a/extern/sector-storage/sched.go +++ b/extern/sector-storage/sched.go @@ -150,7 +150,7 @@ type workerResponse struct { func newScheduler() *Scheduler { return &Scheduler{ - assigner: &AssignerUtil{}, + assigner: NewLowestUtilizationAssigner(), Workers: map[storiface.WorkerID]*WorkerHandle{}, diff --git a/extern/sector-storage/sched_assigner_common.go b/extern/sector-storage/sched_assigner_common.go new file mode 100644 index 000000000..a4a2dfc23 --- /dev/null +++ 
b/extern/sector-storage/sched_assigner_common.go @@ -0,0 +1,176 @@ +package sectorstorage + +import ( + "context" + "math/rand" + "sort" + "sync" +) + +type WindowSelector func(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []SchedWindow) int + +// AssignerCommon is a task assigner with customizable parts +type AssignerCommon struct { + WindowSel WindowSelector +} + +var _ Assigner = &AssignerCommon{} + +func (a *AssignerCommon) TrySched(sh *Scheduler) { + /* + This assigns tasks to workers based on: + - Task priority (achieved by handling sh.SchedQueue in order, since it's already sorted by priority) + - Worker resource availability + - Task-specified worker preference (acceptableWindows array below sorted by this preference) + - Window request age + + 1. For each task in the SchedQueue find windows which can handle them + 1.1. Create list of windows capable of handling a task + 1.2. Sort windows according to task selector preferences + 2. Going through SchedQueue again, assign task to first acceptable window + with resources available + 3. Submit windows with scheduled tasks to workers + + */ + + windowsLen := len(sh.OpenWindows) + queueLen := sh.SchedQueue.Len() + + log.Debugf("SCHED %d queued; %d open windows", queueLen, windowsLen) + + if windowsLen == 0 || queueLen == 0 { + // nothing to schedule on + return + } + + windows := make([]SchedWindow, windowsLen) + acceptableWindows := make([][]int, queueLen) // QueueIndex -> []OpenWindowIndex + + // Step 1 + throttle := make(chan struct{}, windowsLen) + + var wg sync.WaitGroup + wg.Add(queueLen) + for i := 0; i < queueLen; i++ { + throttle <- struct{}{} + + go func(sqi int) { + defer wg.Done() + defer func() { + <-throttle + }() + + task := (*sh.SchedQueue)[sqi] + + task.IndexHeap = sqi + for wnd, windowRequest := range sh.OpenWindows { + worker, ok := sh.Workers[windowRequest.Worker] + if !ok { + log.Errorf("worker referenced by windowRequest not found (worker: %s)", windowRequest.Worker) + // TODO: How to move forward here? 
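+					// note (added commentary, not in the original patch): continuing here only drops
+					// this window from consideration for the current task; the request itself stays
+					// in sh.SchedQueue, so it is retried on the next scheduling pass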
+ continue + } + + if !worker.Enabled { + log.Debugw("skipping disabled worker", "worker", windowRequest.Worker) + continue + } + + needRes := worker.Info.Resources.ResourceSpec(task.Sector.ProofType, task.TaskType) + + // TODO: allow bigger windows + if !windows[wnd].Allocated.CanHandleRequest(needRes, windowRequest.Worker, "schedAcceptable", worker.Info) { + continue + } + + rpcCtx, cancel := context.WithTimeout(task.Ctx, SelectorTimeout) + ok, err := task.Sel.Ok(rpcCtx, task.TaskType, task.Sector.ProofType, worker) + cancel() + if err != nil { + log.Errorf("trySched(1) req.Sel.Ok error: %+v", err) + continue + } + + if !ok { + continue + } + + acceptableWindows[sqi] = append(acceptableWindows[sqi], wnd) + } + + if len(acceptableWindows[sqi]) == 0 { + return + } + + // Pick best worker (shuffle in case some workers are equally as good) + rand.Shuffle(len(acceptableWindows[sqi]), func(i, j int) { + acceptableWindows[sqi][i], acceptableWindows[sqi][j] = acceptableWindows[sqi][j], acceptableWindows[sqi][i] // nolint:scopelint + }) + sort.SliceStable(acceptableWindows[sqi], func(i, j int) bool { + wii := sh.OpenWindows[acceptableWindows[sqi][i]].Worker // nolint:scopelint + wji := sh.OpenWindows[acceptableWindows[sqi][j]].Worker // nolint:scopelint + + if wii == wji { + // for the same worker prefer older windows + return acceptableWindows[sqi][i] < acceptableWindows[sqi][j] // nolint:scopelint + } + + wi := sh.Workers[wii] + wj := sh.Workers[wji] + + rpcCtx, cancel := context.WithTimeout(task.Ctx, SelectorTimeout) + defer cancel() + + r, err := task.Sel.Cmp(rpcCtx, task.TaskType, wi, wj) + if err != nil { + log.Errorf("selecting best worker: %s", err) + } + return r + }) + }(i) + } + + wg.Wait() + + log.Debugf("SCHED windows: %+v", windows) + log.Debugf("SCHED Acceptable win: %+v", acceptableWindows) + + // Step 2 + scheduled := a.WindowSel(sh, queueLen, acceptableWindows, windows) + + // Step 3 + + if scheduled == 0 { + return + } + + scheduledWindows := map[int]struct{}{} + for wnd, window := range windows { + if len(window.Todo) == 0 { + // Nothing scheduled here, keep the window open + continue + } + + scheduledWindows[wnd] = struct{}{} + + window := window // copy + select { + case sh.OpenWindows[wnd].Done <- &window: + default: + log.Error("expected sh.OpenWindows[wnd].Done to be buffered") + } + } + + // Rewrite sh.OpenWindows array, removing scheduled windows + newOpenWindows := make([]*SchedWindowRequest, 0, windowsLen-len(scheduledWindows)) + for wnd, window := range sh.OpenWindows { + if _, scheduled := scheduledWindows[wnd]; scheduled { + // keep unscheduled windows open + continue + } + + newOpenWindows = append(newOpenWindows, window) + } + + sh.OpenWindows = newOpenWindows +} diff --git a/extern/sector-storage/sched_assigner_util.go b/extern/sector-storage/sched_assigner_util.go index a75ceb363..48f19931e 100644 --- a/extern/sector-storage/sched_assigner_util.go +++ b/extern/sector-storage/sched_assigner_util.go @@ -1,139 +1,18 @@ package sectorstorage import ( - "context" "math" - "math/rand" - "sort" - "sync" "github.com/filecoin-project/lotus/extern/sector-storage/storiface" ) -// AssignerUtil is a task assigner assigning tasks to workers with lowest utilization -type AssignerUtil struct{} - -var _ Assigner = &AssignerUtil{} - -func (a *AssignerUtil) TrySched(sh *Scheduler) { - /* - This assigns tasks to workers based on: - - Task priority (achieved by handling sh.SchedQueue in order, since it's already sorted by priority) - - Worker resource availability - - 
Task-specified worker preference (acceptableWindows array below sorted by this preference) - - Window request age - - 1. For each task in the SchedQueue find windows which can handle them - 1.1. Create list of windows capable of handling a task - 1.2. Sort windows according to task selector preferences - 2. Going through SchedQueue again, assign task to first acceptable window - with resources available - 3. Submit windows with scheduled tasks to workers - - */ - - windowsLen := len(sh.OpenWindows) - queueLen := sh.SchedQueue.Len() - - log.Debugf("SCHED %d queued; %d open windows", queueLen, windowsLen) - - if windowsLen == 0 || queueLen == 0 { - // nothing to schedule on - return +func NewLowestUtilizationAssigner() Assigner { + return &AssignerCommon{ + WindowSel: LowestUtilizationWS, } +} - windows := make([]SchedWindow, windowsLen) - acceptableWindows := make([][]int, queueLen) // QueueIndex -> []OpenWindowIndex - - // Step 1 - throttle := make(chan struct{}, windowsLen) - - var wg sync.WaitGroup - wg.Add(queueLen) - for i := 0; i < queueLen; i++ { - throttle <- struct{}{} - - go func(sqi int) { - defer wg.Done() - defer func() { - <-throttle - }() - - task := (*sh.SchedQueue)[sqi] - - task.IndexHeap = sqi - for wnd, windowRequest := range sh.OpenWindows { - worker, ok := sh.Workers[windowRequest.Worker] - if !ok { - log.Errorf("worker referenced by windowRequest not found (worker: %s)", windowRequest.Worker) - // TODO: How to move forward here? - continue - } - - if !worker.Enabled { - log.Debugw("skipping disabled worker", "worker", windowRequest.Worker) - continue - } - - needRes := worker.Info.Resources.ResourceSpec(task.Sector.ProofType, task.TaskType) - - // TODO: allow bigger windows - if !windows[wnd].Allocated.CanHandleRequest(needRes, windowRequest.Worker, "schedAcceptable", worker.Info) { - continue - } - - rpcCtx, cancel := context.WithTimeout(task.Ctx, SelectorTimeout) - ok, err := task.Sel.Ok(rpcCtx, task.TaskType, task.Sector.ProofType, worker) - cancel() - if err != nil { - log.Errorf("trySched(1) req.Sel.Ok error: %+v", err) - continue - } - - if !ok { - continue - } - - acceptableWindows[sqi] = append(acceptableWindows[sqi], wnd) - } - - if len(acceptableWindows[sqi]) == 0 { - return - } - - // Pick best worker (shuffle in case some workers are equally as good) - rand.Shuffle(len(acceptableWindows[sqi]), func(i, j int) { - acceptableWindows[sqi][i], acceptableWindows[sqi][j] = acceptableWindows[sqi][j], acceptableWindows[sqi][i] // nolint:scopelint - }) - sort.SliceStable(acceptableWindows[sqi], func(i, j int) bool { - wii := sh.OpenWindows[acceptableWindows[sqi][i]].Worker // nolint:scopelint - wji := sh.OpenWindows[acceptableWindows[sqi][j]].Worker // nolint:scopelint - - if wii == wji { - // for the same worker prefer older windows - return acceptableWindows[sqi][i] < acceptableWindows[sqi][j] // nolint:scopelint - } - - wi := sh.Workers[wii] - wj := sh.Workers[wji] - - rpcCtx, cancel := context.WithTimeout(task.Ctx, SelectorTimeout) - defer cancel() - - r, err := task.Sel.Cmp(rpcCtx, task.TaskType, wi, wj) - if err != nil { - log.Errorf("selecting best worker: %s", err) - } - return r - }) - }(i) - } - - wg.Wait() - - log.Debugf("SCHED windows: %+v", windows) - log.Debugf("SCHED Acceptable win: %+v", acceptableWindows) - +func LowestUtilizationWS(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []SchedWindow) int { // Step 2 scheduled := 0 rmQueue := make([]int, 0, queueLen) @@ -216,39 +95,5 @@ func (a *AssignerUtil) TrySched(sh *Scheduler) { } } - 
// Step 3 - - if scheduled == 0 { - return - } - - scheduledWindows := map[int]struct{}{} - for wnd, window := range windows { - if len(window.Todo) == 0 { - // Nothing scheduled here, keep the window open - continue - } - - scheduledWindows[wnd] = struct{}{} - - window := window // copy - select { - case sh.OpenWindows[wnd].Done <- &window: - default: - log.Error("expected sh.OpenWindows[wnd].Done to be buffered") - } - } - - // Rewrite sh.OpenWindows array, removing scheduled windows - newOpenWindows := make([]*SchedWindowRequest, 0, windowsLen-len(scheduledWindows)) - for wnd, window := range sh.OpenWindows { - if _, scheduled := scheduledWindows[wnd]; scheduled { - // keep unscheduled windows open - continue - } - - newOpenWindows = append(newOpenWindows, window) - } - - sh.OpenWindows = newOpenWindows + return scheduled } From 5ba8bd3b9924d90e9fde42eb6b3a2867758789db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 23 May 2022 16:58:43 +0200 Subject: [PATCH 3/9] sched: Configurable assigners --- extern/sector-storage/manager.go | 9 ++- extern/sector-storage/manager_test.go | 5 +- extern/sector-storage/sched.go | 16 +++- .../sector-storage/sched_assigner_spread.go | 77 +++++++++++++++++++ ..._util.go => sched_assigner_utilization.go} | 3 +- extern/sector-storage/sched_test.go | 9 ++- node/config/def.go | 2 + node/config/types.go | 5 ++ 8 files changed, 116 insertions(+), 10 deletions(-) create mode 100644 extern/sector-storage/sched_assigner_spread.go rename extern/sector-storage/{sched_assigner_util.go => sched_assigner_utilization.go} (96%) diff --git a/extern/sector-storage/manager.go b/extern/sector-storage/manager.go index f75bdd107..01600cc21 100644 --- a/extern/sector-storage/manager.go +++ b/extern/sector-storage/manager.go @@ -122,6 +122,8 @@ type Config struct { // PoSt config ParallelCheckLimit int + + Assigner string } type StorageAuth http.Header @@ -135,6 +137,11 @@ func New(ctx context.Context, lstor *stores.Local, stor stores.Store, ls stores. return nil, xerrors.Errorf("creating prover instance: %w", err) } + sh, err := newScheduler(sc.Assigner) + if err != nil { + return nil, err + } + m := &Manager{ ls: ls, storage: stor, @@ -142,7 +149,7 @@ func New(ctx context.Context, lstor *stores.Local, stor stores.Store, ls stores. 
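With the window-selection step factored out behind WindowSelector (patch 2) and the assigner selectable by name (patch 3), a new assignment strategy only has to supply that one function. As a purely hypothetical illustration of the extension point (not part of this series), a minimal "first fit" assigner could hand each task to the first acceptable window that still has resources, using only helpers that appear elsewhere in this series:

package sectorstorage

// NewFirstFitAssigner is a hypothetical example of composing AssignerCommon
// with a custom WindowSelector; it is a sketch, not code from this series.
func NewFirstFitAssigner() Assigner {
	return &AssignerCommon{
		WindowSel: firstFitWS,
	}
}

func firstFitWS(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []SchedWindow) int {
	scheduled := 0
	rmQueue := make([]int, 0, queueLen)

	for sqi := 0; sqi < queueLen; sqi++ {
		task := (*sh.SchedQueue)[sqi]

		// acceptableWindows is already sorted by task selector preference in TrySched
		for _, wnd := range acceptableWindows[task.IndexHeap] {
			wid := sh.OpenWindows[wnd].Worker
			w := sh.Workers[wid]
			res := w.Info.Resources.ResourceSpec(task.Sector.ProofType, task.TaskType)

			// skip windows that no longer have room for this task
			if !windows[wnd].Allocated.CanHandleRequest(res, wid, "schedAssign", w.Info) {
				continue
			}

			// charge the window so later tasks in this pass see the reduced capacity
			windows[wnd].Allocated.Add(w.Info.Resources, res)
			windows[wnd].Todo = append(windows[wnd].Todo, task)

			rmQueue = append(rmQueue, sqi)
			scheduled++
			break
		}
	}

	// remove assigned tasks from the queue, highest index first
	for i := len(rmQueue) - 1; i >= 0; i-- {
		sh.SchedQueue.Remove(rmQueue[i])
	}

	return scheduled
}

Wiring such a strategy in would then be one more case in the newScheduler switch from patch 3 (for example, a hypothetical case "firstfit": a = NewFirstFitAssigner()).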
remoteHnd: &stores.FetchHandler{Local: lstor, PfHandler: &stores.DefaultPartialFileHandler{}}, index: si, - sched: newScheduler(), + sched: sh, windowPoStSched: newPoStScheduler(sealtasks.TTGenerateWindowPoSt), winningPoStSched: newPoStScheduler(sealtasks.TTGenerateWinningPoSt), diff --git a/extern/sector-storage/manager_test.go b/extern/sector-storage/manager_test.go index 9c844292e..a91f372f7 100644 --- a/extern/sector-storage/manager_test.go +++ b/extern/sector-storage/manager_test.go @@ -109,6 +109,9 @@ func newTestMgr(ctx context.Context, t *testing.T, ds datastore.Datastore) (*Man stor := stores.NewRemote(lstor, si, nil, 6000, &stores.DefaultPartialFileHandler{}) + sh, err := newScheduler("") + require.NoError(t, err) + m := &Manager{ ls: st, storage: stor, @@ -116,7 +119,7 @@ func newTestMgr(ctx context.Context, t *testing.T, ds datastore.Datastore) (*Man remoteHnd: &stores.FetchHandler{Local: lstor}, index: si, - sched: newScheduler(), + sched: sh, windowPoStSched: newPoStScheduler(sealtasks.TTGenerateWindowPoSt), winningPoStSched: newPoStScheduler(sealtasks.TTGenerateWinningPoSt), diff --git a/extern/sector-storage/sched.go b/extern/sector-storage/sched.go index a964e04bb..53b6415ff 100644 --- a/extern/sector-storage/sched.go +++ b/extern/sector-storage/sched.go @@ -148,9 +148,19 @@ type workerResponse struct { err error } -func newScheduler() *Scheduler { +func newScheduler(assigner string) (*Scheduler, error) { + var a Assigner + switch assigner { + case "", "utilization": + a = NewLowestUtilizationAssigner() + case "spread": + a = NewSpreadAssigner() + default: + return nil, xerrors.Errorf("unknown assigner '%s'", assigner) + } + return &Scheduler{ - assigner: NewLowestUtilizationAssigner(), + assigner: a, Workers: map[storiface.WorkerID]*WorkerHandle{}, @@ -171,7 +181,7 @@ func newScheduler() *Scheduler { closing: make(chan struct{}), closed: make(chan struct{}), - } + }, nil } func (sh *Scheduler) Schedule(ctx context.Context, sector storage.SectorRef, taskType sealtasks.TaskType, sel WorkerSelector, prepare WorkerAction, work WorkerAction) error { diff --git a/extern/sector-storage/sched_assigner_spread.go b/extern/sector-storage/sched_assigner_spread.go new file mode 100644 index 000000000..2623c9c06 --- /dev/null +++ b/extern/sector-storage/sched_assigner_spread.go @@ -0,0 +1,77 @@ +package sectorstorage + +import ( + "math" + + "github.com/filecoin-project/lotus/extern/sector-storage/storiface" +) + +func NewSpreadAssigner() Assigner { + return &AssignerCommon{ + WindowSel: SpreadWS, + } +} + +func SpreadWS(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []SchedWindow) int { + scheduled := 0 + rmQueue := make([]int, 0, queueLen) + workerAssigned := map[storiface.WorkerID]int{} + for sqi := 0; sqi < queueLen; sqi++ { + task := (*sh.SchedQueue)[sqi] + + selectedWindow := -1 + var info storiface.WorkerInfo + var bestWid storiface.WorkerID + bestAssigned := math.MaxInt // smaller = better + + for i, wnd := range acceptableWindows[task.IndexHeap] { + wid := sh.OpenWindows[wnd].Worker + w := sh.Workers[wid] + + res := info.Resources.ResourceSpec(task.Sector.ProofType, task.TaskType) + + log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.Sector.ID.Number, wnd, i) + + if !windows[wnd].Allocated.CanHandleRequest(res, wid, "schedAssign", info) { + continue + } + + wu, _ := workerAssigned[wid] + if wu >= bestAssigned { + continue + } + + info = w.Info + bestWid = wid + selectedWindow = wnd + bestAssigned = wu + } + + if selectedWindow < 0 
{ + // all windows full + continue + } + + log.Debugw("SCHED ASSIGNED", + "sqi", sqi, + "sector", task.Sector.ID.Number, + "task", task.TaskType, + "window", selectedWindow, + "worker", bestWid, + "assigned", bestAssigned) + + workerAssigned[bestWid] += 1 + windows[selectedWindow].Todo = append(windows[selectedWindow].Todo, task) + + rmQueue = append(rmQueue, sqi) + scheduled++ + } + + if len(rmQueue) > 0 { + for i := len(rmQueue) - 1; i >= 0; i-- { + sh.SchedQueue.Remove(rmQueue[i]) + } + } + + return scheduled +} diff --git a/extern/sector-storage/sched_assigner_util.go b/extern/sector-storage/sched_assigner_utilization.go similarity index 96% rename from extern/sector-storage/sched_assigner_util.go rename to extern/sector-storage/sched_assigner_utilization.go index 48f19931e..87dd9bc59 100644 --- a/extern/sector-storage/sched_assigner_util.go +++ b/extern/sector-storage/sched_assigner_utilization.go @@ -13,7 +13,6 @@ func NewLowestUtilizationAssigner() Assigner { } func LowestUtilizationWS(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []SchedWindow) int { - // Step 2 scheduled := 0 rmQueue := make([]int, 0, queueLen) workerUtil := map[storiface.WorkerID]float64{} @@ -36,7 +35,7 @@ func LowestUtilizationWS(sh *Scheduler, queueLen int, acceptableWindows [][]int, log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.Sector.ID.Number, wnd, i) // TODO: allow bigger windows - if !windows[wnd].Allocated.CanHandleRequest(needRes, wid, "schedAssign", info) { + if !windows[wnd].Allocated.CanHandleRequest(res, wid, "schedAssign", info) { continue } diff --git a/extern/sector-storage/sched_test.go b/extern/sector-storage/sched_test.go index 046b32d1a..312503a9c 100644 --- a/extern/sector-storage/sched_test.go +++ b/extern/sector-storage/sched_test.go @@ -223,7 +223,8 @@ func addTestWorker(t *testing.T, sched *Scheduler, index *stores.Index, name str } func TestSchedStartStop(t *testing.T) { - sched := newScheduler() + sched, err := newScheduler("") + require.NoError(t, err) go sched.runSched() addTestWorker(t, sched, stores.NewIndex(), "fred", nil, decentWorkerResources, false) @@ -352,7 +353,8 @@ func TestSched(t *testing.T) { return func(t *testing.T) { index := stores.NewIndex() - sched := newScheduler() + sched, err := newScheduler("") + require.NoError(t, err) sched.testSync = make(chan struct{}) go sched.runSched() @@ -604,7 +606,8 @@ func BenchmarkTrySched(b *testing.B) { for i := 0; i < b.N; i++ { b.StopTimer() - sched := newScheduler() + sched, err := newScheduler("") + require.NoError(b, err) sched.Workers[storiface.WorkerID{}] = &WorkerHandle{ workerRpc: nil, Info: storiface.WorkerInfo{ diff --git a/node/config/def.go b/node/config/def.go index 04c512082..6be3b61d8 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -159,6 +159,8 @@ func DefaultStorageMiner() *StorageMiner { // it's the ratio between 10gbit / 1gbit ParallelFetchLimit: 10, + Assigner: "utilization", + // By default use the hardware resource filtering strategy. ResourceFiltering: sectorstorage.ResourceFilteringHardware, }, diff --git a/node/config/types.go b/node/config/types.go index 1505ecb39..0e5bcbd5f 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -330,6 +330,11 @@ type SealerConfig struct { AllowProveReplicaUpdate2 bool AllowRegenSectorKey bool + // Assigner specifies the worker assigner to use when scheduling tasks. + // "utilization" (default) - assign tasks to workers with lowest utilization. 
+ // "spread" - assign tasks to as many distinct workers as possible. + Assigner string + // ResourceFiltering instructs the system which resource filtering strategy // to use when evaluating tasks against this worker. An empty value defaults // to "hardware". From 443488b096fbb05ac9257147274d445da8ed881b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 23 May 2022 17:32:54 +0200 Subject: [PATCH 4/9] lint, docsgen --- documentation/en/default-lotus-miner-config.toml | 8 ++++++++ extern/sector-storage/sched_assigner_spread.go | 2 +- node/config/doc_gen.go | 8 ++++++++ 3 files changed, 17 insertions(+), 1 deletion(-) diff --git a/documentation/en/default-lotus-miner-config.toml b/documentation/en/default-lotus-miner-config.toml index ef3b23b5c..ab9924c02 100644 --- a/documentation/en/default-lotus-miner-config.toml +++ b/documentation/en/default-lotus-miner-config.toml @@ -534,6 +534,14 @@ # env var: LOTUS_STORAGE_ALLOWREGENSECTORKEY #AllowRegenSectorKey = true + # Assigner specifies the worker assigner to use when scheduling tasks. + # "utilization" (default) - assign tasks to workers with lowest utilization. + # "spread" - assign tasks to as many distinct workers as possible. + # + # type: string + # env var: LOTUS_STORAGE_ASSIGNER + #Assigner = "utilization" + # ResourceFiltering instructs the system which resource filtering strategy # to use when evaluating tasks against this worker. An empty value defaults # to "hardware". diff --git a/extern/sector-storage/sched_assigner_spread.go b/extern/sector-storage/sched_assigner_spread.go index 2623c9c06..72a389993 100644 --- a/extern/sector-storage/sched_assigner_spread.go +++ b/extern/sector-storage/sched_assigner_spread.go @@ -60,7 +60,7 @@ func SpreadWS(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows [] "worker", bestWid, "assigned", bestAssigned) - workerAssigned[bestWid] += 1 + workerAssigned[bestWid]++ windows[selectedWindow].Todo = append(windows[selectedWindow].Todo, task) rmQueue = append(rmQueue, sqi) diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index 757d80932..a9c7ed8f2 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -756,6 +756,14 @@ This parameter is ONLY applicable if the retrieval pricing policy strategy has b Comment: ``, }, + { + Name: "Assigner", + Type: "string", + + Comment: `Assigner specifies the worker assigner to use when scheduling tasks. +"utilization" (default) - assign tasks to workers with lowest utilization. 
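The documented default above corresponds to a one-line change for operators who want the new behaviour; a minimal config sketch, assuming these SealerConfig fields sit under the [Storage] section of the lotus-miner config.toml (the same setting is also exposed via the LOTUS_STORAGE_ASSIGNER env var noted above):

[Storage]
  # "utilization" (default) - assign tasks to workers with lowest utilization.
  # "spread" - assign tasks to as many distinct workers as possible.
  Assigner = "spread"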
+"spread" - assign tasks to as many distinct workers as possible.`, + }, { Name: "ResourceFiltering", Type: "sectorstorage.ResourceFilteringStrategy", From 16f434c79094066c176e657ef4ceb846de6fcca9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 23 May 2022 17:38:13 +0200 Subject: [PATCH 5/9] config: Plumb Assigner config correctly --- node/config/storage.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/node/config/storage.go b/node/config/storage.go index bf997add2..6ab35984a 100644 --- a/node/config/storage.go +++ b/node/config/storage.go @@ -64,6 +64,8 @@ func (c *StorageMiner) StorageManager() sectorstorage.Config { AllowRegenSectorKey: c.Storage.AllowRegenSectorKey, ResourceFiltering: c.Storage.ResourceFiltering, + Assigner: c.Storage.Assigner, + ParallelCheckLimit: c.Proving.ParallelCheckLimit, } } From fd8b91a5d4c4a6ff9d419ae6b6cac4c9276d99a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 23 May 2022 22:18:41 +0200 Subject: [PATCH 6/9] sched: Fix resource checks --- extern/sector-storage/sched_assigner_spread.go | 6 ++---- extern/sector-storage/sched_assigner_utilization.go | 4 ++-- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/extern/sector-storage/sched_assigner_spread.go b/extern/sector-storage/sched_assigner_spread.go index 72a389993..f97c657f9 100644 --- a/extern/sector-storage/sched_assigner_spread.go +++ b/extern/sector-storage/sched_assigner_spread.go @@ -20,7 +20,6 @@ func SpreadWS(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows [] task := (*sh.SchedQueue)[sqi] selectedWindow := -1 - var info storiface.WorkerInfo var bestWid storiface.WorkerID bestAssigned := math.MaxInt // smaller = better @@ -28,11 +27,11 @@ func SpreadWS(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows [] wid := sh.OpenWindows[wnd].Worker w := sh.Workers[wid] - res := info.Resources.ResourceSpec(task.Sector.ProofType, task.TaskType) + res := w.Info.Resources.ResourceSpec(task.Sector.ProofType, task.TaskType) log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.Sector.ID.Number, wnd, i) - if !windows[wnd].Allocated.CanHandleRequest(res, wid, "schedAssign", info) { + if !windows[wnd].Allocated.CanHandleRequest(res, wid, "schedAssign", w.Info) { continue } @@ -41,7 +40,6 @@ func SpreadWS(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows [] continue } - info = w.Info bestWid = wid selectedWindow = wnd bestAssigned = wu diff --git a/extern/sector-storage/sched_assigner_utilization.go b/extern/sector-storage/sched_assigner_utilization.go index 87dd9bc59..29f4206a5 100644 --- a/extern/sector-storage/sched_assigner_utilization.go +++ b/extern/sector-storage/sched_assigner_utilization.go @@ -30,12 +30,12 @@ func LowestUtilizationWS(sh *Scheduler, queueLen int, acceptableWindows [][]int, wid := sh.OpenWindows[wnd].Worker w := sh.Workers[wid] - res := info.Resources.ResourceSpec(task.Sector.ProofType, task.TaskType) + res := w.Info.Resources.ResourceSpec(task.Sector.ProofType, task.TaskType) log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.Sector.ID.Number, wnd, i) // TODO: allow bigger windows - if !windows[wnd].Allocated.CanHandleRequest(res, wid, "schedAssign", info) { + if !windows[wnd].Allocated.CanHandleRequest(res, wid, "schedAssign", w.Info) { continue } From f2c29931643045fb226daf0ebcca177f4c0f7aeb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 23 May 2022 22:22:27 +0200 Subject: [PATCH 7/9] sched: 
Correct allocated resource accounting in spread assigner --- extern/sector-storage/sched_assigner_spread.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/extern/sector-storage/sched_assigner_spread.go b/extern/sector-storage/sched_assigner_spread.go index f97c657f9..c9e5a1066 100644 --- a/extern/sector-storage/sched_assigner_spread.go +++ b/extern/sector-storage/sched_assigner_spread.go @@ -16,10 +16,13 @@ func SpreadWS(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows [] scheduled := 0 rmQueue := make([]int, 0, queueLen) workerAssigned := map[storiface.WorkerID]int{} + for sqi := 0; sqi < queueLen; sqi++ { task := (*sh.SchedQueue)[sqi] selectedWindow := -1 + var needRes storiface.Resources + var info storiface.WorkerInfo var bestWid storiface.WorkerID bestAssigned := math.MaxInt // smaller = better @@ -40,6 +43,8 @@ func SpreadWS(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows [] continue } + info = w.Info + needRes = res bestWid = wid selectedWindow = wnd bestAssigned = wu @@ -59,6 +64,7 @@ func SpreadWS(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows [] "assigned", bestAssigned) workerAssigned[bestWid]++ + windows[selectedWindow].Allocated.Add(info.Resources, needRes) windows[selectedWindow].Todo = append(windows[selectedWindow].Todo, task) rmQueue = append(rmQueue, sqi) From 58574554c1fda6ac00eb1b48074f4af7b30dd731 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 23 May 2022 22:31:06 +0200 Subject: [PATCH 8/9] itests: Test spread sched assigner --- itests/kit/ensemble.go | 2 ++ itests/kit/node_opts.go | 8 ++++++++ itests/worker_test.go | 16 ++++++++++++++++ 3 files changed, 26 insertions(+) diff --git a/itests/kit/ensemble.go b/itests/kit/ensemble.go index 8b0128f46..990baf920 100644 --- a/itests/kit/ensemble.go +++ b/itests/kit/ensemble.go @@ -570,6 +570,7 @@ func (n *Ensemble) Start() *Ensemble { } noLocal := m.options.minerNoLocalSealing + assigner := m.options.minerAssigner var mineBlock = make(chan lotusminer.MineReq) opts := []node.Option{ @@ -595,6 +596,7 @@ func (n *Ensemble) Start() *Ensemble { scfg.Storage.AllowCommit = false } + scfg.Storage.Assigner = assigner scfg.Storage.ResourceFiltering = sectorstorage.ResourceFilteringDisabled return scfg.StorageManager() }), diff --git a/itests/kit/node_opts.go b/itests/kit/node_opts.go index 3fbacabcb..936308608 100644 --- a/itests/kit/node_opts.go +++ b/itests/kit/node_opts.go @@ -41,6 +41,7 @@ type nodeOpts struct { sectorSize abi.SectorSize maxStagingDealsBytes int64 minerNoLocalSealing bool // use worker + minerAssigner string workerTasks []sealtasks.TaskType workerStorageOpt func(stores.Store) stores.Store @@ -97,6 +98,13 @@ func WithNoLocalSealing(nope bool) NodeOpt { } } +func WithAssigner(a string) NodeOpt { + return func(opts *nodeOpts) error { + opts.minerAssigner = a + return nil + } +} + func DisableLibp2p() NodeOpt { return func(opts *nodeOpts) error { opts.disableLibp2p = true diff --git a/itests/worker_test.go b/itests/worker_test.go index c1fba2600..250acde84 100644 --- a/itests/worker_test.go +++ b/itests/worker_test.go @@ -41,6 +41,22 @@ func TestWorkerPledge(t *testing.T) { miner.PledgeSectors(ctx, 1, 0, nil) } +func TestWorkerPledgeSpread(t *testing.T) { + ctx := context.Background() + _, miner, worker, ens := kit.EnsembleWorker(t, kit.WithAllSubsystems(), kit.ThroughRPC(), + kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTAddPiece, sealtasks.TTPreCommit1, 
sealtasks.TTPreCommit2, sealtasks.TTCommit2, sealtasks.TTUnseal}), + kit.WithAssigner("spread"), + ) // no mock proofs + + ens.InterconnectAll().BeginMining(50 * time.Millisecond) + + e, err := worker.Enabled(ctx) + require.NoError(t, err) + require.True(t, e) + + miner.PledgeSectors(ctx, 4, 0, nil) +} + func TestWorkerDataCid(t *testing.T) { ctx := context.Background() _, miner, worker, _ := kit.EnsembleWorker(t, kit.WithAllSubsystems(), kit.ThroughRPC(), kit.WithNoLocalSealing(true), From 3de34ea3c0ff473b469ef31ea5bd32f68fdc21de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 23 May 2022 23:28:31 +0200 Subject: [PATCH 9/9] Fix TestWorkerPledgeSpread in CI --- itests/worker_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/itests/worker_test.go b/itests/worker_test.go index 250acde84..fd8798448 100644 --- a/itests/worker_test.go +++ b/itests/worker_test.go @@ -54,7 +54,7 @@ func TestWorkerPledgeSpread(t *testing.T) { require.NoError(t, err) require.True(t, e) - miner.PledgeSectors(ctx, 4, 0, nil) + miner.PledgeSectors(ctx, 1, 0, nil) } func TestWorkerDataCid(t *testing.T) {
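With the CI fix above, TestWorkerPledgeSpread pledges a single sector through the spread assigner. A possible follow-up assertion (hypothetical, not part of this series; it assumes a test that pledges several sectors and has direct access to the sector-storage *Manager m, plus testify's require) could use the WorkerJobs accessor from patch 1 to check that in-flight jobs actually reach more than one worker:

// hypothetical check: with the "spread" assigner and several pledged sectors,
// in-flight jobs should be distributed over more than one worker
// (assumes imports: github.com/google/uuid, github.com/stretchr/testify/require)
distinct := map[uuid.UUID]struct{}{}
for wid, jobs := range m.WorkerJobs() {
	if len(jobs) > 0 {
		distinct[wid] = struct{}{}
	}
}
require.Greater(t, len(distinct), 1, "expected the spread assigner to use multiple workers")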