fix: sched: Address GET_32G_MAX_CONCURRENT regression (#10850)

* Fix 1.21 regression: GET_32G_MAX_CONCURRENT + mixed prepared/executing tasks leads to a stuck scheduler

With 12 GET tasks queued and GET_32G_MAX_CONCURRENT=1, the miner's sealing jobs list only ever shows the GET tasks
as assigned, and the scheduler is stuck.
I believe this is a regression from 1.21 unifying the counters: for GETs, PrepType and TaskType are both
seal/v0/fetch, so a task already counted towards the limit while preparing blocks itself (and everything queued
behind it) from moving on to execution. (A minimal sketch of the per-request counting fix follows the change
summary below.)

* itests: Repro issue from PR #10633

* make counters int (non-working)

* fix: worker sched: Send taskDone notifs after tasks are done

* itests: Make TestPledgeMaxConcurrentGet actually reproduce the issue

* make the linter happy

---------

Co-authored-by: Steffen Butzer <steffen.butzer@outlook.com>
Łukasz Magiera 2023-05-10 21:43:42 +02:00 committed by GitHub
parent 298b2b4785
commit 6fd93ed170
11 changed files with 142 additions and 47 deletions
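
The heart of the change is in storage/sealer: instead of a plain int per seal task type, the task counters are now keyed by the scheduling request's UUID, so a request that is already counted while preparing (for GETs, PrepType and TaskType are both seal/v0/fetch) is not counted a second time against MaxConcurrent and can still move on to executing. Below is a minimal standalone sketch of that counting idea; the names (canHandle, string task types) are simplifications, not the actual lotus API, and the real change is in the sched_resources.go diff further down.

package main

import (
	"fmt"

	"github.com/google/uuid"
)

// taskCounter tracks, per task type, which scheduling requests currently hold a slot.
type taskCounter struct {
	counters map[string]map[uuid.UUID]int // task type -> schedID -> refcount
}

func newTaskCounter() *taskCounter {
	return &taskCounter{counters: map[string]map[uuid.UUID]int{}}
}

func (tc *taskCounter) add(tt string, schedID uuid.UUID) {
	if tc.counters[tt] == nil {
		tc.counters[tt] = map[uuid.UUID]int{}
	}
	tc.counters[tt][schedID]++
}

// canHandle mirrors the fixed MaxConcurrent check: the number of *distinct*
// requests is compared against the limit, and a request that already holds a
// slot for this task type is allowed through even when the limit is reached.
func (tc *taskCounter) canHandle(tt string, schedID uuid.UUID, maxConcurrent int) bool {
	m := tc.counters[tt]
	return len(m) < maxConcurrent || m[schedID] > 0
}

func main() {
	tc := newTaskCounter()
	req := uuid.New()

	tc.add("seal/v0/fetch", req) // counted while the fetch is in the "preparing" stage

	// With the old int counter, the same request would now be blocked from entering
	// the "executing" stage when GET_32G_MAX_CONCURRENT=1; counting by schedID lets
	// it through while still blocking other requests.
	fmt.Println(tc.canHandle("seal/v0/fetch", req, 1))        // true: same request
	fmt.Println(tc.canHandle("seal/v0/fetch", uuid.New(), 1)) // false: a different request
}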


@@ -882,6 +882,12 @@ workflows:
         - build
       suite: itest-sdr_upgrade
       target: "./itests/sdr_upgrade_test.go"
+  - test:
+      name: test-itest-sealing_resources
+      requires:
+        - build
+      suite: itest-sealing_resources
+      target: "./itests/sealing_resources_test.go"
   - test:
       name: test-itest-sector_finalize_early
       requires:


@@ -0,0 +1,64 @@
package itests

import (
	"context"
	"os"
	"testing"
	"time"

	"github.com/stretchr/testify/require"

	"github.com/filecoin-project/lotus/itests/kit"
	"github.com/filecoin-project/lotus/storage/sealer/storiface"
)

// Regression check for a fix introduced in https://github.com/filecoin-project/lotus/pull/10633
func TestPledgeMaxConcurrentGet(t *testing.T) {
	require.NoError(t, os.Setenv("GET_2K_MAX_CONCURRENT", "1"))
	t.Cleanup(func() {
		require.NoError(t, os.Unsetenv("GET_2K_MAX_CONCURRENT"))
	})

	kit.QuietMiningLogs()

	blockTime := 50 * time.Millisecond

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	_, miner, ens := kit.EnsembleMinimal(t, kit.NoStorage()) // no mock proofs
	ens.InterconnectAll().BeginMiningMustPost(blockTime)

	// separate sealed and storage paths so that finalize move needs to happen
	miner.AddStorage(ctx, t, func(meta *storiface.LocalStorageMeta) {
		meta.CanSeal = true
	})
	miner.AddStorage(ctx, t, func(meta *storiface.LocalStorageMeta) {
		meta.CanStore = true
	})

	// NOTE: This test only repros the issue when Fetch tasks take ~10s, there's
	// no great way to do that in a non-horribly-hacky way
	/* The horribly hacky way:

	diff --git a/storage/sealer/sched_worker.go b/storage/sealer/sched_worker.go
	index 35acd755d..76faec859 100644
	--- a/storage/sealer/sched_worker.go
	+++ b/storage/sealer/sched_worker.go
	@@ -513,6 +513,10 @@ func (sw *schedWorker) startProcessingTask(req *WorkerRequest) error {
	 		tw.start()
	 		err = <-werr

	+		if req.TaskType == sealtasks.TTFetch {
	+			time.Sleep(10 * time.Second)
	+		}
	+
	 		select {
	 		case req.ret <- workerResponse{err: err}:
	 		case <-req.Ctx.Done():
	*/

	miner.PledgeSectors(ctx, 3, 0, nil)
}


@@ -103,7 +103,7 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) {
 	needRes := worker.Info.Resources.ResourceSpec(task.Sector.ProofType, task.TaskType)
 	// TODO: allow bigger windows
-	if !windows[wnd].Allocated.CanHandleRequest(task.SealTask(), needRes, windowRequest.Worker, "schedAcceptable", worker.Info) {
+	if !windows[wnd].Allocated.CanHandleRequest(task.SchedId, task.SealTask(), needRes, windowRequest.Worker, "schedAcceptable", worker.Info) {
 		continue
 	}


@@ -37,7 +37,7 @@ func RandomWS(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []
 	log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.Sector.ID.Number, wnd, i)
-	if !windows[wnd].Allocated.CanHandleRequest(task.SealTask(), res, wid, "schedAssign", w.Info) {
+	if !windows[wnd].Allocated.CanHandleRequest(task.SchedId, task.SealTask(), res, wid, "schedAssign", w.Info) {
 		continue
 	}
@@ -71,7 +71,7 @@ func RandomWS(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []
 		"worker", bestWid,
 		"choices", len(choices))
-	windows[selectedWindow].Allocated.Add(task.SealTask(), info.Resources, needRes)
+	windows[selectedWindow].Allocated.Add(task.SchedId, task.SealTask(), info.Resources, needRes)
 	windows[selectedWindow].Todo = append(windows[selectedWindow].Todo, task)
 	rmQueue = append(rmQueue, sqi)


@@ -35,7 +35,7 @@ func SpreadWS(queued bool) func(sh *Scheduler, queueLen int, acceptableWindows [
 	log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.Sector.ID.Number, wnd, i)
-	if !windows[wnd].Allocated.CanHandleRequest(task.SealTask(), res, wid, "schedAssign", w.Info) {
+	if !windows[wnd].Allocated.CanHandleRequest(task.SchedId, task.SealTask(), res, wid, "schedAssign", w.Info) {
 		continue
 	}
@@ -71,7 +71,7 @@ func SpreadWS(queued bool) func(sh *Scheduler, queueLen int, acceptableWindows [
 		"assigned", bestAssigned)
 	workerAssigned[bestWid]++
-	windows[selectedWindow].Allocated.Add(task.SealTask(), info.Resources, needRes)
+	windows[selectedWindow].Allocated.Add(task.SchedId, task.SealTask(), info.Resources, needRes)
 	windows[selectedWindow].Todo = append(windows[selectedWindow].Todo, task)
 	rmQueue = append(rmQueue, sqi)


@@ -41,7 +41,7 @@ func SpreadTasksWS(queued bool) func(sh *Scheduler, queueLen int, acceptableWind
 	log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.Sector.ID.Number, wnd, i)
-	if !windows[wnd].Allocated.CanHandleRequest(task.SealTask(), res, wid, "schedAssign", w.Info) {
+	if !windows[wnd].Allocated.CanHandleRequest(task.SchedId, task.SealTask(), res, wid, "schedAssign", w.Info) {
 		continue
 	}
@@ -80,7 +80,7 @@ func SpreadTasksWS(queued bool) func(sh *Scheduler, queueLen int, acceptableWind
 		"assigned", bestAssigned)
 	workerAssigned[bestWid]++
-	windows[selectedWindow].Allocated.Add(task.SealTask(), info.Resources, needRes)
+	windows[selectedWindow].Allocated.Add(task.SchedId, task.SealTask(), info.Resources, needRes)
 	windows[selectedWindow].Todo = append(windows[selectedWindow].Todo, task)
 	rmQueue = append(rmQueue, sqi)


@@ -35,7 +35,7 @@ func LowestUtilizationWS(sh *Scheduler, queueLen int, acceptableWindows [][]int,
 	log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.Sector.ID.Number, wnd, i)
 	// TODO: allow bigger windows
-	if !windows[wnd].Allocated.CanHandleRequest(task.SealTask(), res, wid, "schedAssign", w.Info) {
+	if !windows[wnd].Allocated.CanHandleRequest(task.SchedId, task.SealTask(), res, wid, "schedAssign", w.Info) {
 		continue
 	}
@@ -82,7 +82,7 @@ func LowestUtilizationWS(sh *Scheduler, queueLen int, acceptableWindows [][]int,
 		"worker", bestWid,
 		"utilization", bestUtilization)
-	workerUtil[bestWid] += windows[selectedWindow].Allocated.Add(task.SealTask(), info.Resources, needRes)
+	workerUtil[bestWid] += windows[selectedWindow].Allocated.Add(task.SchedId, task.SealTask(), info.Resources, needRes)
 	windows[selectedWindow].Todo = append(windows[selectedWindow].Todo, task)
 	rmQueue = append(rmQueue, sqi)


@@ -7,6 +7,7 @@ import (
 	"sync"
 	"time"
+	"github.com/google/uuid"
 	"github.com/hashicorp/go-multierror"
 	"golang.org/x/xerrors"
@@ -110,7 +111,7 @@ func (ps *poStScheduler) Schedule(ctx context.Context, primary bool, spt abi.Reg
 	for i, selected := range candidates {
 		worker := ps.workers[selected.id]
-		err := worker.active.withResources(selected.id, worker.Info, ps.postType.SealTask(spt), selected.res, &ps.lk, func() error {
+		err := worker.active.withResources(uuid.UUID{}, selected.id, worker.Info, ps.postType.SealTask(spt), selected.res, &ps.lk, func() error {
 			ps.lk.Unlock()
 			defer ps.lk.Lock()
@@ -148,7 +149,7 @@ func (ps *poStScheduler) readyWorkers(spt abi.RegisteredSealProof) (bool, []cand
 			continue
 		}
-		if !wr.active.CanHandleRequest(ps.postType.SealTask(spt), needRes, wid, "post-readyWorkers", wr.Info) {
+		if !wr.active.CanHandleRequest(uuid.UUID{}, ps.postType.SealTask(spt), needRes, wid, "post-readyWorkers", wr.Info) {
 			continue
 		}


@@ -3,6 +3,8 @@ package sealer
 import (
 	"sync"
+
+	"github.com/google/uuid"
 	"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
 	"github.com/filecoin-project/lotus/storage/sealer/storiface"
 )
@@ -20,7 +22,7 @@ type ActiveResources struct {
 }

 type taskCounter struct {
-	taskCounters map[sealtasks.SealTaskType]int
+	taskCounters map[sealtasks.SealTaskType]map[uuid.UUID]int

 	// this lock is technically redundant, as ActiveResources is always accessed
 	// with the worker lock, but let's not panic if we ever change that
@@ -29,34 +31,48 @@ type taskCounter struct {
 func newTaskCounter() *taskCounter {
 	return &taskCounter{
-		taskCounters: map[sealtasks.SealTaskType]int{},
+		taskCounters: make(map[sealtasks.SealTaskType]map[uuid.UUID]int),
 	}
 }

-func (tc *taskCounter) Add(tt sealtasks.SealTaskType) {
+func (tc *taskCounter) Add(tt sealtasks.SealTaskType, schedID uuid.UUID) {
 	tc.lk.Lock()
 	defer tc.lk.Unlock()
-	tc.taskCounters[tt]++
+	tc.getUnlocked(tt)[schedID]++
 }

-func (tc *taskCounter) Free(tt sealtasks.SealTaskType) {
+func (tc *taskCounter) Free(tt sealtasks.SealTaskType, schedID uuid.UUID) {
 	tc.lk.Lock()
 	defer tc.lk.Unlock()
-	tc.taskCounters[tt]--
+	m := tc.getUnlocked(tt)
+	if m[schedID] <= 1 {
+		delete(m, schedID)
+	} else {
+		m[schedID]--
+	}
 }

-func (tc *taskCounter) Get(tt sealtasks.SealTaskType) int {
-	tc.lk.Lock()
-	defer tc.lk.Unlock()
+func (tc *taskCounter) getUnlocked(tt sealtasks.SealTaskType) map[uuid.UUID]int {
+	if tc.taskCounters[tt] == nil {
+		tc.taskCounters[tt] = make(map[uuid.UUID]int)
+	}
 	return tc.taskCounters[tt]
 }

+func (tc *taskCounter) Get(tt sealtasks.SealTaskType) map[uuid.UUID]int {
+	tc.lk.Lock()
+	defer tc.lk.Unlock()
+	return tc.getUnlocked(tt)
+}
+
 func (tc *taskCounter) Sum() int {
 	tc.lk.Lock()
 	defer tc.lk.Unlock()
 	sum := 0
 	for _, v := range tc.taskCounters {
-		sum += v
+		sum += len(v)
 	}
 	return sum
 }
@@ -64,8 +80,8 @@ func (tc *taskCounter) Sum() int {
 func (tc *taskCounter) ForEach(cb func(tt sealtasks.SealTaskType, count int)) {
 	tc.lk.Lock()
 	defer tc.lk.Unlock()
-	for tt, count := range tc.taskCounters {
-		cb(tt, count)
+	for tt, v := range tc.taskCounters {
+		cb(tt, len(v))
 	}
 }
@@ -75,8 +91,8 @@ func NewActiveResources(tc *taskCounter) *ActiveResources {
 	}
 }

-func (a *ActiveResources) withResources(id storiface.WorkerID, wr storiface.WorkerInfo, tt sealtasks.SealTaskType, r storiface.Resources, locker sync.Locker, cb func() error) error {
-	for !a.CanHandleRequest(tt, r, id, "withResources", wr) {
+func (a *ActiveResources) withResources(schedID uuid.UUID, id storiface.WorkerID, wr storiface.WorkerInfo, tt sealtasks.SealTaskType, r storiface.Resources, locker sync.Locker, cb func() error) error {
+	for !a.CanHandleRequest(schedID, tt, r, id, "withResources", wr) {
 		if a.cond == nil {
 			a.cond = sync.NewCond(locker)
 		}
@@ -85,11 +101,11 @@ func (a *ActiveResources) withResources(id storiface.WorkerID, wr storiface.Work
 		a.waiting--
 	}

-	a.Add(tt, wr.Resources, r)
+	a.Add(schedID, tt, wr.Resources, r)

 	err := cb()

-	a.Free(tt, wr.Resources, r)
+	a.Free(schedID, tt, wr.Resources, r)

 	return err
 }
@@ -100,7 +116,7 @@ func (a *ActiveResources) hasWorkWaiting() bool {
 }

 // add task resources to ActiveResources and return utilization difference
-func (a *ActiveResources) Add(tt sealtasks.SealTaskType, wr storiface.WorkerResources, r storiface.Resources) float64 {
+func (a *ActiveResources) Add(schedID uuid.UUID, tt sealtasks.SealTaskType, wr storiface.WorkerResources, r storiface.Resources) float64 {
 	startUtil := a.utilization(wr)

 	if r.GPUUtilization > 0 {
@@ -109,19 +125,19 @@ func (a *ActiveResources) Add(tt sealtasks.SealTaskType, wr storiface.WorkerReso
 	a.cpuUse += r.Threads(wr.CPUs, len(wr.GPUs))
 	a.memUsedMin += r.MinMemory
 	a.memUsedMax += r.MaxMemory
-	a.taskCounters.Add(tt)
+	a.taskCounters.Add(tt, schedID)

 	return a.utilization(wr) - startUtil
 }

-func (a *ActiveResources) Free(tt sealtasks.SealTaskType, wr storiface.WorkerResources, r storiface.Resources) {
+func (a *ActiveResources) Free(schedID uuid.UUID, tt sealtasks.SealTaskType, wr storiface.WorkerResources, r storiface.Resources) {
 	if r.GPUUtilization > 0 {
 		a.gpuUsed -= r.GPUUtilization
 	}
 	a.cpuUse -= r.Threads(wr.CPUs, len(wr.GPUs))
 	a.memUsedMin -= r.MinMemory
 	a.memUsedMax -= r.MaxMemory
-	a.taskCounters.Free(tt)
+	a.taskCounters.Free(tt, schedID)

 	if a.cond != nil {
 		a.cond.Broadcast()
@@ -130,9 +146,10 @@ func (a *ActiveResources) Free(tt sealtasks.SealTaskType, wr storiface.WorkerRes
 // CanHandleRequest evaluates if the worker has enough available resources to
 // handle the request.
-func (a *ActiveResources) CanHandleRequest(tt sealtasks.SealTaskType, needRes storiface.Resources, wid storiface.WorkerID, caller string, info storiface.WorkerInfo) bool {
+func (a *ActiveResources) CanHandleRequest(schedID uuid.UUID, tt sealtasks.SealTaskType, needRes storiface.Resources, wid storiface.WorkerID, caller string, info storiface.WorkerInfo) bool {
 	if needRes.MaxConcurrent > 0 {
-		if a.taskCounters.Get(tt) >= needRes.MaxConcurrent {
+		tasks := a.taskCounters.Get(tt)
+		if len(tasks) >= needRes.MaxConcurrent && (schedID == uuid.UUID{} || tasks[schedID] == 0) {
 			log.Debugf("sched: not scheduling on worker %s for %s; at task limit tt=%s, curcount=%d", wid, caller, tt, a.taskCounters.Get(tt))
 			return false
 		}
@@ -226,7 +243,7 @@ func (a *ActiveResources) taskCount(tt *sealtasks.SealTaskType) int {
 		return a.taskCounters.Sum()
 	}

-	return a.taskCounters.Get(*tt)
+	return len(a.taskCounters.Get(*tt))
 }

 func (wh *WorkerHandle) Utilization() float64 {


@@ -698,7 +698,7 @@ func TestWindowCompact(t *testing.T) {
 			TaskType: task,
 			Sector:   storiface.SectorRef{ProofType: spt},
 		})
-		window.Allocated.Add(task.SealTask(spt), wh.Info.Resources, storiface.ResourceTable[task][spt])
+		window.Allocated.Add(uuid.UUID{}, task.SealTask(spt), wh.Info.Resources, storiface.ResourceTable[task][spt])
 	}
 	wh.activeWindows = append(wh.activeWindows, window)
@@ -717,7 +717,7 @@ func TestWindowCompact(t *testing.T) {
 	for ti, task := range tasks {
 		require.Equal(t, task, wh.activeWindows[wi].Todo[ti].TaskType, "%d, %d", wi, ti)
-		expectRes.Add(task.SealTask(spt), wh.Info.Resources, storiface.ResourceTable[task][spt])
+		expectRes.Add(uuid.UUID{}, task.SealTask(spt), wh.Info.Resources, storiface.ResourceTable[task][spt])
 	}

 	require.Equal(t, expectRes.cpuUse, wh.activeWindows[wi].Allocated.cpuUse, "%d", wi)


@@ -294,14 +294,14 @@ func (sw *schedWorker) workerCompactWindows() {
 		for ti, todo := range window.Todo {
 			needRes := worker.Info.Resources.ResourceSpec(todo.Sector.ProofType, todo.TaskType)
-			if !lower.Allocated.CanHandleRequest(todo.SealTask(), needRes, sw.wid, "compactWindows", worker.Info) {
+			if !lower.Allocated.CanHandleRequest(todo.SchedId, todo.SealTask(), needRes, sw.wid, "compactWindows", worker.Info) {
 				continue
 			}

 			moved = append(moved, ti)
 			lower.Todo = append(lower.Todo, todo)
-			lower.Allocated.Add(todo.SealTask(), worker.Info.Resources, needRes)
-			window.Allocated.Free(todo.SealTask(), worker.Info.Resources, needRes)
+			lower.Allocated.Add(todo.SchedId, todo.SealTask(), worker.Info.Resources, needRes)
+			window.Allocated.Free(todo.SchedId, todo.SealTask(), worker.Info.Resources, needRes)
 		}

 		if len(moved) > 0 {
@@ -355,7 +355,7 @@ assignLoop:
 	worker.lk.Lock()
 	for t, todo := range firstWindow.Todo {
 		needResPrep := worker.Info.Resources.PrepResourceSpec(todo.Sector.ProofType, todo.TaskType, todo.prepare.PrepType)
-		if worker.preparing.CanHandleRequest(todo.PrepSealTask(), needResPrep, sw.wid, "startPreparing", worker.Info) {
+		if worker.preparing.CanHandleRequest(todo.SchedId, todo.PrepSealTask(), needResPrep, sw.wid, "startPreparing", worker.Info) {
 			tidx = t
 			break
 		}
@@ -416,7 +416,7 @@ assignLoop:
 		}

 		needRes := worker.Info.Resources.ResourceSpec(todo.Sector.ProofType, todo.TaskType)
-		if worker.active.CanHandleRequest(todo.SealTask(), needRes, sw.wid, "startPreparing", worker.Info) {
+		if worker.active.CanHandleRequest(todo.SchedId, todo.SealTask(), needRes, sw.wid, "startPreparing", worker.Info) {
 			tidx = t
 			break
 		}
@@ -457,7 +457,7 @@ func (sw *schedWorker) startProcessingTask(req *WorkerRequest) error {
 	needResPrep := w.Info.Resources.PrepResourceSpec(req.Sector.ProofType, req.TaskType, req.prepare.PrepType)

 	w.lk.Lock()
-	w.preparing.Add(req.PrepSealTask(), w.Info.Resources, needResPrep)
+	w.preparing.Add(req.SchedId, req.PrepSealTask(), w.Info.Resources, needResPrep)
 	w.lk.Unlock()

 	go func() {
@@ -468,7 +468,7 @@ func (sw *schedWorker) startProcessingTask(req *WorkerRequest) error {
 		w.lk.Lock()

 		if err != nil {
-			w.preparing.Free(req.PrepSealTask(), w.Info.Resources, needResPrep)
+			w.preparing.Free(req.SchedId, req.PrepSealTask(), w.Info.Resources, needResPrep)
 			w.lk.Unlock()

 			select {
@@ -497,11 +497,12 @@ func (sw *schedWorker) startProcessingTask(req *WorkerRequest) error {
 		}()

 		// wait (if needed) for resources in the 'active' window
-		err = w.active.withResources(sw.wid, w.Info, req.SealTask(), needRes, &w.lk, func() error {
-			w.preparing.Free(req.PrepSealTask(), w.Info.Resources, needResPrep)
+		err = w.active.withResources(req.SchedId, sw.wid, w.Info, req.SealTask(), needRes, &w.lk, func() error {
+			w.preparing.Free(req.SchedId, req.PrepSealTask(), w.Info.Resources, needResPrep)
 			w.lk.Unlock()
 			defer w.lk.Lock() // we MUST return locked from this function

+			// make sure the worker loop sees that the prepare task has finished
 			select {
 			case sw.taskDone <- struct{}{}:
 			case <-sh.closing:
@@ -525,6 +526,12 @@ func (sw *schedWorker) startProcessingTask(req *WorkerRequest) error {
 		w.lk.Unlock()

+		// make sure the worker loop sees that the task has finished
+		select {
+		case sw.taskDone <- struct{}{}:
+		default: // there is a notification pending already
+		}
+
 		// This error should always be nil, since nothing is setting it, but just to be safe:
 		if err != nil {
 			log.Errorf("error executing worker (withResources): %+v", err)
@@ -539,7 +546,7 @@ func (sw *schedWorker) startProcessingReadyTask(req *WorkerRequest) error {
 	needRes := w.Info.Resources.ResourceSpec(req.Sector.ProofType, req.TaskType)

-	w.active.Add(req.SealTask(), w.Info.Resources, needRes)
+	w.active.Add(req.SchedId, req.SealTask(), w.Info.Resources, needRes)

 	go func() {
 		// Do the work!
@@ -557,7 +564,7 @@ func (sw *schedWorker) startProcessingReadyTask(req *WorkerRequest) error {
 		w.lk.Lock()
-		w.active.Free(req.SealTask(), w.Info.Resources, needRes)
+		w.active.Free(req.SchedId, req.SealTask(), w.Info.Resources, needRes)

 		select {
 		case sw.taskDone <- struct{}{}:
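
A note on the second part of the fix ("worker sched: Send taskDone notifs after tasks are done"): the notification is sent with a non-blocking select so the worker goroutine never stalls when a wake-up is already pending. Below is a tiny standalone sketch of that pattern, assuming (as the "notification pending already" comment in the diff suggests) a 1-buffered channel acting as the single pending-notification slot; it is an illustration, not the actual scheduler code.

package main

import "fmt"

// notifyTaskDone mirrors the non-blocking notification used above: if the
// single-slot channel already holds a pending wake-up, drop this one instead
// of blocking the worker goroutine.
func notifyTaskDone(taskDone chan struct{}) {
	select {
	case taskDone <- struct{}{}:
	default: // there is a notification pending already
	}
}

func main() {
	taskDone := make(chan struct{}, 1) // assumed single pending-notification slot

	notifyTaskDone(taskDone) // queued
	notifyTaskDone(taskDone) // dropped; a wake-up is already pending

	<-taskDone
	fmt.Println("scheduler loop woke up once and will re-check all queued work")
}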