diff --git a/cmd/lotus-miner/run.go b/cmd/lotus-miner/run.go
index 03ebf9e1c..f276f319c 100644
--- a/cmd/lotus-miner/run.go
+++ b/cmd/lotus-miner/run.go
@@ -4,7 +4,6 @@ import (
 	"fmt"
 	_ "net/http/pprof"
 	"os"
-	"strconv"
 
 	"github.com/filecoin-project/lotus/api/v1api"
 
@@ -50,11 +49,6 @@ var runCmd = &cli.Command{
 			Usage: "manage open file limit",
 			Value: true,
 		},
-		&cli.IntFlag{
-			Name:  "parallel-p1-limit",
-			Usage: "maximum pre commit1 operations to run in parallel",
-			Value: -1,
-		},
 	},
 	Action: func(cctx *cli.Context) error {
 		if !cctx.Bool("enable-gpu-proving") {
@@ -64,8 +58,6 @@ var runCmd = &cli.Command{
 			}
 		}
 
-		os.Setenv("PARALLEL_P1_LIMIT", strconv.Itoa(cctx.Int("parallel-p1-limit")))
-
 		ctx, _ := tag.New(lcli.DaemonContext(cctx),
 			tag.Insert(metrics.Version, build.BuildVersion),
 			tag.Insert(metrics.Commit, build.CurrentCommit),
diff --git a/cmd/lotus-miner/sealing.go b/cmd/lotus-miner/sealing.go
index fab7fb880..6cd4f3073 100644
--- a/cmd/lotus-miner/sealing.go
+++ b/cmd/lotus-miner/sealing.go
@@ -97,13 +97,20 @@ func workersCmd(sealing bool) *cli.Command {
 			return st[i].id.String() < st[j].id.String()
 		})
 
+		/*
+			Example output:
+
+			Worker c4d65451-07f8-4230-98ad-4f33dea2a8cc, host myhostname
+			        TASK: PC1(1/4) AP(15/15) GET(3)
+			        CPU:  [||||||||                        ] 16/128 core(s) in use
+			        RAM:  [||||||||                        ] 12% 125.8 GiB/1008 GiB
+			        VMEM: [||||||||                        ] 12% 125.8 GiB/1008 GiB
+			        GPU:  [                                ] 0% 0.00/1 gpu(s) in use
+			        GPU: NVIDIA GeForce RTX 3090, not used
+		*/
+
 		for _, stat := range st {
-			gpuUse := "not "
-			gpuCol := color.FgBlue
-			if stat.GpuUsed > 0 {
-				gpuCol = color.FgGreen
-				gpuUse = ""
-			}
+			// Worker uuid + name
 
 			var disabled string
 			if !stat.Enabled {
@@ -112,9 +119,53 @@
 			fmt.Printf("Worker %s, host %s%s\n", stat.id,
 				color.MagentaString(stat.Info.Hostname), disabled)
 
+			// Task counts
+			tc := make([][]string, 0, len(stat.TaskCounts))
+
+			for st, c := range stat.TaskCounts {
+				if c == 0 {
+					continue
+				}
+
+				stt, err := sealtasks.SttFromString(st)
+				if err != nil {
+					return err
+				}
+
+				str := fmt.Sprint(c)
+				if max := stat.Info.Resources.ResourceSpec(stt.RegisteredSealProof, stt.TaskType).MaxConcurrent; max > 0 {
+					switch {
+					case c < max:
+						str = color.GreenString(str)
+					case c >= max:
+						str = color.YellowString(str)
+					}
+					str = fmt.Sprintf("%s/%d", str, max)
+				} else {
+					str = color.CyanString(str)
+				}
+				str = fmt.Sprintf("%s(%s)", color.BlueString(stt.Short()), str)
+
+				tc = append(tc, []string{string(stt.TaskType), str})
+			}
+			sort.Slice(tc, func(i, j int) bool {
+				return sealtasks.TaskType(tc[i][0]).Less(sealtasks.TaskType(tc[j][0]))
+			})
+			var taskStr string
+			for _, t := range tc {
+				taskStr += t[1] + " "
+			}
+			if taskStr != "" {
+				fmt.Printf("\tTASK: %s\n", taskStr)
+			}
+
+			// CPU use
+
 			fmt.Printf("\tCPU: [%s] %d/%d core(s) in use\n",
 				barString(float64(stat.Info.Resources.CPUs), 0, float64(stat.CpuUse)), stat.CpuUse, stat.Info.Resources.CPUs)
 
+			// RAM use
+
 			ramTotal := stat.Info.Resources.MemPhysical
 			ramTasks := stat.MemUsedMin
 			ramUsed := stat.Info.Resources.MemUsed
@@ -129,6 +180,8 @@ func workersCmd(sealing bool) *cli.Command {
 				types.SizeStr(types.NewInt(ramTasks+ramUsed)),
 				types.SizeStr(types.NewInt(stat.Info.Resources.MemPhysical)))
 
+			// VMEM use (ram+swap)
+
 			vmemTotal := stat.Info.Resources.MemPhysical + stat.Info.Resources.MemSwap
 			vmemTasks := stat.MemUsedMax
 			vmemUsed := stat.Info.Resources.MemUsed + stat.Info.Resources.MemSwapUsed
@@ -143,21 +196,24 @@
 				types.SizeStr(types.NewInt(vmemTasks+vmemReserved)),
 				types.SizeStr(types.NewInt(vmemTotal)))
 
+			// GPU use
+
 			if len(stat.Info.Resources.GPUs) > 0 {
 				gpuBar := barString(float64(len(stat.Info.Resources.GPUs)), 0, stat.GpuUsed)
 				fmt.Printf("\tGPU: [%s] %.f%% %.2f/%d gpu(s) in use\n", color.GreenString(gpuBar),
 					stat.GpuUsed*100/float64(len(stat.Info.Resources.GPUs)), stat.GpuUsed, len(stat.Info.Resources.GPUs))
 			}
+
+			gpuUse := "not "
+			gpuCol := color.FgBlue
+			if stat.GpuUsed > 0 {
+				gpuCol = color.FgGreen
+				gpuUse = ""
+			}
 			for _, gpu := range stat.Info.Resources.GPUs {
 				fmt.Printf("\tGPU: %s\n", color.New(gpuCol).Sprintf("%s, %sused", gpu, gpuUse))
 			}
-
-			plConfig, ok := stat.Info.TaskLimits[sealtasks.TTPreCommit1]
-			if ok && plConfig.LimitCount > 0 {
-				fmt.Printf("\tP1LIMIT: [%s] %d/%d tasks are running\n",
-					barString(float64(plConfig.LimitCount), 0, float64(plConfig.RunCount)), plConfig.RunCount, plConfig.LimitCount)
-			}
 		}
 
 		return nil
diff --git a/cmd/lotus-worker/main.go b/cmd/lotus-worker/main.go
index 88d31282a..83c821105 100644
--- a/cmd/lotus-worker/main.go
+++ b/cmd/lotus-worker/main.go
@@ -9,7 +9,6 @@ import (
 	"net/http"
 	"os"
 	"path/filepath"
-	"strconv"
 	"strings"
 	"time"
 
@@ -209,11 +208,6 @@ var runCmd = &cli.Command{
 			Usage: "used when 'listen' is unspecified. must be a valid duration recognized by golang's time.ParseDuration function",
 			Value: "30m",
 		},
-		&cli.IntFlag{
-			Name:  "parallel-p1-limit",
-			Usage: "maximum precommit1 operations to run in parallel",
-			Value: -1,
-		},
 	},
 	Before: func(cctx *cli.Context) error {
 		if cctx.IsSet("address") {
@@ -234,8 +228,6 @@ var runCmd = &cli.Command{
 			}
 		}
 
-		os.Setenv("PARALLEL_P1_LIMIT", strconv.Itoa(cctx.Int("parallel-p1-limit")))
-
 		limit, _, err := ulimit.GetLimit()
 		switch {
 		case err == ulimit.ErrUnsupported:
diff --git a/extern/sector-storage/sched.go b/extern/sector-storage/sched.go
index cbe948797..a3a37a11a 100644
--- a/extern/sector-storage/sched.go
+++ b/extern/sector-storage/sched.go
@@ -116,16 +116,6 @@ type workerDisableReq struct {
 	done          func()
 }
 
-type activeResources struct {
-	memUsedMin uint64
-	memUsedMax uint64
-	gpuUsed    float64
-	cpuUse     uint64
-
-	cond    *sync.Cond
-	waiting int
-}
-
 type workerRequest struct {
 	sector   storage.SectorRef
 	taskType sealtasks.TaskType
@@ -214,6 +204,13 @@ func (r *workerRequest) respond(err error) {
 	}
 }
 
+func (r *workerRequest) SealTask() sealtasks.SealTaskType {
+	return sealtasks.SealTaskType{
+		TaskType:            r.taskType,
+		RegisteredSealProof: r.sector.ProofType,
+	}
+}
+
 type SchedDiagRequestInfo struct {
 	Sector   abi.SectorID
 	TaskType sealtasks.TaskType
@@ -366,6 +363,9 @@ func (sh *scheduler) trySched() {
 	}
 
 	windows := make([]schedWindow, windowsLen)
+	for i := range windows {
+		windows[i].allocated = *newActiveResources()
+	}
 	acceptableWindows := make([][]int, queueLen) // QueueIndex -> []OpenWindowIndex
 
 	// Step 1
@@ -401,7 +401,7 @@
 			needRes := worker.info.Resources.ResourceSpec(task.sector.ProofType, task.taskType)
 
 			// TODO: allow bigger windows
-			if !windows[wnd].allocated.canHandleRequest(needRes, windowRequest.worker, "schedAcceptable", worker.info) {
+			if !windows[wnd].allocated.canHandleRequest(task.SealTask(), needRes, windowRequest.worker, "schedAcceptable", worker.info) {
 				continue
 			}
 
@@ -475,16 +475,12 @@
 			wid := sh.openWindows[wnd].worker
 			w := sh.workers[wid]
 
-			res := info.Resources.ResourceSpec(task.sector.ProofType, task.taskType)
+			res := w.info.Resources.ResourceSpec(task.sector.ProofType, task.taskType)
 
 			log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi,
 				task.sector.ID.Number, wnd, i)
 
 			// TODO: allow bigger windows
-			if !windows[wnd].allocated.canHandleRequest(needRes, wid, "schedAssign", info) {
-				continue
-			}
-
-			if !sh.CanHandleTask(task.taskType, wid) {
+			if !windows[wnd].allocated.canHandleRequest(task.SealTask(), res, wid, "schedAssign", w.info) {
 				continue
 			}
 
@@ -507,7 +503,6 @@
 			// #--------> acceptableWindow index
 			//
 			// * -> we're here
-			sh.TaskAdd(task.taskType, bestWid)
 			break
 		}
 
@@ -531,7 +526,7 @@
 			"worker", bestWid,
 			"utilization", bestUtilization)
 
-		workerUtil[bestWid] += windows[selectedWindow].allocated.add(info.Resources, needRes)
+		workerUtil[bestWid] += windows[selectedWindow].allocated.add(task.SealTask(), info.Resources, needRes)
 		windows[selectedWindow].todo = append(windows[selectedWindow].todo, task)
 
 		rmQueue = append(rmQueue, sqi)
@@ -615,55 +610,3 @@ func (sh *scheduler) Close(ctx context.Context) error {
 	}
 	return nil
 }
-
-func (sh *scheduler) CanHandleTask(taskType sealtasks.TaskType, wid storiface.WorkerID) (flag bool) {
-	if wh, ok := sh.workers[wid]; ok {
-		wh.info.TaskLimitLk.Lock()
-		defer wh.info.TaskLimitLk.Unlock()
-		taskLimit, ok := wh.info.TaskLimits[taskType]
-		if !ok {
-			flag = true
-			return
-		}
-		log.Debugf("CanHandleTask: %v:%v", taskLimit.LimitCount, taskLimit.RunCount)
-		if taskLimit.LimitCount > 0 {
-			freeCount := taskLimit.LimitCount - taskLimit.RunCount
-			if freeCount > 0 {
-				flag = true
-			}
-		} else {
-			flag = true
-		}
-	} else {
-		flag = true
-	}
-	return
-}
-
-func (sh *scheduler) TaskAdd(taskType sealtasks.TaskType, wid storiface.WorkerID) {
-	log.Debugf("begin task add:%v-%v", wid, taskType)
-	if wh, ok := sh.workers[wid]; ok {
-		wh.info.TaskLimitLk.Lock()
-		defer wh.info.TaskLimitLk.Unlock()
-		taskLimit, ok := wh.info.TaskLimits[taskType]
-		if ok {
-			log.Debugf("task limit:%v-%v", taskLimit.LimitCount, taskLimit.RunCount)
-			taskLimit.RunCount++
-		}
-	}
-
-}
-
-func (sh *scheduler) TaskReduce(taskType sealtasks.TaskType, wid storiface.WorkerID) {
-	log.Debugf("begin task reduce:%v-%v", wid, taskType)
-	if wh, ok := sh.workers[wid]; ok {
-		wh.info.TaskLimitLk.Lock()
-		defer wh.info.TaskLimitLk.Unlock()
-		taskLimit, ok := wh.info.TaskLimits[taskType]
-		if ok {
-			log.Debugf("task limit:%v-%v", taskLimit.LimitCount, taskLimit.RunCount)
-			taskLimit.RunCount--
-		}
-	}
-
-}
diff --git a/extern/sector-storage/sched_post.go b/extern/sector-storage/sched_post.go
index 58d79fc86..f1963ad69 100644
--- a/extern/sector-storage/sched_post.go
+++ b/extern/sector-storage/sched_post.go
@@ -105,7 +105,7 @@ func (ps *poStScheduler) Schedule(ctx context.Context, primary bool, spt abi.Reg
 	selected := candidates[0]
 	worker := ps.workers[selected.id]
 
-	return worker.active.withResources(selected.id, worker.info, selected.res, &ps.lk, func() error {
+	return worker.active.withResources(selected.id, worker.info, ps.postType.SealTask(spt), selected.res, &ps.lk, func() error {
 		ps.lk.Unlock()
 		defer ps.lk.Lock()
 
@@ -124,7 +124,7 @@ func (ps *poStScheduler) readyWorkers(spt abi.RegisteredSealProof) (bool, []cand
 	for wid, wr := range ps.workers {
 		needRes := wr.info.Resources.ResourceSpec(spt, ps.postType)
 
-		if !wr.active.canHandleRequest(needRes, wid, "post-readyWorkers", wr.info) {
+		if !wr.active.canHandleRequest(ps.postType.SealTask(spt), needRes, wid, "post-readyWorkers", wr.info) {
 			continue
 		}
 
diff --git a/extern/sector-storage/sched_resources.go b/extern/sector-storage/sched_resources.go
index 458ca981d..efb9bdd50 100644
--- a/extern/sector-storage/sched_resources.go
+++ b/extern/sector-storage/sched_resources.go
@@ -9,8 +9,26 @@ import (
 	"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
 )
 
-func (a *activeResources) withResources(id storiface.WorkerID, wr storiface.WorkerInfo, r storiface.Resources, locker sync.Locker, cb func() error) error {
-	for !a.canHandleRequest(r, id, "withResources", wr) {
+type activeResources struct {
+	memUsedMin uint64
+	memUsedMax uint64
+	gpuUsed    float64
+	cpuUse     uint64
+
+	taskCounters map[sealtasks.SealTaskType]int
+
+	cond    *sync.Cond
+	waiting int
+}
+
+func newActiveResources() *activeResources {
+	return &activeResources{
+		taskCounters: map[sealtasks.SealTaskType]int{},
+	}
+}
+
+func (a *activeResources) withResources(id storiface.WorkerID, wr storiface.WorkerInfo, tt sealtasks.SealTaskType, r storiface.Resources, locker sync.Locker, cb func() error) error {
+	for !a.canHandleRequest(tt, r, id, "withResources", wr) {
 		if a.cond == nil {
 			a.cond = sync.NewCond(locker)
 		}
@@ -19,11 +37,11 @@ func (a *activeResources) withResources(id storiface.WorkerID, wr storiface.Work
 		a.waiting--
 	}
 
-	a.add(wr.Resources, r)
+	a.add(tt, wr.Resources, r)
 
 	err := cb()
 
-	a.free(wr.Resources, r)
+	a.free(tt, wr.Resources, r)
 
 	return err
 }
@@ -34,7 +52,7 @@
 }
 
 // add task resources to activeResources and return utilization difference
-func (a *activeResources) add(wr storiface.WorkerResources, r storiface.Resources) float64 {
+func (a *activeResources) add(tt sealtasks.SealTaskType, wr storiface.WorkerResources, r storiface.Resources) float64 {
 	startUtil := a.utilization(wr)
 
 	if r.GPUUtilization > 0 {
@@ -43,17 +61,21 @@
 	a.cpuUse += r.Threads(wr.CPUs, len(wr.GPUs))
 	a.memUsedMin += r.MinMemory
 	a.memUsedMax += r.MaxMemory
+	t := a.taskCounters[tt]
+	t++
+	a.taskCounters[tt] = t
 
 	return a.utilization(wr) - startUtil
 }
 
-func (a *activeResources) free(wr storiface.WorkerResources, r storiface.Resources) {
+func (a *activeResources) free(tt sealtasks.SealTaskType, wr storiface.WorkerResources, r storiface.Resources) {
 	if r.GPUUtilization > 0 {
 		a.gpuUsed -= r.GPUUtilization
 	}
 	a.cpuUse -= r.Threads(wr.CPUs, len(wr.GPUs))
 	a.memUsedMin -= r.MinMemory
 	a.memUsedMax -= r.MaxMemory
+	a.taskCounters[tt]--
 
 	if a.cond != nil {
 		a.cond.Broadcast()
@@ -62,7 +84,14 @@ func (a *activeResources) free(wr storiface.WorkerResourc
 
 // canHandleRequest evaluates if the worker has enough available resources to
 // handle the request.
-func (a *activeResources) canHandleRequest(needRes storiface.Resources, wid storiface.WorkerID, caller string, info storiface.WorkerInfo) bool {
+func (a *activeResources) canHandleRequest(tt sealtasks.SealTaskType, needRes storiface.Resources, wid storiface.WorkerID, caller string, info storiface.WorkerInfo) bool {
+	if needRes.MaxConcurrent > 0 {
+		if a.taskCounters[tt] >= needRes.MaxConcurrent {
+			log.Debugf("sched: not scheduling on worker %s for %s; at task limit tt=%s, curcount=%d", wid, caller, tt, a.taskCounters[tt])
+			return false
+		}
+	}
+
 	if info.IgnoreResources {
 		// shortcircuit; if this worker is ignoring resources, it can always handle the request.
 		return true
 
@@ -110,7 +139,7 @@ func (a *activeResources) canHandleRequest(needRes storiface.Resources, wid stor
 }
 
 // utilization returns a number in 0..1 range indicating fraction of used resources
-func (a *activeResources) utilization(wr storiface.WorkerResources) float64 {
+func (a *activeResources) utilization(wr storiface.WorkerResources) float64 { // todo task type
 	var max float64
 
 	cpu := float64(a.cpuUse) / float64(wr.CPUs)
diff --git a/extern/sector-storage/sched_test.go b/extern/sector-storage/sched_test.go
index 5b01e25ad..9cddb7a21 100644
--- a/extern/sector-storage/sched_test.go
+++ b/extern/sector-storage/sched_test.go
@@ -161,15 +161,9 @@ func (s *schedTestWorker) Paths(ctx context.Context) ([]storiface.StoragePath, e
 }
 
 func (s *schedTestWorker) Info(ctx context.Context) (storiface.WorkerInfo, error) {
-	taskLimits := make(map[sealtasks.TaskType]*storiface.LimitConfig)
-	taskLimits[sealtasks.TTPreCommit1] = &storiface.LimitConfig{
-		LimitCount: 6,
-		RunCount:   0,
-	}
 	return storiface.WorkerInfo{
 		Hostname:        s.name,
 		IgnoreResources: s.ignoreResources,
-		TaskLimits:      taskLimits,
 		Resources:       s.resources,
 	}, nil
 }
@@ -617,8 +611,8 @@ func BenchmarkTrySched(b *testing.B) {
 				Hostname:  "t",
 				Resources: decentWorkerResources,
 			},
-			preparing: &activeResources{},
-			active:    &activeResources{},
+			preparing: newActiveResources(),
+			active:    newActiveResources(),
 		}
 
 		for i := 0; i < windows; i++ {
@@ -662,14 +656,16 @@ func TestWindowCompact(t *testing.T) {
 		}
 
 		for _, windowTasks := range start {
-			window := &schedWindow{}
+			window := &schedWindow{
+				allocated: *newActiveResources(),
+			}
 
 			for _, task := range windowTasks {
 				window.todo = append(window.todo, &workerRequest{
 					taskType: task,
 					sector:   storage.SectorRef{ProofType: spt},
 				})
-				window.allocated.add(wh.info.Resources, storiface.ResourceTable[task][spt])
+				window.allocated.add(task.SealTask(spt), wh.info.Resources, storiface.ResourceTable[task][spt])
 			}
 
 			wh.activeWindows = append(wh.activeWindows, window)
@@ -688,7 +684,7 @@ func TestWindowCompact(t *testing.T) {
 			for ti, task := range tasks {
 				require.Equal(t, task, wh.activeWindows[wi].todo[ti].taskType, "%d, %d", wi, ti)
 
-				expectRes.add(wh.info.Resources, storiface.ResourceTable[task][spt])
+				expectRes.add(task.SealTask(spt), wh.info.Resources, storiface.ResourceTable[task][spt])
 			}
 
 			require.Equal(t, expectRes.cpuUse, wh.activeWindows[wi].allocated.cpuUse, "%d", wi)
diff --git a/extern/sector-storage/sched_worker.go b/extern/sector-storage/sched_worker.go
index bebd3a75a..a7a262502 100644
--- a/extern/sector-storage/sched_worker.go
+++ b/extern/sector-storage/sched_worker.go
@@ -34,8 +34,8 @@ func newWorkerHandle(ctx context.Context, w Worker) (*workerHandle, error) {
 		workerRpc: w,
 		info:      info,
 
-		preparing: &activeResources{},
-		active:    &activeResources{},
+		preparing: newActiveResources(),
+		active:    newActiveResources(),
 		enabled:   true,
 
 		closingMgr: make(chan struct{}),
@@ -292,14 +292,14 @@ func (sw *schedWorker) workerCompactWindows() {
 			for ti, todo := range window.todo {
 				needRes := worker.info.Resources.ResourceSpec(todo.sector.ProofType, todo.taskType)
 
-				if !lower.allocated.canHandleRequest(needRes, sw.wid, "compactWindows", worker.info) {
+				if !lower.allocated.canHandleRequest(todo.SealTask(), needRes, sw.wid, "compactWindows", worker.info) {
 					continue
 				}
 
 				moved = append(moved, ti)
 				lower.todo = append(lower.todo, todo)
-				lower.allocated.add(worker.info.Resources, needRes)
-				window.allocated.free(worker.info.Resources, needRes)
+				lower.allocated.add(todo.SealTask(), worker.info.Resources, needRes)
+				window.allocated.free(todo.SealTask(), worker.info.Resources, needRes)
 			}
 
 			if len(moved) > 0 {
@@ -353,7 +353,7 @@ assignLoop:
 		worker.lk.Lock()
 		for t, todo := range firstWindow.todo {
 			needRes := worker.info.Resources.ResourceSpec(todo.sector.ProofType, todo.taskType)
-			if worker.preparing.canHandleRequest(needRes, sw.wid, "startPreparing", worker.info) {
+			if worker.preparing.canHandleRequest(todo.SealTask(), needRes, sw.wid, "startPreparing", worker.info) {
 				tidx = t
 				break
 			}
@@ -414,7 +414,7 @@ assignLoop:
 			}
 
 			needRes := storiface.ResourceTable[todo.taskType][todo.sector.ProofType]
-			if worker.active.canHandleRequest(needRes, sw.wid, "startPreparing", worker.info) {
+			if worker.active.canHandleRequest(todo.SealTask(), needRes, sw.wid, "startPreparing", worker.info) {
 				tidx = t
 				break
 			}
@@ -454,7 +454,7 @@ func (sw *schedWorker) startProcessingTask(req *workerRequest) error {
 	needRes := w.info.Resources.ResourceSpec(req.sector.ProofType, req.taskType)
 
 	w.lk.Lock()
-	w.preparing.add(w.info.Resources, needRes)
+	w.preparing.add(req.SealTask(), w.info.Resources, needRes)
 	w.lk.Unlock()
 
 	go func() {
@@ -465,7 +465,7 @@ func (sw *schedWorker) startProcessingTask(req *workerRequest) error {
 
 		w.lk.Lock()
 		if err != nil {
-			w.preparing.free(w.info.Resources, needRes)
+			w.preparing.free(req.SealTask(), w.info.Resources, needRes)
 			w.lk.Unlock()
 
 			select {
@@ -494,8 +494,8 @@ func (sw *schedWorker) startProcessingTask(req *workerRequest) error {
 		}()
 
 		// wait (if needed) for resources in the 'active' window
-		err = w.active.withResources(sw.wid, w.info, needRes, &w.lk, func() error {
-			w.preparing.free(w.info.Resources, needRes)
+		err = w.active.withResources(sw.wid, w.info, req.SealTask(), needRes, &w.lk, func() error {
+			w.preparing.free(req.SealTask(), w.info.Resources, needRes)
 			w.lk.Unlock()
 			defer w.lk.Lock() // we MUST return locked from this function
 
@@ -526,7 +526,6 @@ func (sw *schedWorker) startProcessingTask(req *workerRequest) error {
 		if err != nil {
 			log.Errorf("error executing worker (withResources): %+v", err)
 		}
-		sh.TaskReduce(req.taskType, sw.wid)
 	}()
 
 	return nil
@@ -537,7 +536,7 @@ func (sw *schedWorker) startProcessingReadyTask(req *workerRequest) error {
 
 	needRes := w.info.Resources.ResourceSpec(req.sector.ProofType, req.taskType)
 
-	w.active.add(w.info.Resources, needRes)
+	w.active.add(req.SealTask(), w.info.Resources, needRes)
 
 	go func() {
 		// Do the work!
@@ -555,8 +554,7 @@ func (sw *schedWorker) startProcessingReadyTask(req *workerRequest) error {
 
 		w.lk.Lock()
 
-		w.active.free(w.info.Resources, needRes)
-		sh.TaskReduce(req.taskType, sw.wid)
+		w.active.free(req.SealTask(), w.info.Resources, needRes)
 
 		select {
 		case sw.taskDone <- struct{}{}:
diff --git a/extern/sector-storage/sealtasks/task.go b/extern/sector-storage/sealtasks/task.go
index e8a156291..fbf72ba29 100644
--- a/extern/sector-storage/sealtasks/task.go
+++ b/extern/sector-storage/sealtasks/task.go
@@ -1,5 +1,14 @@
 package sealtasks
 
+import (
+	"fmt"
+	"golang.org/x/xerrors"
+	"strconv"
+	"strings"
+
+	"github.com/filecoin-project/go-state-types/abi"
+)
+
 type TaskType string
 
 const (
@@ -104,3 +113,37 @@ func (a TaskType) Short() string {
 
 	return n
 }
+
+type SealTaskType struct {
+	TaskType
+	abi.RegisteredSealProof
+}
+
+func (a TaskType) SealTask(spt abi.RegisteredSealProof) SealTaskType {
+	return SealTaskType{
+		TaskType:            a,
+		RegisteredSealProof: spt,
+	}
+}
+
+func SttFromString(s string) (SealTaskType, error) {
+	var res SealTaskType
+
+	sub := strings.SplitN(s, ":", 2)
+	if len(sub) != 2 {
+		return res, xerrors.Errorf("seal task type string invalid")
+	}
+
+	res.TaskType = TaskType(sub[1])
+	spt, err := strconv.ParseInt(sub[0], 10, 64)
+	if err != nil {
+		return SealTaskType{}, err
+	}
+	res.RegisteredSealProof = abi.RegisteredSealProof(spt)
+
+	return res, nil
+}
+
+func (a SealTaskType) String() string {
+	return fmt.Sprintf("%d:%s", a.RegisteredSealProof, a.TaskType)
+}
diff --git a/extern/sector-storage/stats.go b/extern/sector-storage/stats.go
index 9b374f328..cd97e7002 100644
--- a/extern/sector-storage/stats.go
+++ b/extern/sector-storage/stats.go
@@ -39,7 +39,14 @@ func (m *Manager) WorkerStats(ctx context.Context) map[uuid.UUID]storiface.Worke
 			MemUsedMax: handle.active.memUsedMax,
 			GpuUsed:    handle.active.gpuUsed,
 			CpuUse:     handle.active.cpuUse,
+
+			TaskCounts: map[string]int{},
 		}
+
+		for tt, count := range handle.active.taskCounters {
+			out[uuid.UUID(id)].TaskCounts[tt.String()] = count
+		}
+
 		handle.lk.Unlock()
 	}
 
diff --git a/extern/sector-storage/storiface/resources.go b/extern/sector-storage/storiface/resources.go
index 71fd9e30c..0e3294756 100644
--- a/extern/sector-storage/storiface/resources.go
+++ b/extern/sector-storage/storiface/resources.go
@@ -26,6 +26,8 @@ type Resources struct {
 	MaxParallelismGPU int `envname:"MAX_PARALLELISM_GPU"` // when 0, inherits MaxParallelism
 
 	BaseMinMemory uint64 `envname:"BASE_MIN_MEMORY"` // What Must be in RAM for decent perf (shared between threads)
+
+	MaxConcurrent int `envname:"MAX_CONCURRENT"` // Maximum number of tasks of this type that can be scheduled on a worker (0=default, no limit)
 }
 
 /*
diff --git a/extern/sector-storage/storiface/worker.go b/extern/sector-storage/storiface/worker.go
index b1b89443d..0f3e76bf6 100644
--- a/extern/sector-storage/storiface/worker.go
+++ b/extern/sector-storage/storiface/worker.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"sync"
 	"time"
 
 	"github.com/google/uuid"
@@ -32,14 +31,6 @@ type WorkerInfo struct {
 	// Default should be false (zero value, i.e. resources taken into account).
 	IgnoreResources bool
 	Resources       WorkerResources
-
-	TaskLimits  map[sealtasks.TaskType]*LimitConfig
-	TaskLimitLk sync.Mutex
-}
-
-type LimitConfig struct {
-	LimitCount int
-	RunCount   int
 }
 
 type WorkerResources struct {
@@ -84,6 +75,8 @@ type WorkerStats struct {
 	MemUsedMax uint64
 	GpuUsed    float64 // nolint
 	CpuUse     uint64  // nolint
+
+	TaskCounts map[string]int
 }
 
 const (
diff --git a/extern/sector-storage/worker_local.go b/extern/sector-storage/worker_local.go
index 982dc5140..9a14e42b5 100644
--- a/extern/sector-storage/worker_local.go
+++ b/extern/sector-storage/worker_local.go
@@ -7,7 +7,6 @@ import (
 	"os"
 	"reflect"
 	"runtime"
-	"strconv"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -798,26 +797,9 @@ func (l *LocalWorker) Info(context.Context) (storiface.WorkerInfo, error) {
 		return storiface.WorkerInfo{}, xerrors.Errorf("interpreting resource env vars: %w", err)
 	}
 
-	// parallel-p1-limit
-	p1Limit := -1
-	if limit, ok := os.LookupEnv("PARALLEL_P1_LIMIT"); ok {
-		li, err := strconv.Atoi(limit)
-		if err != nil {
-			log.Errorf("failed to parse PARALLEL_P1_LIMIT env var, default=-1")
-		} else {
-			p1Limit = li
-		}
-	}
-	taskLimits := make(map[sealtasks.TaskType]*storiface.LimitConfig)
-	taskLimits[sealtasks.TTPreCommit1] = &storiface.LimitConfig{
-		LimitCount: p1Limit,
-		RunCount:   0,
-	}
-
 	return storiface.WorkerInfo{
 		Hostname:        hostname,
 		IgnoreResources: l.ignoreResources,
-		TaskLimits:      taskLimits,
 		Resources: storiface.WorkerResources{
 			MemPhysical: memPhysical,
 			MemUsed:     memUsed,
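
Note (not part of the patch): the removed PARALLEL_P1_LIMIT / parallel-p1-limit knob is superseded by the new MaxConcurrent resource field. Since MaxConcurrent carries an envname tag like the other Resources fields, the assumption is that the existing per-task resource env override in storiface/resources.go picks it up the same way it does MAX_PARALLELISM, so an operator could cap, for example, 32GiB PC1 tasks on a worker with an environment variable along the lines of PC1_32G_MAX_CONCURRENT=2 before starting lotus-worker. The exact variable naming follows the existing resource-table override convention and is not shown in this diff, so treat it as an assumption to verify against ParseResourceEnv.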
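A minimal sketch (also not part of the patch) of how the new SealTaskType composite key behaves. It only uses identifiers introduced above plus existing sealtasks/abi constants; the printed proof number depends on the abi enum value, so the "8:" shown in the comment is an assumption.

package main

import (
	"fmt"

	"github.com/filecoin-project/go-state-types/abi"

	"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
)

func main() {
	// The scheduler now counts in-flight tasks per (task type, seal proof) pair
	// rather than per task type alone.
	stt := sealtasks.TTPreCommit1.SealTask(abi.RegisteredSealProof_StackedDrg32GiBV1_1)

	// String() renders the pair as "<proof>:<task>"; this is also the map key
	// exposed through WorkerStats.TaskCounts.
	key := stt.String()
	fmt.Println(key) // e.g. "8:seal/v0/precommit/1"

	// SttFromString is the inverse used by `lotus-miner sealing workers` when
	// it rebuilds the TASK line from TaskCounts.
	back, err := sealtasks.SttFromString(key)
	if err != nil {
		panic(err)
	}
	fmt.Println(back.TaskType, back.RegisteredSealProof)
}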