diff --git a/.circleci/config.yml b/.circleci/config.yml
index 9038fdb8f..5fcb83145 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -882,6 +882,12 @@ workflows:
             - build
           suite: itest-sdr_upgrade
           target: "./itests/sdr_upgrade_test.go"
+      - test:
+          name: test-itest-sealing_resources
+          requires:
+            - build
+          suite: itest-sealing_resources
+          target: "./itests/sealing_resources_test.go"
       - test:
           name: test-itest-sector_finalize_early
           requires:
diff --git a/itests/sealing_resources_test.go b/itests/sealing_resources_test.go
new file mode 100644
index 000000000..85779fd88
--- /dev/null
+++ b/itests/sealing_resources_test.go
@@ -0,0 +1,64 @@
+package itests
+
+import (
+	"context"
+	"os"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/filecoin-project/lotus/itests/kit"
+	"github.com/filecoin-project/lotus/storage/sealer/storiface"
+)
+
+// Regression check for a fix introduced in https://github.com/filecoin-project/lotus/pull/10633
+func TestPledgeMaxConcurrentGet(t *testing.T) {
+	require.NoError(t, os.Setenv("GET_2K_MAX_CONCURRENT", "1"))
+	t.Cleanup(func() {
+		require.NoError(t, os.Unsetenv("GET_2K_MAX_CONCURRENT"))
+	})
+
+	kit.QuietMiningLogs()
+
+	blockTime := 50 * time.Millisecond
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	_, miner, ens := kit.EnsembleMinimal(t, kit.NoStorage()) // no mock proofs
+	ens.InterconnectAll().BeginMiningMustPost(blockTime)
+
+	// separate sealed and storage paths so that finalize move needs to happen
+	miner.AddStorage(ctx, t, func(meta *storiface.LocalStorageMeta) {
+		meta.CanSeal = true
+	})
+	miner.AddStorage(ctx, t, func(meta *storiface.LocalStorageMeta) {
+		meta.CanStore = true
+	})
+
+	// NOTE: This test only repros the issue when Fetch tasks take ~10s, there's
+	// no great way to do that in a non-horribly-hacky way
+
+	/* The horribly hacky way:
+
+	diff --git a/storage/sealer/sched_worker.go b/storage/sealer/sched_worker.go
+	index 35acd755d..76faec859 100644
+	--- a/storage/sealer/sched_worker.go
+	+++ b/storage/sealer/sched_worker.go
+	@@ -513,6 +513,10 @@ func (sw *schedWorker) startProcessingTask(req *WorkerRequest) error {
+	 	tw.start()
+	 	err = <-werr
+
+	+	if req.TaskType == sealtasks.TTFetch {
+	+		time.Sleep(10 * time.Second)
+	+	}
+	+
+	 	select {
+	 	case req.ret <- workerResponse{err: err}:
+	 	case <-req.Ctx.Done():
+
+	*/
+
+	miner.PledgeSectors(ctx, 3, 0, nil)
+}
diff --git a/storage/sealer/sched_assigner_common.go b/storage/sealer/sched_assigner_common.go
index d676d410d..ffc21b0dd 100644
--- a/storage/sealer/sched_assigner_common.go
+++ b/storage/sealer/sched_assigner_common.go
@@ -103,7 +103,7 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) {
 				needRes := worker.Info.Resources.ResourceSpec(task.Sector.ProofType, task.TaskType)
 
 				// TODO: allow bigger windows
-				if !windows[wnd].Allocated.CanHandleRequest(task.SealTask(), needRes, windowRequest.Worker, "schedAcceptable", worker.Info) {
+				if !windows[wnd].Allocated.CanHandleRequest(task.SchedId, task.SealTask(), needRes, windowRequest.Worker, "schedAcceptable", worker.Info) {
 					continue
 				}
 
diff --git a/storage/sealer/sched_assigner_darts.go b/storage/sealer/sched_assigner_darts.go
index e28b70e78..134698fbf 100644
--- a/storage/sealer/sched_assigner_darts.go
+++ b/storage/sealer/sched_assigner_darts.go
@@ -37,7 +37,7 @@ func RandomWS(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []
 			log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.Sector.ID.Number, wnd, i)
 
-			if !windows[wnd].Allocated.CanHandleRequest(task.SealTask(), res, wid, "schedAssign", w.Info) {
+			if !windows[wnd].Allocated.CanHandleRequest(task.SchedId, task.SealTask(), res, wid, "schedAssign", w.Info) {
 				continue
 			}
 
@@ -71,7 +71,7 @@ func RandomWS(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []
 			"worker", bestWid,
 			"choices", len(choices))
 
-		windows[selectedWindow].Allocated.Add(task.SealTask(), info.Resources, needRes)
+		windows[selectedWindow].Allocated.Add(task.SchedId, task.SealTask(), info.Resources, needRes)
 		windows[selectedWindow].Todo = append(windows[selectedWindow].Todo, task)
 		rmQueue = append(rmQueue, sqi)
 
diff --git a/storage/sealer/sched_assigner_spread.go b/storage/sealer/sched_assigner_spread.go
index 0a62b7406..b1ac4c8e9 100644
--- a/storage/sealer/sched_assigner_spread.go
+++ b/storage/sealer/sched_assigner_spread.go
@@ -35,7 +35,7 @@ func SpreadWS(queued bool) func(sh *Scheduler, queueLen int, acceptableWindows [
 			log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.Sector.ID.Number, wnd, i)
 
-			if !windows[wnd].Allocated.CanHandleRequest(task.SealTask(), res, wid, "schedAssign", w.Info) {
+			if !windows[wnd].Allocated.CanHandleRequest(task.SchedId, task.SealTask(), res, wid, "schedAssign", w.Info) {
 				continue
 			}
 
@@ -71,7 +71,7 @@ func SpreadWS(queued bool) func(sh *Scheduler, queueLen int, acceptableWindows [
 				"assigned", bestAssigned)
 
 			workerAssigned[bestWid]++
-			windows[selectedWindow].Allocated.Add(task.SealTask(), info.Resources, needRes)
+			windows[selectedWindow].Allocated.Add(task.SchedId, task.SealTask(), info.Resources, needRes)
 			windows[selectedWindow].Todo = append(windows[selectedWindow].Todo, task)
 			rmQueue = append(rmQueue, sqi)
 
diff --git a/storage/sealer/sched_assigner_spread_tasks.go b/storage/sealer/sched_assigner_spread_tasks.go
index 09cf98046..f98e7b745 100644
--- a/storage/sealer/sched_assigner_spread_tasks.go
+++ b/storage/sealer/sched_assigner_spread_tasks.go
@@ -41,7 +41,7 @@ func SpreadTasksWS(queued bool) func(sh *Scheduler, queueLen int, acceptableWind
 			log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.Sector.ID.Number, wnd, i)
 
-			if !windows[wnd].Allocated.CanHandleRequest(task.SealTask(), res, wid, "schedAssign", w.Info) {
+			if !windows[wnd].Allocated.CanHandleRequest(task.SchedId, task.SealTask(), res, wid, "schedAssign", w.Info) {
 				continue
 			}
 
@@ -80,7 +80,7 @@ func SpreadTasksWS(queued bool) func(sh *Scheduler, queueLen int, acceptableWind
 				"assigned", bestAssigned)
 
 			workerAssigned[bestWid]++
-			windows[selectedWindow].Allocated.Add(task.SealTask(), info.Resources, needRes)
+			windows[selectedWindow].Allocated.Add(task.SchedId, task.SealTask(), info.Resources, needRes)
 			windows[selectedWindow].Todo = append(windows[selectedWindow].Todo, task)
 			rmQueue = append(rmQueue, sqi)
 
diff --git a/storage/sealer/sched_assigner_utilization.go b/storage/sealer/sched_assigner_utilization.go
index 1e75d904a..c81c9f187 100644
--- a/storage/sealer/sched_assigner_utilization.go
+++ b/storage/sealer/sched_assigner_utilization.go
@@ -35,7 +35,7 @@ func LowestUtilizationWS(sh *Scheduler, queueLen int, acceptableWindows [][]int,
 			log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.Sector.ID.Number, wnd, i)
 
 			// TODO: allow bigger windows
-			if !windows[wnd].Allocated.CanHandleRequest(task.SealTask(), res, wid, "schedAssign", w.Info) {
+			if !windows[wnd].Allocated.CanHandleRequest(task.SchedId, task.SealTask(), res, wid, "schedAssign", w.Info) {
 				continue
 			}
 
@@ -82,7 +82,7 @@ func LowestUtilizationWS(sh *Scheduler, queueLen int, acceptableWindows [][]int,
 			"worker", bestWid,
 			"utilization", bestUtilization)
 
-		workerUtil[bestWid] += windows[selectedWindow].Allocated.Add(task.SealTask(), info.Resources, needRes)
+		workerUtil[bestWid] += windows[selectedWindow].Allocated.Add(task.SchedId, task.SealTask(), info.Resources, needRes)
 		windows[selectedWindow].Todo = append(windows[selectedWindow].Todo, task)
 		rmQueue = append(rmQueue, sqi)
 
diff --git a/storage/sealer/sched_post.go b/storage/sealer/sched_post.go
index 0e0c39768..c6bd81829 100644
--- a/storage/sealer/sched_post.go
+++ b/storage/sealer/sched_post.go
@@ -7,6 +7,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/google/uuid"
 	"github.com/hashicorp/go-multierror"
 	"golang.org/x/xerrors"
 
@@ -110,7 +111,7 @@ func (ps *poStScheduler) Schedule(ctx context.Context, primary bool, spt abi.Reg
 	for i, selected := range candidates {
 		worker := ps.workers[selected.id]
 
-		err := worker.active.withResources(selected.id, worker.Info, ps.postType.SealTask(spt), selected.res, &ps.lk, func() error {
+		err := worker.active.withResources(uuid.UUID{}, selected.id, worker.Info, ps.postType.SealTask(spt), selected.res, &ps.lk, func() error {
 			ps.lk.Unlock()
 			defer ps.lk.Lock()
 
@@ -148,7 +149,7 @@ func (ps *poStScheduler) readyWorkers(spt abi.RegisteredSealProof) (bool, []cand
 			continue
 		}
 
-		if !wr.active.CanHandleRequest(ps.postType.SealTask(spt), needRes, wid, "post-readyWorkers", wr.Info) {
+		if !wr.active.CanHandleRequest(uuid.UUID{}, ps.postType.SealTask(spt), needRes, wid, "post-readyWorkers", wr.Info) {
 			continue
 		}
 
diff --git a/storage/sealer/sched_resources.go b/storage/sealer/sched_resources.go
index 597f36dbe..a423def9f 100644
--- a/storage/sealer/sched_resources.go
+++ b/storage/sealer/sched_resources.go
@@ -3,6 +3,8 @@ package sealer
 import (
 	"sync"
 
+	"github.com/google/uuid"
+
 	"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
 	"github.com/filecoin-project/lotus/storage/sealer/storiface"
 )
@@ -20,7 +22,7 @@ type ActiveResources struct {
 }
 
 type taskCounter struct {
-	taskCounters map[sealtasks.SealTaskType]int
+	taskCounters map[sealtasks.SealTaskType]map[uuid.UUID]int
 
 	// this lock is technically redundant, as ActiveResources is always accessed
 	// with the worker lock, but let's not panic if we ever change that
@@ -29,34 +31,48 @@
 func newTaskCounter() *taskCounter {
 	return &taskCounter{
-		taskCounters: map[sealtasks.SealTaskType]int{},
+		taskCounters: make(map[sealtasks.SealTaskType]map[uuid.UUID]int),
 	}
 }
 
-func (tc *taskCounter) Add(tt sealtasks.SealTaskType) {
+func (tc *taskCounter) Add(tt sealtasks.SealTaskType, schedID uuid.UUID) {
 	tc.lk.Lock()
 	defer tc.lk.Unlock()
-	tc.taskCounters[tt]++
+	tc.getUnlocked(tt)[schedID]++
 }
 
-func (tc *taskCounter) Free(tt sealtasks.SealTaskType) {
+func (tc *taskCounter) Free(tt sealtasks.SealTaskType, schedID uuid.UUID) {
 	tc.lk.Lock()
 	defer tc.lk.Unlock()
-	tc.taskCounters[tt]--
+	m := tc.getUnlocked(tt)
+	if m[schedID] <= 1 {
+		delete(m, schedID)
+	} else {
+		m[schedID]--
+	}
 }
 
-func (tc *taskCounter) Get(tt sealtasks.SealTaskType) int {
-	tc.lk.Lock()
-	defer tc.lk.Unlock()
+func (tc *taskCounter) getUnlocked(tt sealtasks.SealTaskType) map[uuid.UUID]int {
+	if tc.taskCounters[tt] == nil {
+		tc.taskCounters[tt] = make(map[uuid.UUID]int)
+	}
 
 	return tc.taskCounters[tt]
 }
 
+func (tc *taskCounter) Get(tt sealtasks.SealTaskType) map[uuid.UUID]int {
+	tc.lk.Lock()
+	defer tc.lk.Unlock()
+
+	return tc.getUnlocked(tt)
+}
+
 func (tc *taskCounter) Sum() int {
 	tc.lk.Lock()
 	defer tc.lk.Unlock()
 	sum := 0
 	for _, v := range tc.taskCounters {
-		sum += v
+		sum += len(v)
 	}
 	return sum
 }
@@ -64,8 +80,8 @@ func (tc *taskCounter) Sum() int {
 func (tc *taskCounter) ForEach(cb func(tt sealtasks.SealTaskType, count int)) {
 	tc.lk.Lock()
 	defer tc.lk.Unlock()
-	for tt, count := range tc.taskCounters {
-		cb(tt, count)
+	for tt, v := range tc.taskCounters {
+		cb(tt, len(v))
 	}
 }
 
@@ -75,8 +91,8 @@ func NewActiveResources(tc *taskCounter) *ActiveResources {
 	}
 }
 
-func (a *ActiveResources) withResources(id storiface.WorkerID, wr storiface.WorkerInfo, tt sealtasks.SealTaskType, r storiface.Resources, locker sync.Locker, cb func() error) error {
-	for !a.CanHandleRequest(tt, r, id, "withResources", wr) {
+func (a *ActiveResources) withResources(schedID uuid.UUID, id storiface.WorkerID, wr storiface.WorkerInfo, tt sealtasks.SealTaskType, r storiface.Resources, locker sync.Locker, cb func() error) error {
+	for !a.CanHandleRequest(schedID, tt, r, id, "withResources", wr) {
 		if a.cond == nil {
 			a.cond = sync.NewCond(locker)
 		}
@@ -85,11 +101,11 @@ func (a *ActiveResources) withResources(id storiface.WorkerID, wr storiface.Work
 		a.waiting--
 	}
 
-	a.Add(tt, wr.Resources, r)
+	a.Add(schedID, tt, wr.Resources, r)
 
 	err := cb()
 
-	a.Free(tt, wr.Resources, r)
+	a.Free(schedID, tt, wr.Resources, r)
 
 	return err
 }
@@ -100,7 +116,7 @@ func (a *ActiveResources) hasWorkWaiting() bool {
 }
 
 // add task resources to ActiveResources and return utilization difference
-func (a *ActiveResources) Add(tt sealtasks.SealTaskType, wr storiface.WorkerResources, r storiface.Resources) float64 {
+func (a *ActiveResources) Add(schedID uuid.UUID, tt sealtasks.SealTaskType, wr storiface.WorkerResources, r storiface.Resources) float64 {
 	startUtil := a.utilization(wr)
 
 	if r.GPUUtilization > 0 {
@@ -109,19 +125,19 @@ func (a *ActiveResources) Add(tt sealtasks.SealTaskType, wr storiface.WorkerReso
 	a.cpuUse += r.Threads(wr.CPUs, len(wr.GPUs))
 	a.memUsedMin += r.MinMemory
 	a.memUsedMax += r.MaxMemory
-	a.taskCounters.Add(tt)
+	a.taskCounters.Add(tt, schedID)
 
 	return a.utilization(wr) - startUtil
 }
 
-func (a *ActiveResources) Free(tt sealtasks.SealTaskType, wr storiface.WorkerResources, r storiface.Resources) {
+func (a *ActiveResources) Free(schedID uuid.UUID, tt sealtasks.SealTaskType, wr storiface.WorkerResources, r storiface.Resources) {
 	if r.GPUUtilization > 0 {
 		a.gpuUsed -= r.GPUUtilization
 	}
 	a.cpuUse -= r.Threads(wr.CPUs, len(wr.GPUs))
 	a.memUsedMin -= r.MinMemory
 	a.memUsedMax -= r.MaxMemory
-	a.taskCounters.Free(tt)
+	a.taskCounters.Free(tt, schedID)
 
 	if a.cond != nil {
 		a.cond.Broadcast()
@@ -130,9 +146,10 @@ func (a *ActiveResources) Free(tt sealtasks.SealTaskType, wr storiface.WorkerRes
 
 // CanHandleRequest evaluates if the worker has enough available resources to
 // handle the request.
-func (a *ActiveResources) CanHandleRequest(tt sealtasks.SealTaskType, needRes storiface.Resources, wid storiface.WorkerID, caller string, info storiface.WorkerInfo) bool {
+func (a *ActiveResources) CanHandleRequest(schedID uuid.UUID, tt sealtasks.SealTaskType, needRes storiface.Resources, wid storiface.WorkerID, caller string, info storiface.WorkerInfo) bool {
 	if needRes.MaxConcurrent > 0 {
-		if a.taskCounters.Get(tt) >= needRes.MaxConcurrent {
+		tasks := a.taskCounters.Get(tt)
+		if len(tasks) >= needRes.MaxConcurrent && (schedID == uuid.UUID{} || tasks[schedID] == 0) {
 			log.Debugf("sched: not scheduling on worker %s for %s; at task limit tt=%s, curcount=%d", wid, caller, tt, a.taskCounters.Get(tt))
 			return false
 		}
@@ -226,7 +243,7 @@ func (a *ActiveResources) taskCount(tt *sealtasks.SealTaskType) int {
 		return a.taskCounters.Sum()
 	}
 
-	return a.taskCounters.Get(*tt)
+	return len(a.taskCounters.Get(*tt))
 }
 
 func (wh *WorkerHandle) Utilization() float64 {
diff --git a/storage/sealer/sched_test.go b/storage/sealer/sched_test.go
index 07731e934..2e2b05ab2 100644
--- a/storage/sealer/sched_test.go
+++ b/storage/sealer/sched_test.go
@@ -698,7 +698,7 @@ func TestWindowCompact(t *testing.T) {
 				TaskType: task,
 				Sector:   storiface.SectorRef{ProofType: spt},
 			})
-			window.Allocated.Add(task.SealTask(spt), wh.Info.Resources, storiface.ResourceTable[task][spt])
+			window.Allocated.Add(uuid.UUID{}, task.SealTask(spt), wh.Info.Resources, storiface.ResourceTable[task][spt])
 		}
 
 		wh.activeWindows = append(wh.activeWindows, window)
@@ -717,7 +717,7 @@ func TestWindowCompact(t *testing.T) {
 		for ti, task := range tasks {
 			require.Equal(t, task, wh.activeWindows[wi].Todo[ti].TaskType, "%d, %d", wi, ti)
 
-			expectRes.Add(task.SealTask(spt), wh.Info.Resources, storiface.ResourceTable[task][spt])
+			expectRes.Add(uuid.UUID{}, task.SealTask(spt), wh.Info.Resources, storiface.ResourceTable[task][spt])
 		}
 
 		require.Equal(t, expectRes.cpuUse, wh.activeWindows[wi].Allocated.cpuUse, "%d", wi)
diff --git a/storage/sealer/sched_worker.go b/storage/sealer/sched_worker.go
index b6efc851a..35acd755d 100644
--- a/storage/sealer/sched_worker.go
+++ b/storage/sealer/sched_worker.go
@@ -294,14 +294,14 @@ func (sw *schedWorker) workerCompactWindows() {
 
 			for ti, todo := range window.Todo {
 				needRes := worker.Info.Resources.ResourceSpec(todo.Sector.ProofType, todo.TaskType)
-				if !lower.Allocated.CanHandleRequest(todo.SealTask(), needRes, sw.wid, "compactWindows", worker.Info) {
+				if !lower.Allocated.CanHandleRequest(todo.SchedId, todo.SealTask(), needRes, sw.wid, "compactWindows", worker.Info) {
 					continue
 				}
 
 				moved = append(moved, ti)
 				lower.Todo = append(lower.Todo, todo)
-				lower.Allocated.Add(todo.SealTask(), worker.Info.Resources, needRes)
-				window.Allocated.Free(todo.SealTask(), worker.Info.Resources, needRes)
+				lower.Allocated.Add(todo.SchedId, todo.SealTask(), worker.Info.Resources, needRes)
+				window.Allocated.Free(todo.SchedId, todo.SealTask(), worker.Info.Resources, needRes)
 			}
 
 			if len(moved) > 0 {
@@ -355,7 +355,7 @@ assignLoop:
 			worker.lk.Lock()
 			for t, todo := range firstWindow.Todo {
 				needResPrep := worker.Info.Resources.PrepResourceSpec(todo.Sector.ProofType, todo.TaskType, todo.prepare.PrepType)
-				if worker.preparing.CanHandleRequest(todo.PrepSealTask(), needResPrep, sw.wid, "startPreparing", worker.Info) {
+				if worker.preparing.CanHandleRequest(todo.SchedId, todo.PrepSealTask(), needResPrep, sw.wid, "startPreparing", worker.Info) {
 					tidx = t
 					break
 				}
@@ -416,7 +416,7 @@ assignLoop:
 			}
 
 			needRes := worker.Info.Resources.ResourceSpec(todo.Sector.ProofType, todo.TaskType)
-			if worker.active.CanHandleRequest(todo.SealTask(), needRes, sw.wid, "startPreparing", worker.Info) {
+			if worker.active.CanHandleRequest(todo.SchedId, todo.SealTask(), needRes, sw.wid, "startPreparing", worker.Info) {
 				tidx = t
 				break
 			}
@@ -457,7 +457,7 @@ func (sw *schedWorker) startProcessingTask(req *WorkerRequest) error {
 	needResPrep := w.Info.Resources.PrepResourceSpec(req.Sector.ProofType, req.TaskType, req.prepare.PrepType)
 
 	w.lk.Lock()
-	w.preparing.Add(req.PrepSealTask(), w.Info.Resources, needResPrep)
+	w.preparing.Add(req.SchedId, req.PrepSealTask(), w.Info.Resources, needResPrep)
 	w.lk.Unlock()
 
 	go func() {
@@ -468,7 +468,7 @@ func (sw *schedWorker) startProcessingTask(req *WorkerRequest) error {
 
 		w.lk.Lock()
 		if err != nil {
-			w.preparing.Free(req.PrepSealTask(), w.Info.Resources, needResPrep)
+			w.preparing.Free(req.SchedId, req.PrepSealTask(), w.Info.Resources, needResPrep)
 			w.lk.Unlock()
 
 			select {
@@ -497,11 +497,12 @@ func (sw *schedWorker) startProcessingTask(req *WorkerRequest) error {
 		}()
 
 		// wait (if needed) for resources in the 'active' window
-		err = w.active.withResources(sw.wid, w.Info, req.SealTask(), needRes, &w.lk, func() error {
-			w.preparing.Free(req.PrepSealTask(), w.Info.Resources, needResPrep)
+		err = w.active.withResources(req.SchedId, sw.wid, w.Info, req.SealTask(), needRes, &w.lk, func() error {
+			w.preparing.Free(req.SchedId, req.PrepSealTask(), w.Info.Resources, needResPrep)
 			w.lk.Unlock()
 			defer w.lk.Lock() // we MUST return locked from this function
 
+			// make sure the worker loop sees that the prepare task has finished
 			select {
 			case sw.taskDone <- struct{}{}:
 			case <-sh.closing:
@@ -525,6 +526,12 @@ func (sw *schedWorker) startProcessingTask(req *WorkerRequest) error {
 
 		w.lk.Unlock()
 
+		// make sure the worker loop sees that the task has finished
+		select {
+		case sw.taskDone <- struct{}{}:
+		default: // there is a notification pending already
+		}
+
 		// This error should always be nil, since nothing is setting it, but just to be safe:
 		if err != nil {
 			log.Errorf("error executing worker (withResources): %+v", err)
@@ -539,7 +546,7 @@ func (sw *schedWorker) startProcessingReadyTask(req *WorkerRequest) error {
 
 	needRes := w.Info.Resources.ResourceSpec(req.Sector.ProofType, req.TaskType)
 
-	w.active.Add(req.SealTask(), w.Info.Resources, needRes)
+	w.active.Add(req.SchedId, req.SealTask(), w.Info.Resources, needRes)
 
 	go func() {
 		// Do the work!
@@ -557,7 +564,7 @@ func (sw *schedWorker) startProcessingReadyTask(req *WorkerRequest) error {
 
 		w.lk.Lock()
 
-		w.active.Free(req.SealTask(), w.Info.Resources, needRes)
+		w.active.Free(req.SchedId, req.SealTask(), w.Info.Resources, needRes)
 
 		select {
 		case sw.taskDone <- struct{}{}:
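---

For reviewers, here is a small standalone sketch (hypothetical names, not lotus code) of the counting semantics the `taskCounter` change above introduces: `MaxConcurrent` limits such as the one set by `GET_2K_MAX_CONCURRENT=1` in the new itest are now enforced per distinct scheduling request (keyed by its `SchedId`) rather than per raw `Add`/`Free` call, so a single request that is tracked in both the preparing and active windows consumes only one slot and cannot deadlock itself against the Fetch limit.

```go
// Standalone sketch of per-request concurrency counting (assumed names;
// mirrors the patched taskCounter/CanHandleRequest logic, without the locking).
package main

import (
	"fmt"

	"github.com/google/uuid"
)

type taskType string

// counter tracks, per task type, how many units each scheduling request holds.
type counter struct {
	counts map[taskType]map[uuid.UUID]int
}

func newCounter() *counter {
	return &counter{counts: make(map[taskType]map[uuid.UUID]int)}
}

func (c *counter) add(tt taskType, schedID uuid.UUID) {
	if c.counts[tt] == nil {
		c.counts[tt] = make(map[uuid.UUID]int)
	}
	c.counts[tt][schedID]++
}

func (c *counter) free(tt taskType, schedID uuid.UUID) {
	m := c.counts[tt]
	if m[schedID] <= 1 {
		delete(m, schedID) // last unit held by this request releases its slot
	} else {
		m[schedID]--
	}
}

// canHandle mirrors the patched check: the limit applies to the number of
// distinct requests; a request that already holds a slot is admitted even at
// the limit, while a zero schedID (as passed by the PoSt scheduler) keeps the
// old strict behavior.
func (c *counter) canHandle(tt taskType, schedID uuid.UUID, maxConcurrent int) bool {
	tasks := c.counts[tt]
	if len(tasks) >= maxConcurrent && (schedID == uuid.UUID{} || tasks[schedID] == 0) {
		return false
	}
	return true
}

func main() {
	c := newCounter()
	fetch := taskType("seal/v0/fetch")
	req := uuid.New()

	c.add(fetch, req) // request occupies its slot in the "preparing" window

	// The same request asking for its "active" slot is still admitted ...
	fmt.Println(c.canHandle(fetch, req, 1)) // true

	// ... while a different request is held back by MaxConcurrent=1.
	fmt.Println(c.canHandle(fetch, uuid.New(), 1)) // false

	c.free(fetch, req) // releasing the slot lets other requests through again
}
```

The extra non-blocking `taskDone` notification added in `sched_worker.go` appears to address the same stall from the other side: once a request frees its resources, the worker loop is woken so a queued request can claim the slot instead of waiting for the next unrelated event.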