Merge pull request #10356 from filecoin-project/feat/assigner-experiments

feat: sched: Assigner experiments
This commit is contained in:
Łukasz Magiera 2023-03-09 01:28:31 +01:00 committed by GitHub
commit 80ccd14447
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 441 additions and 107 deletions

View File

@ -289,14 +289,20 @@ func (m *Manager) ServeHTTP(w http.ResponseWriter, r *http.Request) {
m.remoteHnd.ServeHTTP(w, r) m.remoteHnd.ServeHTTP(w, r)
} }
func schedNop(context.Context, Worker) error { var schedNop = PrepareAction{
Action: func(ctx context.Context, w Worker) error {
return nil return nil
},
PrepType: sealtasks.TTNoop,
} }
func (m *Manager) schedFetch(sector storiface.SectorRef, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) func(context.Context, Worker) error { func (m *Manager) schedFetch(sector storiface.SectorRef, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) PrepareAction {
return func(ctx context.Context, worker Worker) error { return PrepareAction{
Action: func(ctx context.Context, worker Worker) error {
_, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, ft, ptype, am)) _, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, ft, ptype, am))
return err return err
},
PrepType: sealtasks.TTFetch,
} }
} }
@ -315,7 +321,8 @@ func (m *Manager) SectorsUnsealPiece(ctx context.Context, sector storiface.Secto
// if the selected worker does NOT have the sealed files for the sector, instruct it to fetch it from a worker that has them and // if the selected worker does NOT have the sealed files for the sector, instruct it to fetch it from a worker that has them and
// put it in the sealing scratch space. // put it in the sealing scratch space.
sealFetch := func(ctx context.Context, worker Worker) error { sealFetch := PrepareAction{
Action: func(ctx context.Context, worker Worker) error {
log.Debugf("copy sealed/cache sector data for sector %d", sector.ID) log.Debugf("copy sealed/cache sector data for sector %d", sector.ID)
_, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.PathSealing, storiface.AcquireCopy)) _, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.PathSealing, storiface.AcquireCopy))
_, err2 := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTUpdate|storiface.FTUpdateCache, storiface.PathSealing, storiface.AcquireCopy)) _, err2 := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTUpdate|storiface.FTUpdateCache, storiface.PathSealing, storiface.AcquireCopy))
@ -325,6 +332,8 @@ func (m *Manager) SectorsUnsealPiece(ctx context.Context, sector storiface.Secto
} }
return nil return nil
},
PrepType: sealtasks.TTFetch,
} }
if unsealed == nil { if unsealed == nil {

View File

@ -42,6 +42,10 @@ func WithPriority(ctx context.Context, priority int) context.Context {
const mib = 1 << 20 const mib = 1 << 20
type WorkerAction func(ctx context.Context, w Worker) error type WorkerAction func(ctx context.Context, w Worker) error
type PrepareAction struct {
Action WorkerAction
PrepType sealtasks.TaskType
}
type SchedWorker interface { type SchedWorker interface {
TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error) TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error)
@ -130,7 +134,7 @@ type WorkerRequest struct {
Sel WorkerSelector Sel WorkerSelector
SchedId uuid.UUID SchedId uuid.UUID
prepare WorkerAction prepare PrepareAction
work WorkerAction work WorkerAction
start time.Time start time.Time
@ -157,7 +161,15 @@ func newScheduler(ctx context.Context, assigner string) (*Scheduler, error) {
case "", "utilization": case "", "utilization":
a = NewLowestUtilizationAssigner() a = NewLowestUtilizationAssigner()
case "spread": case "spread":
a = NewSpreadAssigner() a = NewSpreadAssigner(false)
case "experiment-spread-qcount":
a = NewSpreadAssigner(true)
case "experiment-spread-tasks":
a = NewSpreadTasksAssigner(false)
case "experiment-spread-tasks-qcount":
a = NewSpreadTasksAssigner(true)
case "experiment-random":
a = NewRandomAssigner()
default: default:
return nil, xerrors.Errorf("unknown assigner '%s'", assigner) return nil, xerrors.Errorf("unknown assigner '%s'", assigner)
} }
@ -189,7 +201,7 @@ func newScheduler(ctx context.Context, assigner string) (*Scheduler, error) {
}, nil }, nil
} }
func (sh *Scheduler) Schedule(ctx context.Context, sector storiface.SectorRef, taskType sealtasks.TaskType, sel WorkerSelector, prepare WorkerAction, work WorkerAction) error { func (sh *Scheduler) Schedule(ctx context.Context, sector storiface.SectorRef, taskType sealtasks.TaskType, sel WorkerSelector, prepare PrepareAction, work WorkerAction) error {
ret := make(chan workerResponse) ret := make(chan workerResponse)
select { select {
@ -239,6 +251,13 @@ func (r *WorkerRequest) SealTask() sealtasks.SealTaskType {
} }
} }
func (r *WorkerRequest) PrepSealTask() sealtasks.SealTaskType {
return sealtasks.SealTaskType{
TaskType: r.prepare.PrepType,
RegisteredSealProof: r.Sector.ProofType,
}
}
type SchedDiagRequestInfo struct { type SchedDiagRequestInfo struct {
Sector abi.SectorID Sector abi.SectorID
TaskType sealtasks.TaskType TaskType sealtasks.TaskType

View File

@ -58,7 +58,7 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) {
windows := make([]SchedWindow, windowsLen) windows := make([]SchedWindow, windowsLen)
for i := range windows { for i := range windows {
windows[i].Allocated = *NewActiveResources() windows[i].Allocated = *NewActiveResources(newTaskCounter())
} }
acceptableWindows := make([][]int, queueLen) // QueueIndex -> []OpenWindowIndex acceptableWindows := make([][]int, queueLen) // QueueIndex -> []OpenWindowIndex

View File

@ -0,0 +1,88 @@
package sealer
import (
"math/rand"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
func NewRandomAssigner() Assigner {
return &AssignerCommon{
WindowSel: RandomWS,
}
}
func RandomWS(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []SchedWindow) int {
scheduled := 0
rmQueue := make([]int, 0, queueLen)
for sqi := 0; sqi < queueLen; sqi++ {
task := (*sh.SchedQueue)[sqi]
//bestAssigned := math.MaxInt // smaller = better
type choice struct {
selectedWindow int
needRes storiface.Resources
info storiface.WorkerInfo
bestWid storiface.WorkerID
}
choices := make([]choice, 0, len(acceptableWindows[task.IndexHeap]))
for i, wnd := range acceptableWindows[task.IndexHeap] {
wid := sh.OpenWindows[wnd].Worker
w := sh.Workers[wid]
res := w.Info.Resources.ResourceSpec(task.Sector.ProofType, task.TaskType)
log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.Sector.ID.Number, wnd, i)
if !windows[wnd].Allocated.CanHandleRequest(task.SealTask(), res, wid, "schedAssign", w.Info) {
continue
}
choices = append(choices, choice{
selectedWindow: wnd,
needRes: res,
info: w.Info,
bestWid: wid,
})
}
if len(choices) == 0 {
// all windows full
continue
}
// chose randomly
randIndex := rand.Intn(len(choices))
selectedWindow := choices[randIndex].selectedWindow
needRes := choices[randIndex].needRes
info := choices[randIndex].info
bestWid := choices[randIndex].bestWid
log.Debugw("SCHED ASSIGNED",
"assigner", "darts",
"sqi", sqi,
"sector", task.Sector.ID.Number,
"task", task.TaskType,
"window", selectedWindow,
"worker", bestWid,
"choices", len(choices))
windows[selectedWindow].Allocated.Add(task.SealTask(), info.Resources, needRes)
windows[selectedWindow].Todo = append(windows[selectedWindow].Todo, task)
rmQueue = append(rmQueue, sqi)
scheduled++
}
if len(rmQueue) > 0 {
for i := len(rmQueue) - 1; i >= 0; i-- {
sh.SchedQueue.Remove(rmQueue[i])
}
}
return scheduled
}

View File

@ -6,13 +6,14 @@ import (
"github.com/filecoin-project/lotus/storage/sealer/storiface" "github.com/filecoin-project/lotus/storage/sealer/storiface"
) )
func NewSpreadAssigner() Assigner { func NewSpreadAssigner(queued bool) Assigner {
return &AssignerCommon{ return &AssignerCommon{
WindowSel: SpreadWS, WindowSel: SpreadWS(queued),
} }
} }
func SpreadWS(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []SchedWindow) int { func SpreadWS(queued bool) func(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []SchedWindow) int {
return func(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []SchedWindow) int {
scheduled := 0 scheduled := 0
rmQueue := make([]int, 0, queueLen) rmQueue := make([]int, 0, queueLen)
workerAssigned := map[storiface.WorkerID]int{} workerAssigned := map[storiface.WorkerID]int{}
@ -38,7 +39,11 @@ func SpreadWS(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []
continue continue
} }
wu, _ := workerAssigned[wid] wu, found := workerAssigned[wid]
if !found && queued {
wu = w.TaskCounts()
workerAssigned[wid] = wu
}
if wu >= bestAssigned { if wu >= bestAssigned {
continue continue
} }
@ -56,6 +61,8 @@ func SpreadWS(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []
} }
log.Debugw("SCHED ASSIGNED", log.Debugw("SCHED ASSIGNED",
"assigner", "spread",
"spread-queued", queued,
"sqi", sqi, "sqi", sqi,
"sector", task.Sector.ID.Number, "sector", task.Sector.ID.Number,
"task", task.TaskType, "task", task.TaskType,
@ -79,3 +86,4 @@ func SpreadWS(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []
return scheduled return scheduled
} }
}

View File

@ -0,0 +1,98 @@
package sealer
import (
"math"
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
func NewSpreadTasksAssigner(queued bool) Assigner {
return &AssignerCommon{
WindowSel: SpreadTasksWS(queued),
}
}
type widTask struct {
wid storiface.WorkerID
tt sealtasks.TaskType
}
func SpreadTasksWS(queued bool) func(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []SchedWindow) int {
return func(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []SchedWindow) int {
scheduled := 0
rmQueue := make([]int, 0, queueLen)
workerAssigned := map[widTask]int{}
for sqi := 0; sqi < queueLen; sqi++ {
task := (*sh.SchedQueue)[sqi]
selectedWindow := -1
var needRes storiface.Resources
var info storiface.WorkerInfo
var bestWid widTask
bestAssigned := math.MaxInt // smaller = better
for i, wnd := range acceptableWindows[task.IndexHeap] {
wid := sh.OpenWindows[wnd].Worker
w := sh.Workers[wid]
res := w.Info.Resources.ResourceSpec(task.Sector.ProofType, task.TaskType)
log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.Sector.ID.Number, wnd, i)
if !windows[wnd].Allocated.CanHandleRequest(task.SealTask(), res, wid, "schedAssign", w.Info) {
continue
}
wt := widTask{wid: wid, tt: task.TaskType}
wu, found := workerAssigned[wt]
if !found && queued {
st := task.SealTask()
wu = w.TaskCount(&st)
workerAssigned[wt] = wu
}
if wu >= bestAssigned {
continue
}
info = w.Info
needRes = res
bestWid = wt
selectedWindow = wnd
bestAssigned = wu
}
if selectedWindow < 0 {
// all windows full
continue
}
log.Debugw("SCHED ASSIGNED",
"assigner", "spread-tasks",
"spread-queued", queued,
"sqi", sqi,
"sector", task.Sector.ID.Number,
"task", task.TaskType,
"window", selectedWindow,
"worker", bestWid,
"assigned", bestAssigned)
workerAssigned[bestWid]++
windows[selectedWindow].Allocated.Add(task.SealTask(), info.Resources, needRes)
windows[selectedWindow].Todo = append(windows[selectedWindow].Todo, task)
rmQueue = append(rmQueue, sqi)
scheduled++
}
if len(rmQueue) > 0 {
for i := len(rmQueue) - 1; i >= 0; i-- {
sh.SchedQueue.Remove(rmQueue[i])
}
}
return scheduled
}
}

View File

@ -74,6 +74,7 @@ func LowestUtilizationWS(sh *Scheduler, queueLen int, acceptableWindows [][]int,
} }
log.Debugw("SCHED ASSIGNED", log.Debugw("SCHED ASSIGNED",
"assigner", "util",
"sqi", sqi, "sqi", sqi,
"sector", task.Sector.ID.Number, "sector", task.Sector.ID.Number,
"task", task.TaskType, "task", task.TaskType,

View File

@ -13,18 +13,68 @@ type ActiveResources struct {
gpuUsed float64 gpuUsed float64
cpuUse uint64 cpuUse uint64
taskCounters map[sealtasks.SealTaskType]int taskCounters *taskCounter
cond *sync.Cond cond *sync.Cond
waiting int waiting int
} }
func NewActiveResources() *ActiveResources { type taskCounter struct {
return &ActiveResources{ taskCounters map[sealtasks.SealTaskType]int
// this lock is technically redundant, as ActiveResources is always accessed
// with the worker lock, but let's not panic if we ever change that
lk sync.Mutex
}
func newTaskCounter() *taskCounter {
return &taskCounter{
taskCounters: map[sealtasks.SealTaskType]int{}, taskCounters: map[sealtasks.SealTaskType]int{},
} }
} }
func (tc *taskCounter) Add(tt sealtasks.SealTaskType) {
tc.lk.Lock()
defer tc.lk.Unlock()
tc.taskCounters[tt]++
}
func (tc *taskCounter) Free(tt sealtasks.SealTaskType) {
tc.lk.Lock()
defer tc.lk.Unlock()
tc.taskCounters[tt]--
}
func (tc *taskCounter) Get(tt sealtasks.SealTaskType) int {
tc.lk.Lock()
defer tc.lk.Unlock()
return tc.taskCounters[tt]
}
func (tc *taskCounter) Sum() int {
tc.lk.Lock()
defer tc.lk.Unlock()
sum := 0
for _, v := range tc.taskCounters {
sum += v
}
return sum
}
func (tc *taskCounter) ForEach(cb func(tt sealtasks.SealTaskType, count int)) {
tc.lk.Lock()
defer tc.lk.Unlock()
for tt, count := range tc.taskCounters {
cb(tt, count)
}
}
func NewActiveResources(tc *taskCounter) *ActiveResources {
return &ActiveResources{
taskCounters: tc,
}
}
func (a *ActiveResources) withResources(id storiface.WorkerID, wr storiface.WorkerInfo, tt sealtasks.SealTaskType, r storiface.Resources, locker sync.Locker, cb func() error) error { func (a *ActiveResources) withResources(id storiface.WorkerID, wr storiface.WorkerInfo, tt sealtasks.SealTaskType, r storiface.Resources, locker sync.Locker, cb func() error) error {
for !a.CanHandleRequest(tt, r, id, "withResources", wr) { for !a.CanHandleRequest(tt, r, id, "withResources", wr) {
if a.cond == nil { if a.cond == nil {
@ -59,7 +109,7 @@ func (a *ActiveResources) Add(tt sealtasks.SealTaskType, wr storiface.WorkerReso
a.cpuUse += r.Threads(wr.CPUs, len(wr.GPUs)) a.cpuUse += r.Threads(wr.CPUs, len(wr.GPUs))
a.memUsedMin += r.MinMemory a.memUsedMin += r.MinMemory
a.memUsedMax += r.MaxMemory a.memUsedMax += r.MaxMemory
a.taskCounters[tt]++ a.taskCounters.Add(tt)
return a.utilization(wr) - startUtil return a.utilization(wr) - startUtil
} }
@ -71,7 +121,7 @@ func (a *ActiveResources) Free(tt sealtasks.SealTaskType, wr storiface.WorkerRes
a.cpuUse -= r.Threads(wr.CPUs, len(wr.GPUs)) a.cpuUse -= r.Threads(wr.CPUs, len(wr.GPUs))
a.memUsedMin -= r.MinMemory a.memUsedMin -= r.MinMemory
a.memUsedMax -= r.MaxMemory a.memUsedMax -= r.MaxMemory
a.taskCounters[tt]-- a.taskCounters.Free(tt)
if a.cond != nil { if a.cond != nil {
a.cond.Broadcast() a.cond.Broadcast()
@ -82,8 +132,8 @@ func (a *ActiveResources) Free(tt sealtasks.SealTaskType, wr storiface.WorkerRes
// handle the request. // handle the request.
func (a *ActiveResources) CanHandleRequest(tt sealtasks.SealTaskType, needRes storiface.Resources, wid storiface.WorkerID, caller string, info storiface.WorkerInfo) bool { func (a *ActiveResources) CanHandleRequest(tt sealtasks.SealTaskType, needRes storiface.Resources, wid storiface.WorkerID, caller string, info storiface.WorkerInfo) bool {
if needRes.MaxConcurrent > 0 { if needRes.MaxConcurrent > 0 {
if a.taskCounters[tt] >= needRes.MaxConcurrent { if a.taskCounters.Get(tt) >= needRes.MaxConcurrent {
log.Debugf("sched: not scheduling on worker %s for %s; at task limit tt=%s, curcount=%d", wid, caller, tt, a.taskCounters[tt]) log.Debugf("sched: not scheduling on worker %s for %s; at task limit tt=%s, curcount=%d", wid, caller, tt, a.taskCounters.Get(tt))
return false return false
} }
} }
@ -170,6 +220,15 @@ func (a *ActiveResources) utilization(wr storiface.WorkerResources) float64 { //
return max return max
} }
func (a *ActiveResources) taskCount(tt *sealtasks.SealTaskType) int {
// nil means all tasks
if tt == nil {
return a.taskCounters.Sum()
}
return a.taskCounters.Get(*tt)
}
func (wh *WorkerHandle) Utilization() float64 { func (wh *WorkerHandle) Utilization() float64 {
wh.lk.Lock() wh.lk.Lock()
u := wh.active.utilization(wh.Info.Resources) u := wh.active.utilization(wh.Info.Resources)
@ -183,3 +242,31 @@ func (wh *WorkerHandle) Utilization() float64 {
return u return u
} }
func (wh *WorkerHandle) TaskCounts() int {
wh.lk.Lock()
u := wh.active.taskCount(nil)
u += wh.preparing.taskCount(nil)
wh.lk.Unlock()
wh.wndLk.Lock()
for _, window := range wh.activeWindows {
u += window.Allocated.taskCount(nil)
}
wh.wndLk.Unlock()
return u
}
func (wh *WorkerHandle) TaskCount(tt *sealtasks.SealTaskType) int {
wh.lk.Lock()
u := wh.active.taskCount(tt)
u += wh.preparing.taskCount(tt)
wh.lk.Unlock()
wh.wndLk.Lock()
for _, window := range wh.activeWindows {
u += window.Allocated.taskCount(tt)
}
wh.wndLk.Unlock()
return u
}

View File

@ -288,7 +288,8 @@ func TestSched(t *testing.T) {
ProofType: spt, ProofType: spt,
} }
err := sched.Schedule(ctx, sectorRef, taskType, sel, func(ctx context.Context, w Worker) error { prep := PrepareAction{
Action: func(ctx context.Context, w Worker) error {
wi, err := w.Info(ctx) wi, err := w.Info(ctx)
require.NoError(t, err) require.NoError(t, err)
@ -306,7 +307,11 @@ func TestSched(t *testing.T) {
log.Info("OUT ", taskName) log.Info("OUT ", taskName)
return nil return nil
}, noopAction) },
PrepType: taskType,
}
err := sched.Schedule(ctx, sectorRef, taskType, sel, prep, noopAction)
if err != context.Canceled { if err != context.Canceled {
require.NoError(t, err, fmt.Sprint(l, l2)) require.NoError(t, err, fmt.Sprint(l, l2))
} }
@ -639,8 +644,8 @@ func BenchmarkTrySched(b *testing.B) {
Resources: decentWorkerResources, Resources: decentWorkerResources,
}, },
Enabled: true, Enabled: true,
preparing: NewActiveResources(), preparing: NewActiveResources(newTaskCounter()),
active: NewActiveResources(), active: NewActiveResources(newTaskCounter()),
} }
for i := 0; i < windows; i++ { for i := 0; i < windows; i++ {
@ -685,7 +690,7 @@ func TestWindowCompact(t *testing.T) {
for _, windowTasks := range start { for _, windowTasks := range start {
window := &SchedWindow{ window := &SchedWindow{
Allocated: *NewActiveResources(), Allocated: *NewActiveResources(newTaskCounter()),
} }
for _, task := range windowTasks { for _, task := range windowTasks {
@ -708,7 +713,7 @@ func TestWindowCompact(t *testing.T) {
require.Equal(t, len(start)-len(expect), -sw.windowsRequested) require.Equal(t, len(start)-len(expect), -sw.windowsRequested)
for wi, tasks := range expect { for wi, tasks := range expect {
expectRes := NewActiveResources() expectRes := NewActiveResources(newTaskCounter())
for ti, task := range tasks { for ti, task := range tasks {
require.Equal(t, task, wh.activeWindows[wi].Todo[ti].TaskType, "%d, %d", wi, ti) require.Equal(t, task, wh.activeWindows[wi].Todo[ti].TaskType, "%d, %d", wi, ti)

View File

@ -30,12 +30,14 @@ func newWorkerHandle(ctx context.Context, w Worker) (*WorkerHandle, error) {
return nil, xerrors.Errorf("getting worker info: %w", err) return nil, xerrors.Errorf("getting worker info: %w", err)
} }
tc := newTaskCounter()
worker := &WorkerHandle{ worker := &WorkerHandle{
workerRpc: w, workerRpc: w,
Info: info, Info: info,
preparing: NewActiveResources(), preparing: NewActiveResources(tc),
active: NewActiveResources(), active: NewActiveResources(tc),
Enabled: true, Enabled: true,
closingMgr: make(chan struct{}), closingMgr: make(chan struct{}),
@ -352,8 +354,8 @@ assignLoop:
worker.lk.Lock() worker.lk.Lock()
for t, todo := range firstWindow.Todo { for t, todo := range firstWindow.Todo {
needRes := worker.Info.Resources.ResourceSpec(todo.Sector.ProofType, todo.TaskType) needResPrep := worker.Info.Resources.PrepResourceSpec(todo.Sector.ProofType, todo.TaskType, todo.prepare.PrepType)
if worker.preparing.CanHandleRequest(todo.SealTask(), needRes, sw.wid, "startPreparing", worker.Info) { if worker.preparing.CanHandleRequest(todo.PrepSealTask(), needResPrep, sw.wid, "startPreparing", worker.Info) {
tidx = t tidx = t
break break
} }
@ -452,20 +454,21 @@ func (sw *schedWorker) startProcessingTask(req *WorkerRequest) error {
w, sh := sw.worker, sw.sched w, sh := sw.worker, sw.sched
needRes := w.Info.Resources.ResourceSpec(req.Sector.ProofType, req.TaskType) needRes := w.Info.Resources.ResourceSpec(req.Sector.ProofType, req.TaskType)
needResPrep := w.Info.Resources.PrepResourceSpec(req.Sector.ProofType, req.TaskType, req.prepare.PrepType)
w.lk.Lock() w.lk.Lock()
w.preparing.Add(req.SealTask(), w.Info.Resources, needRes) w.preparing.Add(req.PrepSealTask(), w.Info.Resources, needResPrep)
w.lk.Unlock() w.lk.Unlock()
go func() { go func() {
// first run the prepare step (e.g. fetching sector data from other worker) // first run the prepare step (e.g. fetching sector data from other worker)
tw := sh.workTracker.worker(sw.wid, w.Info, w.workerRpc) tw := sh.workTracker.worker(sw.wid, w.Info, w.workerRpc)
tw.start() tw.start()
err := req.prepare(req.Ctx, tw) err := req.prepare.Action(req.Ctx, tw)
w.lk.Lock() w.lk.Lock()
if err != nil { if err != nil {
w.preparing.Free(req.SealTask(), w.Info.Resources, needRes) w.preparing.Free(req.PrepSealTask(), w.Info.Resources, needResPrep)
w.lk.Unlock() w.lk.Unlock()
select { select {
@ -495,7 +498,7 @@ func (sw *schedWorker) startProcessingTask(req *WorkerRequest) error {
// wait (if needed) for resources in the 'active' window // wait (if needed) for resources in the 'active' window
err = w.active.withResources(sw.wid, w.Info, req.SealTask(), needRes, &w.lk, func() error { err = w.active.withResources(sw.wid, w.Info, req.SealTask(), needRes, &w.lk, func() error {
w.preparing.Free(req.SealTask(), w.Info.Resources, needRes) w.preparing.Free(req.PrepSealTask(), w.Info.Resources, needResPrep)
w.lk.Unlock() w.lk.Unlock()
defer w.lk.Lock() // we MUST return locked from this function defer w.lk.Lock() // we MUST return locked from this function

View File

@ -36,6 +36,8 @@ const (
TTGenerateWindowPoSt TaskType = "post/v0/windowproof" TTGenerateWindowPoSt TaskType = "post/v0/windowproof"
TTGenerateWinningPoSt TaskType = "post/v0/winningproof" TTGenerateWinningPoSt TaskType = "post/v0/winningproof"
TTNoop TaskType = ""
) )
var order = map[TaskType]int{ var order = map[TaskType]int{

View File

@ -43,9 +43,9 @@ func (m *Manager) WorkerStats(ctx context.Context) map[uuid.UUID]storiface.Worke
TaskCounts: map[string]int{}, TaskCounts: map[string]int{},
} }
for tt, count := range handle.active.taskCounters { handle.active.taskCounters.ForEach(func(tt sealtasks.SealTaskType, count int) {
out[uuid.UUID(id)].TaskCounts[tt.String()] = count out[uuid.UUID(id)].TaskCounts[tt.String()] = count
} })
handle.lk.Unlock() handle.lk.Unlock()
} }

View File

@ -65,6 +65,20 @@ func (wr WorkerResources) ResourceSpec(spt abi.RegisteredSealProof, tt sealtasks
return res return res
} }
// PrepResourceSpec is like ResourceSpec, but meant for use limiting parallel preparing
// tasks.
func (wr WorkerResources) PrepResourceSpec(spt abi.RegisteredSealProof, tt, prepTT sealtasks.TaskType) Resources {
res := wr.ResourceSpec(spt, tt)
if prepTT != tt && prepTT != sealtasks.TTNoop {
prepRes := wr.ResourceSpec(spt, prepTT)
res.MaxConcurrent = prepRes.MaxConcurrent
}
// otherwise, use the default resource table
return res
}
type WorkerStats struct { type WorkerStats struct {
Info WorkerInfo Info WorkerInfo
Tasks []sealtasks.TaskType Tasks []sealtasks.TaskType