Merge pull request #10851 from filecoin-project/sbansal/backport-1.23.1-rc2

Backport changes for 1.23.1-rc2
Commit 86f32d1c1f
Shrenuj Bansal, 2023-05-10 16:04:48 -04:00 (committed by GitHub)
14 changed files with 172 additions and 1113 deletions

View File

@@ -63,7 +63,7 @@ commands:
name: Install Rust
command: |
curl https://sh.rustup.rs -sSf | sh -s -- -y
- run: make deps lotus
- run: make deps
download-params:
steps:
- restore_cache:
@@ -304,9 +304,7 @@ jobs:
darwin: true
darwin-architecture: arm64
- run: |
export CPATH=$(brew --prefix)/include
export LIBRARY_PATH=$(brew --prefix)/lib
make lotus lotus-miner lotus-worker
export CPATH=$(brew --prefix)/include && export LIBRARY_PATH=$(brew --prefix)/lib && make lotus lotus-miner lotus-worker
- run: otool -hv lotus
- run:
name: check tag and version output match
@@ -884,6 +882,12 @@ workflows:
- build
suite: itest-sdr_upgrade
target: "./itests/sdr_upgrade_test.go"
- test:
name: test-itest-sealing_resources
requires:
- build
suite: itest-sealing_resources
target: "./itests/sealing_resources_test.go"
- test:
name: test-itest-sector_finalize_early
requires:
@@ -1069,6 +1073,7 @@ workflows:
branches:
only:
- /^release\/v\d+\.\d+\.\d+(-rc\d+)?$/
- /^ci\/.*$/
tags:
only:
- /^v\d+\.\d+\.\d+(-rc\d+)?$/
@@ -1078,6 +1083,7 @@ workflows:
branches:
only:
- /^release\/v\d+\.\d+\.\d+(-rc\d+)?$/
- /^ci\/.*$/
tags:
only:
- /^v\d+\.\d+\.\d+(-rc\d+)?$/
@@ -1087,6 +1093,7 @@ workflows:
branches:
only:
- /^release\/v\d+\.\d+\.\d+(-rc\d+)?$/
- /^ci\/.*$/
tags:
only:
- /^v\d+\.\d+\.\d+(-rc\d+)?$/
@@ -1099,7 +1106,7 @@ workflows:
filters:
branches:
ignore:
- /.*/
- /^.*$/
tags:
only:
- /^v\d+\.\d+\.\d+(-rc\d+)?$/
@@ -1114,6 +1121,7 @@ workflows:
branches:
only:
- /^release\/v\d+\.\d+\.\d+(-rc\d+)?$/
- /^ci\/.*$/
- build-docker:
name: "Docker push (lotus-all-in-one / stable / mainnet)"
image: lotus-all-in-one
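
Aside: the `/^ci\/.*$/` filter added above lets `ci/...` branches exercise the release workflows alongside `release/vX.Y.Z(-rcN)` branches and `vX.Y.Z(-rcN)` tags. A minimal Go sketch of what the branch filters now match (illustrative only, not part of the commit):

```go
package main

import (
	"fmt"
	"regexp"
)

func main() {
	// The two branch patterns from the workflow filters above.
	branchFilters := []*regexp.Regexp{
		regexp.MustCompile(`^release/v\d+\.\d+\.\d+(-rc\d+)?$`),
		regexp.MustCompile(`^ci/.*$`),
	}

	for _, branch := range []string{"release/v1.23.1-rc2", "ci/macos-build-fix", "feat/new-api"} {
		matched := false
		for _, re := range branchFilters {
			if re.MatchString(branch) {
				matched = true
				break
			}
		}
		fmt.Printf("%-22s release jobs: %v\n", branch, matched) // first two match, third does not
	}
}
```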

View File

@@ -63,7 +63,7 @@ commands:
name: Install Rust
command: |
curl https://sh.rustup.rs -sSf | sh -s -- -y
- run: make deps lotus
- run: make deps
download-params:
steps:
- restore_cache:
@@ -304,9 +304,7 @@ jobs:
darwin: true
darwin-architecture: arm64
- run: |
export CPATH=$(brew --prefix)/include
export LIBRARY_PATH=$(brew --prefix)/lib
make lotus lotus-miner lotus-worker
export CPATH=$(brew --prefix)/include && export LIBRARY_PATH=$(brew --prefix)/lib && make lotus lotus-miner lotus-worker
- run: otool -hv lotus
- run:
name: check tag and version output match
@@ -583,6 +581,7 @@ workflows:
branches:
only:
- /^release\/v\d+\.\d+\.\d+(-rc\d+)?$/
- /^ci\/.*$/
tags:
only:
- /^v\d+\.\d+\.\d+(-rc\d+)?$/
@@ -592,6 +591,7 @@ workflows:
branches:
only:
- /^release\/v\d+\.\d+\.\d+(-rc\d+)?$/
- /^ci\/.*$/
tags:
only:
- /^v\d+\.\d+\.\d+(-rc\d+)?$/
@@ -601,6 +601,7 @@ workflows:
branches:
only:
- /^release\/v\d+\.\d+\.\d+(-rc\d+)?$/
- /^ci\/.*$/
tags:
only:
- /^v\d+\.\d+\.\d+(-rc\d+)?$/
@@ -613,7 +614,7 @@ workflows:
filters:
branches:
ignore:
- /.*/
- /^.*$/
tags:
only:
- /^v\d+\.\d+\.\d+(-rc\d+)?$/
@@ -628,6 +629,7 @@ workflows:
branches:
only:
- /^release\/v\d+\.\d+\.\d+(-rc\d+)?$/
- /^ci\/.*$/
[[- range .Networks]]
- build-docker:
name: "Docker push (lotus-all-in-one / stable / [[.]])"

View File

@@ -43,16 +43,16 @@ func main() {
backupCmd,
lcli.WithCategory("chain", actorCmd),
lcli.WithCategory("chain", infoCmd),
lcli.WithCategory("market", storageDealsCmd),
lcli.WithCategory("market", retrievalDealsCmd),
lcli.WithCategory("market", dataTransfersCmd),
lcli.WithCategory("market", dagstoreCmd),
lcli.WithCategory("market", indexProvCmd),
lcli.WithCategory("market", setHidden(storageDealsCmd)),
lcli.WithCategory("market", setHidden(retrievalDealsCmd)),
lcli.WithCategory("market", setHidden(dataTransfersCmd)),
lcli.WithCategory("market", setHidden(dagstoreCmd)),
lcli.WithCategory("market", setHidden(indexProvCmd)),
lcli.WithCategory("storage", sectorsCmd),
lcli.WithCategory("storage", provingCmd),
lcli.WithCategory("storage", storageCmd),
lcli.WithCategory("storage", sealingCmd),
lcli.WithCategory("retrieval", piecesCmd),
lcli.WithCategory("retrieval", setHidden(piecesCmd)),
}
jaeger := tracing.SetupJaegerTracing("lotus")
@@ -86,6 +86,7 @@ func main() {
// adapt the Net* commands to always hit the node running the markets
// subsystem, as that is the only one that runs a libp2p node.
netCmd := *lcli.NetCmd // make a copy.
netCmd.Hidden = true
prev := netCmd.Before
netCmd.Before = func(c *cli.Context) error {
if prev != nil {
@@ -137,11 +138,12 @@ func main() {
&cli.StringFlag{
Name: FlagMarketsRepo,
EnvVars: []string{"LOTUS_MARKETS_PATH"},
Usage: fmt.Sprintf("Markets repo path"),
Hidden: true,
},
&cli.BoolFlag{
Name: "call-on-markets",
Usage: "(experimental; may be removed) call this command against a markets node; use only with common commands like net, auth, pprof, etc. whose target may be ambiguous",
Name: "call-on-markets",
Usage: "(experimental; may be removed) call this command against a markets node; use only with common commands like net, auth, pprof, etc. whose target may be ambiguous",
Hidden: true,
},
cliutil.FlagVeryVerbose,
},
@@ -190,3 +192,8 @@ func getActorAddress(ctx context.Context, cctx *cli.Context) (maddr address.Addr
return maddr, nil
}
func setHidden(cmd *cli.Command) *cli.Command {
cmd.Hidden = true
return cmd
}
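
The `setHidden` helper relies on `urfave/cli`'s `Hidden` field, which drops a command from help output without disabling it, so existing scripts that call the deprecated market commands keep working while the commands stop being advertised. A standalone sketch of that behavior (demo code, assuming `urfave/cli/v2` as used by lotus):

```go
package main

import (
	"fmt"
	"log"
	"os"

	"github.com/urfave/cli/v2"
)

func main() {
	app := &cli.App{
		Name: "demo",
		Commands: []*cli.Command{{
			Name:   "deals",
			Hidden: true, // omitted from `demo help`, but still runs when invoked directly
			Action: func(*cli.Context) error {
				fmt.Println("hidden command still works")
				return nil
			},
		}},
	}
	if err := app.Run(os.Args); err != nil {
		log.Fatal(err)
	}
}
```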

File diff suppressed because it is too large

View File

@@ -0,0 +1,64 @@
package itests
import (
"context"
"os"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/lotus/itests/kit"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
// Regression check for a fix introduced in https://github.com/filecoin-project/lotus/pull/10633
func TestPledgeMaxConcurrentGet(t *testing.T) {
require.NoError(t, os.Setenv("GET_2K_MAX_CONCURRENT", "1"))
t.Cleanup(func() {
require.NoError(t, os.Unsetenv("GET_2K_MAX_CONCURRENT"))
})
kit.QuietMiningLogs()
blockTime := 50 * time.Millisecond
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_, miner, ens := kit.EnsembleMinimal(t, kit.NoStorage()) // no mock proofs
ens.InterconnectAll().BeginMiningMustPost(blockTime)
// separate sealed and storage paths so that finalize move needs to happen
miner.AddStorage(ctx, t, func(meta *storiface.LocalStorageMeta) {
meta.CanSeal = true
})
miner.AddStorage(ctx, t, func(meta *storiface.LocalStorageMeta) {
meta.CanStore = true
})
// NOTE: This test only repros the issue when Fetch tasks take ~10s, there's
// no great way to do that in a non-horribly-hacky way
/* The horribly hacky way:
diff --git a/storage/sealer/sched_worker.go b/storage/sealer/sched_worker.go
index 35acd755d..76faec859 100644
--- a/storage/sealer/sched_worker.go
+++ b/storage/sealer/sched_worker.go
@@ -513,6 +513,10 @@ func (sw *schedWorker) startProcessingTask(req *WorkerRequest) error {
tw.start()
err = <-werr
+ if req.TaskType == sealtasks.TTFetch {
+ time.Sleep(10 * time.Second)
+ }
+
select {
case req.ret <- workerResponse{err: err}:
case <-req.Ctx.Done():
*/
miner.PledgeSectors(ctx, 3, 0, nil)
}
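
One side note on the env-var setup at the top of this test: on Go 1.17+, `t.Setenv` scopes the variable to the test and restores the previous value automatically, collapsing the `os.Setenv`/`t.Cleanup` pair. A hedged alternative sketch, not what the commit does:

```go
package itests

import "testing"

// Sketch only: t.Setenv registers the restore itself (and disallows
// t.Parallel in the same test, which this test does not use anyway).
func TestPledgeMaxConcurrentGetSketch(t *testing.T) {
	t.Setenv("GET_2K_MAX_CONCURRENT", "1")
	// ... rest of the test body unchanged ...
}
```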

View File

@@ -103,7 +103,7 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) {
needRes := worker.Info.Resources.ResourceSpec(task.Sector.ProofType, task.TaskType)
// TODO: allow bigger windows
if !windows[wnd].Allocated.CanHandleRequest(task.SealTask(), needRes, windowRequest.Worker, "schedAcceptable", worker.Info) {
if !windows[wnd].Allocated.CanHandleRequest(task.SchedId, task.SealTask(), needRes, windowRequest.Worker, "schedAcceptable", worker.Info) {
continue
}

View File

@@ -37,7 +37,7 @@ func RandomWS(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []
log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.Sector.ID.Number, wnd, i)
if !windows[wnd].Allocated.CanHandleRequest(task.SealTask(), res, wid, "schedAssign", w.Info) {
if !windows[wnd].Allocated.CanHandleRequest(task.SchedId, task.SealTask(), res, wid, "schedAssign", w.Info) {
continue
}
@@ -71,7 +71,7 @@ func RandomWS(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []
"worker", bestWid,
"choices", len(choices))
windows[selectedWindow].Allocated.Add(task.SealTask(), info.Resources, needRes)
windows[selectedWindow].Allocated.Add(task.SchedId, task.SealTask(), info.Resources, needRes)
windows[selectedWindow].Todo = append(windows[selectedWindow].Todo, task)
rmQueue = append(rmQueue, sqi)

View File

@@ -35,7 +35,7 @@ func SpreadWS(queued bool) func(sh *Scheduler, queueLen int, acceptableWindows [
log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.Sector.ID.Number, wnd, i)
if !windows[wnd].Allocated.CanHandleRequest(task.SealTask(), res, wid, "schedAssign", w.Info) {
if !windows[wnd].Allocated.CanHandleRequest(task.SchedId, task.SealTask(), res, wid, "schedAssign", w.Info) {
continue
}
@@ -71,7 +71,7 @@ func SpreadWS(queued bool) func(sh *Scheduler, queueLen int, acceptableWindows [
"assigned", bestAssigned)
workerAssigned[bestWid]++
windows[selectedWindow].Allocated.Add(task.SealTask(), info.Resources, needRes)
windows[selectedWindow].Allocated.Add(task.SchedId, task.SealTask(), info.Resources, needRes)
windows[selectedWindow].Todo = append(windows[selectedWindow].Todo, task)
rmQueue = append(rmQueue, sqi)

View File

@@ -41,7 +41,7 @@ func SpreadTasksWS(queued bool) func(sh *Scheduler, queueLen int, acceptableWind
log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.Sector.ID.Number, wnd, i)
if !windows[wnd].Allocated.CanHandleRequest(task.SealTask(), res, wid, "schedAssign", w.Info) {
if !windows[wnd].Allocated.CanHandleRequest(task.SchedId, task.SealTask(), res, wid, "schedAssign", w.Info) {
continue
}
@@ -80,7 +80,7 @@ func SpreadTasksWS(queued bool) func(sh *Scheduler, queueLen int, acceptableWind
"assigned", bestAssigned)
workerAssigned[bestWid]++
windows[selectedWindow].Allocated.Add(task.SealTask(), info.Resources, needRes)
windows[selectedWindow].Allocated.Add(task.SchedId, task.SealTask(), info.Resources, needRes)
windows[selectedWindow].Todo = append(windows[selectedWindow].Todo, task)
rmQueue = append(rmQueue, sqi)

View File

@@ -35,7 +35,7 @@ func LowestUtilizationWS(sh *Scheduler, queueLen int, acceptableWindows [][]int,
log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.Sector.ID.Number, wnd, i)
// TODO: allow bigger windows
if !windows[wnd].Allocated.CanHandleRequest(task.SealTask(), res, wid, "schedAssign", w.Info) {
if !windows[wnd].Allocated.CanHandleRequest(task.SchedId, task.SealTask(), res, wid, "schedAssign", w.Info) {
continue
}
@@ -82,7 +82,7 @@ func LowestUtilizationWS(sh *Scheduler, queueLen int, acceptableWindows [][]int,
"worker", bestWid,
"utilization", bestUtilization)
workerUtil[bestWid] += windows[selectedWindow].Allocated.Add(task.SealTask(), info.Resources, needRes)
workerUtil[bestWid] += windows[selectedWindow].Allocated.Add(task.SchedId, task.SealTask(), info.Resources, needRes)
windows[selectedWindow].Todo = append(windows[selectedWindow].Todo, task)
rmQueue = append(rmQueue, sqi)

View File

@@ -7,6 +7,7 @@ import (
"sync"
"time"
"github.com/google/uuid"
"github.com/hashicorp/go-multierror"
"golang.org/x/xerrors"
@@ -110,7 +111,7 @@ func (ps *poStScheduler) Schedule(ctx context.Context, primary bool, spt abi.Reg
for i, selected := range candidates {
worker := ps.workers[selected.id]
err := worker.active.withResources(selected.id, worker.Info, ps.postType.SealTask(spt), selected.res, &ps.lk, func() error {
err := worker.active.withResources(uuid.UUID{}, selected.id, worker.Info, ps.postType.SealTask(spt), selected.res, &ps.lk, func() error {
ps.lk.Unlock()
defer ps.lk.Lock()
@@ -148,7 +149,7 @@ func (ps *poStScheduler) readyWorkers(spt abi.RegisteredSealProof) (bool, []cand
continue
}
if !wr.active.CanHandleRequest(ps.postType.SealTask(spt), needRes, wid, "post-readyWorkers", wr.Info) {
if !wr.active.CanHandleRequest(uuid.UUID{}, ps.postType.SealTask(spt), needRes, wid, "post-readyWorkers", wr.Info) {
continue
}
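
Note the `uuid.UUID{}` passed at both call sites above: the PoSt scheduler has no per-request scheduler ID, so it sends the zero UUID, which, as the resource-accounting changes below show, opts out of the "already holds a slot" exemption and keeps the strict concurrency limit. A tiny sketch of the sentinel comparison (assuming `github.com/google/uuid`, whose `UUID` is a comparable `[16]byte`):

```go
package main

import (
	"fmt"

	"github.com/google/uuid"
)

func main() {
	// The zero value acts as the "no scheduling request" sentinel.
	var none uuid.UUID
	fmt.Println(none == uuid.UUID{})       // true: sentinel, no slot exemption
	fmt.Println(uuid.New() == uuid.UUID{}) // false: a real request ID
}
```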

View File

@@ -3,6 +3,8 @@ package sealer
import (
"sync"
"github.com/google/uuid"
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
@@ -20,7 +22,7 @@ type ActiveResources struct {
}
type taskCounter struct {
taskCounters map[sealtasks.SealTaskType]int
taskCounters map[sealtasks.SealTaskType]map[uuid.UUID]int
// this lock is technically redundant, as ActiveResources is always accessed
// with the worker lock, but let's not panic if we ever change that
@@ -29,34 +31,48 @@ type taskCounter struct {
func newTaskCounter() *taskCounter {
return &taskCounter{
taskCounters: map[sealtasks.SealTaskType]int{},
taskCounters: make(map[sealtasks.SealTaskType]map[uuid.UUID]int),
}
}
func (tc *taskCounter) Add(tt sealtasks.SealTaskType) {
func (tc *taskCounter) Add(tt sealtasks.SealTaskType, schedID uuid.UUID) {
tc.lk.Lock()
defer tc.lk.Unlock()
tc.taskCounters[tt]++
tc.getUnlocked(tt)[schedID]++
}
func (tc *taskCounter) Free(tt sealtasks.SealTaskType) {
func (tc *taskCounter) Free(tt sealtasks.SealTaskType, schedID uuid.UUID) {
tc.lk.Lock()
defer tc.lk.Unlock()
tc.taskCounters[tt]--
m := tc.getUnlocked(tt)
if m[schedID] <= 1 {
delete(m, schedID)
} else {
m[schedID]--
}
}
func (tc *taskCounter) Get(tt sealtasks.SealTaskType) int {
tc.lk.Lock()
defer tc.lk.Unlock()
func (tc *taskCounter) getUnlocked(tt sealtasks.SealTaskType) map[uuid.UUID]int {
if tc.taskCounters[tt] == nil {
tc.taskCounters[tt] = make(map[uuid.UUID]int)
}
return tc.taskCounters[tt]
}
func (tc *taskCounter) Get(tt sealtasks.SealTaskType) map[uuid.UUID]int {
tc.lk.Lock()
defer tc.lk.Unlock()
return tc.getUnlocked(tt)
}
func (tc *taskCounter) Sum() int {
tc.lk.Lock()
defer tc.lk.Unlock()
sum := 0
for _, v := range tc.taskCounters {
sum += v
sum += len(v)
}
return sum
}
@@ -64,8 +80,8 @@ func (tc *taskCounter) Sum() int {
func (tc *taskCounter) ForEach(cb func(tt sealtasks.SealTaskType, count int)) {
tc.lk.Lock()
defer tc.lk.Unlock()
for tt, count := range tc.taskCounters {
cb(tt, count)
for tt, v := range tc.taskCounters {
cb(tt, len(v))
}
}
@@ -75,8 +91,8 @@ func NewActiveResources(tc *taskCounter) *ActiveResources {
}
}
func (a *ActiveResources) withResources(id storiface.WorkerID, wr storiface.WorkerInfo, tt sealtasks.SealTaskType, r storiface.Resources, locker sync.Locker, cb func() error) error {
for !a.CanHandleRequest(tt, r, id, "withResources", wr) {
func (a *ActiveResources) withResources(schedID uuid.UUID, id storiface.WorkerID, wr storiface.WorkerInfo, tt sealtasks.SealTaskType, r storiface.Resources, locker sync.Locker, cb func() error) error {
for !a.CanHandleRequest(schedID, tt, r, id, "withResources", wr) {
if a.cond == nil {
a.cond = sync.NewCond(locker)
}
@@ -85,11 +101,11 @@ func (a *ActiveResources) withResources(id storiface.WorkerID, wr storiface.Work
a.waiting--
}
a.Add(tt, wr.Resources, r)
a.Add(schedID, tt, wr.Resources, r)
err := cb()
a.Free(tt, wr.Resources, r)
a.Free(schedID, tt, wr.Resources, r)
return err
}
@@ -100,7 +116,7 @@ func (a *ActiveResources) hasWorkWaiting() bool {
}
// add task resources to ActiveResources and return utilization difference
func (a *ActiveResources) Add(tt sealtasks.SealTaskType, wr storiface.WorkerResources, r storiface.Resources) float64 {
func (a *ActiveResources) Add(schedID uuid.UUID, tt sealtasks.SealTaskType, wr storiface.WorkerResources, r storiface.Resources) float64 {
startUtil := a.utilization(wr)
if r.GPUUtilization > 0 {
@@ -109,19 +125,19 @@
a.cpuUse += r.Threads(wr.CPUs, len(wr.GPUs))
a.memUsedMin += r.MinMemory
a.memUsedMax += r.MaxMemory
a.taskCounters.Add(tt)
a.taskCounters.Add(tt, schedID)
return a.utilization(wr) - startUtil
}
func (a *ActiveResources) Free(tt sealtasks.SealTaskType, wr storiface.WorkerResources, r storiface.Resources) {
func (a *ActiveResources) Free(schedID uuid.UUID, tt sealtasks.SealTaskType, wr storiface.WorkerResources, r storiface.Resources) {
if r.GPUUtilization > 0 {
a.gpuUsed -= r.GPUUtilization
}
a.cpuUse -= r.Threads(wr.CPUs, len(wr.GPUs))
a.memUsedMin -= r.MinMemory
a.memUsedMax -= r.MaxMemory
a.taskCounters.Free(tt)
a.taskCounters.Free(tt, schedID)
if a.cond != nil {
a.cond.Broadcast()
@@ -130,9 +146,10 @@ func (a *ActiveResources) Free(tt sealtasks.SealTaskType, wr storiface.WorkerRes
// CanHandleRequest evaluates if the worker has enough available resources to
// handle the request.
func (a *ActiveResources) CanHandleRequest(tt sealtasks.SealTaskType, needRes storiface.Resources, wid storiface.WorkerID, caller string, info storiface.WorkerInfo) bool {
func (a *ActiveResources) CanHandleRequest(schedID uuid.UUID, tt sealtasks.SealTaskType, needRes storiface.Resources, wid storiface.WorkerID, caller string, info storiface.WorkerInfo) bool {
if needRes.MaxConcurrent > 0 {
if a.taskCounters.Get(tt) >= needRes.MaxConcurrent {
tasks := a.taskCounters.Get(tt)
if len(tasks) >= needRes.MaxConcurrent && (schedID == uuid.UUID{} || tasks[schedID] == 0) {
log.Debugf("sched: not scheduling on worker %s for %s; at task limit tt=%s, curcount=%d", wid, caller, tt, a.taskCounters.Get(tt))
return false
}
@@ -226,7 +243,7 @@ func (a *ActiveResources) taskCount(tt *sealtasks.SealTaskType) int {
return a.taskCounters.Sum()
}
return a.taskCounters.Get(*tt)
return len(a.taskCounters.Get(*tt))
}
func (wh *WorkerHandle) Utilization() float64 {
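
Taken together, this file changes `MaxConcurrent` from limiting in-flight task instances to limiting distinct scheduling requests: a request whose `schedID` already holds a slot (for example, its Fetch finished and its follow-up work is being admitted) passes the check instead of deadlocking behind itself. A distilled sketch of the new admission rule (simplified, no locking, mirroring the `CanHandleRequest` condition above):

```go
package main

import (
	"fmt"

	"github.com/google/uuid"
)

// perRequest counts active slots per scheduling request for one task type.
type perRequest map[uuid.UUID]int

// canHandle mirrors the new rule: at the MaxConcurrent limit, only a request
// that already holds a slot may proceed, and the zero UUID never qualifies.
func canHandle(tasks perRequest, maxConcurrent int, schedID uuid.UUID) bool {
	return len(tasks) < maxConcurrent || (schedID != uuid.UUID{} && tasks[schedID] > 0)
}

func main() {
	tasks := perRequest{}
	a, b := uuid.New(), uuid.New()

	tasks[a]++                          // request a takes the only slot (MaxConcurrent = 1)
	fmt.Println(canHandle(tasks, 1, a)) // true: a already holds its slot
	fmt.Println(canHandle(tasks, 1, b)) // false: b would exceed the limit
}
```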

View File

@@ -698,7 +698,7 @@ func TestWindowCompact(t *testing.T) {
TaskType: task,
Sector: storiface.SectorRef{ProofType: spt},
})
window.Allocated.Add(task.SealTask(spt), wh.Info.Resources, storiface.ResourceTable[task][spt])
window.Allocated.Add(uuid.UUID{}, task.SealTask(spt), wh.Info.Resources, storiface.ResourceTable[task][spt])
}
wh.activeWindows = append(wh.activeWindows, window)
@@ -717,7 +717,7 @@ func TestWindowCompact(t *testing.T) {
for ti, task := range tasks {
require.Equal(t, task, wh.activeWindows[wi].Todo[ti].TaskType, "%d, %d", wi, ti)
expectRes.Add(task.SealTask(spt), wh.Info.Resources, storiface.ResourceTable[task][spt])
expectRes.Add(uuid.UUID{}, task.SealTask(spt), wh.Info.Resources, storiface.ResourceTable[task][spt])
}
require.Equal(t, expectRes.cpuUse, wh.activeWindows[wi].Allocated.cpuUse, "%d", wi)

View File

@@ -294,14 +294,14 @@ func (sw *schedWorker) workerCompactWindows() {
for ti, todo := range window.Todo {
needRes := worker.Info.Resources.ResourceSpec(todo.Sector.ProofType, todo.TaskType)
if !lower.Allocated.CanHandleRequest(todo.SealTask(), needRes, sw.wid, "compactWindows", worker.Info) {
if !lower.Allocated.CanHandleRequest(todo.SchedId, todo.SealTask(), needRes, sw.wid, "compactWindows", worker.Info) {
continue
}
moved = append(moved, ti)
lower.Todo = append(lower.Todo, todo)
lower.Allocated.Add(todo.SealTask(), worker.Info.Resources, needRes)
window.Allocated.Free(todo.SealTask(), worker.Info.Resources, needRes)
lower.Allocated.Add(todo.SchedId, todo.SealTask(), worker.Info.Resources, needRes)
window.Allocated.Free(todo.SchedId, todo.SealTask(), worker.Info.Resources, needRes)
}
if len(moved) > 0 {
@@ -355,7 +355,7 @@ assignLoop:
worker.lk.Lock()
for t, todo := range firstWindow.Todo {
needResPrep := worker.Info.Resources.PrepResourceSpec(todo.Sector.ProofType, todo.TaskType, todo.prepare.PrepType)
if worker.preparing.CanHandleRequest(todo.PrepSealTask(), needResPrep, sw.wid, "startPreparing", worker.Info) {
if worker.preparing.CanHandleRequest(todo.SchedId, todo.PrepSealTask(), needResPrep, sw.wid, "startPreparing", worker.Info) {
tidx = t
break
}
@@ -416,7 +416,7 @@ assignLoop:
}
needRes := worker.Info.Resources.ResourceSpec(todo.Sector.ProofType, todo.TaskType)
if worker.active.CanHandleRequest(todo.SealTask(), needRes, sw.wid, "startPreparing", worker.Info) {
if worker.active.CanHandleRequest(todo.SchedId, todo.SealTask(), needRes, sw.wid, "startPreparing", worker.Info) {
tidx = t
break
}
@@ -457,7 +457,7 @@ func (sw *schedWorker) startProcessingTask(req *WorkerRequest) error {
needResPrep := w.Info.Resources.PrepResourceSpec(req.Sector.ProofType, req.TaskType, req.prepare.PrepType)
w.lk.Lock()
w.preparing.Add(req.PrepSealTask(), w.Info.Resources, needResPrep)
w.preparing.Add(req.SchedId, req.PrepSealTask(), w.Info.Resources, needResPrep)
w.lk.Unlock()
go func() {
@@ -468,7 +468,7 @@ func (sw *schedWorker) startProcessingTask(req *WorkerRequest) error {
w.lk.Lock()
if err != nil {
w.preparing.Free(req.PrepSealTask(), w.Info.Resources, needResPrep)
w.preparing.Free(req.SchedId, req.PrepSealTask(), w.Info.Resources, needResPrep)
w.lk.Unlock()
select {
@@ -497,11 +497,12 @@ func (sw *schedWorker) startProcessingTask(req *WorkerRequest) error {
}()
// wait (if needed) for resources in the 'active' window
err = w.active.withResources(sw.wid, w.Info, req.SealTask(), needRes, &w.lk, func() error {
w.preparing.Free(req.PrepSealTask(), w.Info.Resources, needResPrep)
err = w.active.withResources(req.SchedId, sw.wid, w.Info, req.SealTask(), needRes, &w.lk, func() error {
w.preparing.Free(req.SchedId, req.PrepSealTask(), w.Info.Resources, needResPrep)
w.lk.Unlock()
defer w.lk.Lock() // we MUST return locked from this function
// make sure the worker loop sees that the prepare task has finished
select {
case sw.taskDone <- struct{}{}:
case <-sh.closing:
@@ -525,6 +526,12 @@ func (sw *schedWorker) startProcessingTask(req *WorkerRequest) error {
w.lk.Unlock()
// make sure the worker loop sees that the task has finished
select {
case sw.taskDone <- struct{}{}:
default: // there is a notification pending already
}
// This error should always be nil, since nothing is setting it, but just to be safe:
if err != nil {
log.Errorf("error executing worker (withResources): %+v", err)
@@ -539,7 +546,7 @@ func (sw *schedWorker) startProcessingReadyTask(req *WorkerRequest) error {
needRes := w.Info.Resources.ResourceSpec(req.Sector.ProofType, req.TaskType)
w.active.Add(req.SealTask(), w.Info.Resources, needRes)
w.active.Add(req.SchedId, req.SealTask(), w.Info.Resources, needRes)
go func() {
// Do the work!
@@ -557,7 +564,7 @@ func (sw *schedWorker) startProcessingReadyTask(req *WorkerRequest) error {
w.lk.Lock()
w.active.Free(req.SealTask(), w.Info.Resources, needRes)
w.active.Free(req.SchedId, req.SealTask(), w.Info.Resources, needRes)
select {
case sw.taskDone <- struct{}{}:
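
The `taskDone` sends added in this file all use the same coalescing-notification idiom: a `select` with a `default` arm drops redundant wake-ups instead of blocking the worker loop. A standalone sketch (assuming, as the pattern requires, a channel buffered with capacity 1):

```go
package main

import "fmt"

// notify signals "at least one task finished"; if a notification is already
// pending, the extra signal is coalesced rather than blocking the sender.
func notify(ch chan struct{}) {
	select {
	case ch <- struct{}{}:
	default: // a notification is pending already
	}
}

func main() {
	taskDone := make(chan struct{}, 1)
	notify(taskDone) // delivered
	notify(taskDone) // coalesced with the pending one
	<-taskDone
	fmt.Println("worker loop wakes once for two completions")
}
```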