feat: sched: Worker task count limits for all task types

Łukasz Magiera 2022-05-25 14:44:11 +02:00
parent c4cfb7a296
commit 083c7421ce
13 changed files with 195 additions and 162 deletions
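The mechanism introduced here, in brief: instead of a PreCommit1-only PARALLEL_P1_LIMIT tracked per worker, each activeResources instance now keeps an in-flight counter per (task type, seal proof) pair, and canHandleRequest turns work away once that counter reaches the new Resources.MaxConcurrent (0 keeps the old unlimited behaviour). A minimal sketch of the gate, using simplified stand-in types rather than the real scheduler structs:

package main

import "fmt"

// sealTaskKey stands in for sealtasks.SealTaskType: a task type plus the
// seal proof it runs against.
type sealTaskKey struct {
	taskType  string
	sealProof int64
}

// activeResources is reduced here to the one new field this commit adds:
// a counter of in-flight tasks per (task type, proof) pair.
type activeResources struct {
	taskCounters map[sealTaskKey]int
}

// canHandle mirrors the new MaxConcurrent check in canHandleRequest:
// zero means no limit, otherwise reject once the counter is full.
func (a *activeResources) canHandle(tt sealTaskKey, maxConcurrent int) bool {
	return maxConcurrent <= 0 || a.taskCounters[tt] < maxConcurrent
}

// add and free bracket task execution, as add()/free() do in sched_resources.go.
func (a *activeResources) add(tt sealTaskKey)  { a.taskCounters[tt]++ }
func (a *activeResources) free(tt sealTaskKey) { a.taskCounters[tt]-- }

func main() {
	a := &activeResources{taskCounters: map[sealTaskKey]int{}}
	pc1 := sealTaskKey{taskType: "seal/v0/precommit/1", sealProof: 8}

	// With MaxConcurrent=2 the third PC1 request is refused.
	for i := 0; i < 3; i++ {
		if a.canHandle(pc1, 2) {
			a.add(pc1)
			fmt.Println("scheduled PC1", i)
		} else {
			fmt.Println("rejected PC1", i, "(at task limit)")
		}
	}
}

In the real scheduler this task-limit test sits at the top of canHandleRequest as a fast pre-filter; the existing CPU, RAM and GPU checks still apply afterwards.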

View File

@ -4,7 +4,6 @@ import (
"fmt"
_ "net/http/pprof"
"os"
"strconv"
"github.com/filecoin-project/lotus/api/v1api"
@ -50,11 +49,6 @@ var runCmd = &cli.Command{
Usage: "manage open file limit",
Value: true,
},
&cli.IntFlag{
Name: "parallel-p1-limit",
Usage: "maximum pre commit1 operations to run in parallel",
Value: -1,
},
},
Action: func(cctx *cli.Context) error {
if !cctx.Bool("enable-gpu-proving") {
@ -64,8 +58,6 @@ var runCmd = &cli.Command{
}
}
os.Setenv("PARALLEL_P1_LIMIT", strconv.Itoa(cctx.Int("parallel-p1-limit")))
ctx, _ := tag.New(lcli.DaemonContext(cctx),
tag.Insert(metrics.Version, build.BuildVersion),
tag.Insert(metrics.Commit, build.CurrentCommit),

View File

@ -97,13 +97,20 @@ func workersCmd(sealing bool) *cli.Command {
return st[i].id.String() < st[j].id.String()
})
/*
Example output:
Worker c4d65451-07f8-4230-98ad-4f33dea2a8cc, host myhostname
TASK: PC1(1/4) AP(15/15) GET(3)
CPU: [|||||||| ] 16/128 core(s) in use
RAM: [|||||||| ] 12% 125.8 GiB/1008 GiB
VMEM: [|||||||| ] 12% 125.8 GiB/1008 GiB
GPU: [ ] 0% 0.00/1 gpu(s) in use
GPU: NVIDIA GeForce RTX 3090, not used
*/
for _, stat := range st {
gpuUse := "not "
gpuCol := color.FgBlue
if stat.GpuUsed > 0 {
gpuCol = color.FgGreen
gpuUse = ""
}
// Worker uuid + name
var disabled string
if !stat.Enabled {
@ -112,9 +119,53 @@ func workersCmd(sealing bool) *cli.Command {
fmt.Printf("Worker %s, host %s%s\n", stat.id, color.MagentaString(stat.Info.Hostname), disabled)
// Task counts
tc := make([][]string, 0, len(stat.TaskCounts))
for st, c := range stat.TaskCounts {
if c == 0 {
continue
}
stt, err := sealtasks.SttFromString(st)
if err != nil {
return err
}
str := fmt.Sprint(c)
if max := stat.Info.Resources.ResourceSpec(stt.RegisteredSealProof, stt.TaskType).MaxConcurrent; max > 0 {
switch {
case c < max:
str = color.GreenString(str)
case c >= max:
str = color.YellowString(str)
}
str = fmt.Sprintf("%s/%d", str, max)
} else {
str = color.CyanString(str)
}
str = fmt.Sprintf("%s(%s)", color.BlueString(stt.Short()), str)
tc = append(tc, []string{string(stt.TaskType), str})
}
sort.Slice(tc, func(i, j int) bool {
return sealtasks.TaskType(tc[i][0]).Less(sealtasks.TaskType(tc[j][0]))
})
var taskStr string
for _, t := range tc {
taskStr += t[1] + " "
}
if taskStr != "" {
fmt.Printf("\tTASK: %s\n", taskStr)
}
// CPU use
fmt.Printf("\tCPU: [%s] %d/%d core(s) in use\n",
barString(float64(stat.Info.Resources.CPUs), 0, float64(stat.CpuUse)), stat.CpuUse, stat.Info.Resources.CPUs)
// RAM use
ramTotal := stat.Info.Resources.MemPhysical
ramTasks := stat.MemUsedMin
ramUsed := stat.Info.Resources.MemUsed
@ -129,6 +180,8 @@ func workersCmd(sealing bool) *cli.Command {
types.SizeStr(types.NewInt(ramTasks+ramUsed)),
types.SizeStr(types.NewInt(stat.Info.Resources.MemPhysical)))
// VMEM use (ram+swap)
vmemTotal := stat.Info.Resources.MemPhysical + stat.Info.Resources.MemSwap
vmemTasks := stat.MemUsedMax
vmemUsed := stat.Info.Resources.MemUsed + stat.Info.Resources.MemSwapUsed
@ -143,21 +196,24 @@ func workersCmd(sealing bool) *cli.Command {
types.SizeStr(types.NewInt(vmemTasks+vmemReserved)),
types.SizeStr(types.NewInt(vmemTotal)))
// GPU use
if len(stat.Info.Resources.GPUs) > 0 {
gpuBar := barString(float64(len(stat.Info.Resources.GPUs)), 0, stat.GpuUsed)
fmt.Printf("\tGPU: [%s] %.f%% %.2f/%d gpu(s) in use\n", color.GreenString(gpuBar),
stat.GpuUsed*100/float64(len(stat.Info.Resources.GPUs)),
stat.GpuUsed, len(stat.Info.Resources.GPUs))
}
gpuUse := "not "
gpuCol := color.FgBlue
if stat.GpuUsed > 0 {
gpuCol = color.FgGreen
gpuUse = ""
}
for _, gpu := range stat.Info.Resources.GPUs {
fmt.Printf("\tGPU: %s\n", color.New(gpuCol).Sprintf("%s, %sused", gpu, gpuUse))
}
plConfig, ok := stat.Info.TaskLimits[sealtasks.TTPreCommit1]
if ok && plConfig.LimitCount > 0 {
fmt.Printf("\tP1LIMIT: [%s] %d/%d tasks are running\n",
barString(float64(plConfig.LimitCount), 0, float64(plConfig.RunCount)), plConfig.RunCount, plConfig.LimitCount)
}
}
return nil

View File

@ -9,7 +9,6 @@ import (
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"time"
@ -209,11 +208,6 @@ var runCmd = &cli.Command{
Usage: "used when 'listen' is unspecified. must be a valid duration recognized by golang's time.ParseDuration function",
Value: "30m",
},
&cli.IntFlag{
Name: "parallel-p1-limit",
Usage: "maximum precommit1 operations to run in parallel",
Value: -1,
},
},
Before: func(cctx *cli.Context) error {
if cctx.IsSet("address") {
@ -234,8 +228,6 @@ var runCmd = &cli.Command{
}
}
os.Setenv("PARALLEL_P1_LIMIT", strconv.Itoa(cctx.Int("parallel-p1-limit")))
limit, _, err := ulimit.GetLimit()
switch {
case err == ulimit.ErrUnsupported:

View File

@ -116,16 +116,6 @@ type workerDisableReq struct {
done func()
}
type activeResources struct {
memUsedMin uint64
memUsedMax uint64
gpuUsed float64
cpuUse uint64
cond *sync.Cond
waiting int
}
type workerRequest struct {
sector storage.SectorRef
taskType sealtasks.TaskType
@ -214,6 +204,13 @@ func (r *workerRequest) respond(err error) {
}
}
func (r *workerRequest) SealTask() sealtasks.SealTaskType {
return sealtasks.SealTaskType{
TaskType: r.taskType,
RegisteredSealProof: r.sector.ProofType,
}
}
type SchedDiagRequestInfo struct {
Sector abi.SectorID
TaskType sealtasks.TaskType
@ -366,6 +363,9 @@ func (sh *scheduler) trySched() {
}
windows := make([]schedWindow, windowsLen)
for i := range windows {
windows[i].allocated = *newActiveResources()
}
acceptableWindows := make([][]int, queueLen) // QueueIndex -> []OpenWindowIndex
// Step 1
@ -401,7 +401,7 @@ func (sh *scheduler) trySched() {
needRes := worker.info.Resources.ResourceSpec(task.sector.ProofType, task.taskType)
// TODO: allow bigger windows
if !windows[wnd].allocated.canHandleRequest(needRes, windowRequest.worker, "schedAcceptable", worker.info) {
if !windows[wnd].allocated.canHandleRequest(task.SealTask(), needRes, windowRequest.worker, "schedAcceptable", worker.info) {
continue
}
@ -475,16 +475,12 @@ func (sh *scheduler) trySched() {
wid := sh.openWindows[wnd].worker
w := sh.workers[wid]
res := info.Resources.ResourceSpec(task.sector.ProofType, task.taskType)
res := w.info.Resources.ResourceSpec(task.sector.ProofType, task.taskType)
log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.sector.ID.Number, wnd, i)
// TODO: allow bigger windows
if !windows[wnd].allocated.canHandleRequest(needRes, wid, "schedAssign", info) {
continue
}
if !sh.CanHandleTask(task.taskType, wid) {
if !windows[wnd].allocated.canHandleRequest(task.SealTask(), res, wid, "schedAssign", w.info) {
continue
}
@ -507,7 +503,6 @@ func (sh *scheduler) trySched() {
// #--------> acceptableWindow index
//
// * -> we're here
sh.TaskAdd(task.taskType, bestWid)
break
}
@ -531,7 +526,7 @@ func (sh *scheduler) trySched() {
"worker", bestWid,
"utilization", bestUtilization)
workerUtil[bestWid] += windows[selectedWindow].allocated.add(info.Resources, needRes)
workerUtil[bestWid] += windows[selectedWindow].allocated.add(task.SealTask(), info.Resources, needRes)
windows[selectedWindow].todo = append(windows[selectedWindow].todo, task)
rmQueue = append(rmQueue, sqi)
@ -615,55 +610,3 @@ func (sh *scheduler) Close(ctx context.Context) error {
}
return nil
}
func (sh *scheduler) CanHandleTask(taskType sealtasks.TaskType, wid storiface.WorkerID) (flag bool) {
if wh, ok := sh.workers[wid]; ok {
wh.info.TaskLimitLk.Lock()
defer wh.info.TaskLimitLk.Unlock()
taskLimit, ok := wh.info.TaskLimits[taskType]
if !ok {
flag = true
return
}
log.Debugf("CanHandleTask: %v:%v", taskLimit.LimitCount, taskLimit.RunCount)
if taskLimit.LimitCount > 0 {
freeCount := taskLimit.LimitCount - taskLimit.RunCount
if freeCount > 0 {
flag = true
}
} else {
flag = true
}
} else {
flag = true
}
return
}
func (sh *scheduler) TaskAdd(taskType sealtasks.TaskType, wid storiface.WorkerID) {
log.Debugf("begin task add:%v-%v", wid, taskType)
if wh, ok := sh.workers[wid]; ok {
wh.info.TaskLimitLk.Lock()
defer wh.info.TaskLimitLk.Unlock()
taskLimit, ok := wh.info.TaskLimits[taskType]
if ok {
log.Debugf("task limit:%v-%v", taskLimit.LimitCount, taskLimit.RunCount)
taskLimit.RunCount++
}
}
}
func (sh *scheduler) TaskReduce(taskType sealtasks.TaskType, wid storiface.WorkerID) {
log.Debugf("begin task reduce:%v-%v", wid, taskType)
if wh, ok := sh.workers[wid]; ok {
wh.info.TaskLimitLk.Lock()
defer wh.info.TaskLimitLk.Unlock()
taskLimit, ok := wh.info.TaskLimits[taskType]
if ok {
log.Debugf("task limit:%v-%v", taskLimit.LimitCount, taskLimit.RunCount)
taskLimit.RunCount--
}
}
}

View File

@ -105,7 +105,7 @@ func (ps *poStScheduler) Schedule(ctx context.Context, primary bool, spt abi.Reg
selected := candidates[0]
worker := ps.workers[selected.id]
return worker.active.withResources(selected.id, worker.info, selected.res, &ps.lk, func() error {
return worker.active.withResources(selected.id, worker.info, ps.postType.SealTask(spt), selected.res, &ps.lk, func() error {
ps.lk.Unlock()
defer ps.lk.Lock()
@ -124,7 +124,7 @@ func (ps *poStScheduler) readyWorkers(spt abi.RegisteredSealProof) (bool, []cand
for wid, wr := range ps.workers {
needRes := wr.info.Resources.ResourceSpec(spt, ps.postType)
if !wr.active.canHandleRequest(needRes, wid, "post-readyWorkers", wr.info) {
if !wr.active.canHandleRequest(ps.postType.SealTask(spt), needRes, wid, "post-readyWorkers", wr.info) {
continue
}

View File

@ -9,8 +9,26 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
func (a *activeResources) withResources(id storiface.WorkerID, wr storiface.WorkerInfo, r storiface.Resources, locker sync.Locker, cb func() error) error {
for !a.canHandleRequest(r, id, "withResources", wr) {
type activeResources struct {
memUsedMin uint64
memUsedMax uint64
gpuUsed float64
cpuUse uint64
taskCounters map[sealtasks.SealTaskType]int
cond *sync.Cond
waiting int
}
func newActiveResources() *activeResources {
return &activeResources{
taskCounters: map[sealtasks.SealTaskType]int{},
}
}
func (a *activeResources) withResources(id storiface.WorkerID, wr storiface.WorkerInfo, tt sealtasks.SealTaskType, r storiface.Resources, locker sync.Locker, cb func() error) error {
for !a.canHandleRequest(tt, r, id, "withResources", wr) {
if a.cond == nil {
a.cond = sync.NewCond(locker)
}
@ -19,11 +37,11 @@ func (a *activeResources) withResources(id storiface.WorkerID, wr storiface.Work
a.waiting--
}
a.add(wr.Resources, r)
a.add(tt, wr.Resources, r)
err := cb()
a.free(wr.Resources, r)
a.free(tt, wr.Resources, r)
return err
}
@ -34,7 +52,7 @@ func (a *activeResources) hasWorkWaiting() bool {
}
// add task resources to activeResources and return utilization difference
func (a *activeResources) add(wr storiface.WorkerResources, r storiface.Resources) float64 {
func (a *activeResources) add(tt sealtasks.SealTaskType, wr storiface.WorkerResources, r storiface.Resources) float64 {
startUtil := a.utilization(wr)
if r.GPUUtilization > 0 {
@ -43,17 +61,21 @@ func (a *activeResources) add(wr storiface.WorkerResources, r storiface.Resource
a.cpuUse += r.Threads(wr.CPUs, len(wr.GPUs))
a.memUsedMin += r.MinMemory
a.memUsedMax += r.MaxMemory
t := a.taskCounters[tt]
t++
a.taskCounters[tt] = t
return a.utilization(wr) - startUtil
}
func (a *activeResources) free(wr storiface.WorkerResources, r storiface.Resources) {
func (a *activeResources) free(tt sealtasks.SealTaskType, wr storiface.WorkerResources, r storiface.Resources) {
if r.GPUUtilization > 0 {
a.gpuUsed -= r.GPUUtilization
}
a.cpuUse -= r.Threads(wr.CPUs, len(wr.GPUs))
a.memUsedMin -= r.MinMemory
a.memUsedMax -= r.MaxMemory
a.taskCounters[tt]--
if a.cond != nil {
a.cond.Broadcast()
@ -62,7 +84,14 @@ func (a *activeResources) free(wr storiface.WorkerResources, r storiface.Resourc
// canHandleRequest evaluates if the worker has enough available resources to
// handle the request.
func (a *activeResources) canHandleRequest(needRes storiface.Resources, wid storiface.WorkerID, caller string, info storiface.WorkerInfo) bool {
func (a *activeResources) canHandleRequest(tt sealtasks.SealTaskType, needRes storiface.Resources, wid storiface.WorkerID, caller string, info storiface.WorkerInfo) bool {
if needRes.MaxConcurrent > 0 {
if a.taskCounters[tt] >= needRes.MaxConcurrent {
log.Debugf("sched: not scheduling on worker %s for %s; at task limit tt=%s, curcount=%d", wid, caller, tt, a.taskCounters[tt])
return false
}
}
if info.IgnoreResources {
// shortcircuit; if this worker is ignoring resources, it can always handle the request.
return true
@ -110,7 +139,7 @@ func (a *activeResources) canHandleRequest(needRes storiface.Resources, wid stor
}
// utilization returns a number in 0..1 range indicating fraction of used resources
func (a *activeResources) utilization(wr storiface.WorkerResources) float64 {
func (a *activeResources) utilization(wr storiface.WorkerResources) float64 { // todo task type
var max float64
cpu := float64(a.cpuUse) / float64(wr.CPUs)

View File

@ -161,15 +161,9 @@ func (s *schedTestWorker) Paths(ctx context.Context) ([]storiface.StoragePath, e
}
func (s *schedTestWorker) Info(ctx context.Context) (storiface.WorkerInfo, error) {
taskLimits := make(map[sealtasks.TaskType]*storiface.LimitConfig)
taskLimits[sealtasks.TTPreCommit1] = &storiface.LimitConfig{
LimitCount: 6,
RunCount: 0,
}
return storiface.WorkerInfo{
Hostname: s.name,
IgnoreResources: s.ignoreResources,
TaskLimits: taskLimits,
Resources: s.resources,
}, nil
}
@ -617,8 +611,8 @@ func BenchmarkTrySched(b *testing.B) {
Hostname: "t",
Resources: decentWorkerResources,
},
preparing: &activeResources{},
active: &activeResources{},
preparing: newActiveResources(),
active: newActiveResources(),
}
for i := 0; i < windows; i++ {
@ -662,14 +656,16 @@ func TestWindowCompact(t *testing.T) {
}
for _, windowTasks := range start {
window := &schedWindow{}
window := &schedWindow{
allocated: *newActiveResources(),
}
for _, task := range windowTasks {
window.todo = append(window.todo, &workerRequest{
taskType: task,
sector: storage.SectorRef{ProofType: spt},
})
window.allocated.add(wh.info.Resources, storiface.ResourceTable[task][spt])
window.allocated.add(task.SealTask(spt), wh.info.Resources, storiface.ResourceTable[task][spt])
}
wh.activeWindows = append(wh.activeWindows, window)
@ -688,7 +684,7 @@ func TestWindowCompact(t *testing.T) {
for ti, task := range tasks {
require.Equal(t, task, wh.activeWindows[wi].todo[ti].taskType, "%d, %d", wi, ti)
expectRes.add(wh.info.Resources, storiface.ResourceTable[task][spt])
expectRes.add(task.SealTask(spt), wh.info.Resources, storiface.ResourceTable[task][spt])
}
require.Equal(t, expectRes.cpuUse, wh.activeWindows[wi].allocated.cpuUse, "%d", wi)

View File

@ -34,8 +34,8 @@ func newWorkerHandle(ctx context.Context, w Worker) (*workerHandle, error) {
workerRpc: w,
info: info,
preparing: &activeResources{},
active: &activeResources{},
preparing: newActiveResources(),
active: newActiveResources(),
enabled: true,
closingMgr: make(chan struct{}),
@ -292,14 +292,14 @@ func (sw *schedWorker) workerCompactWindows() {
for ti, todo := range window.todo {
needRes := worker.info.Resources.ResourceSpec(todo.sector.ProofType, todo.taskType)
if !lower.allocated.canHandleRequest(needRes, sw.wid, "compactWindows", worker.info) {
if !lower.allocated.canHandleRequest(todo.SealTask(), needRes, sw.wid, "compactWindows", worker.info) {
continue
}
moved = append(moved, ti)
lower.todo = append(lower.todo, todo)
lower.allocated.add(worker.info.Resources, needRes)
window.allocated.free(worker.info.Resources, needRes)
lower.allocated.add(todo.SealTask(), worker.info.Resources, needRes)
window.allocated.free(todo.SealTask(), worker.info.Resources, needRes)
}
if len(moved) > 0 {
@ -353,7 +353,7 @@ assignLoop:
worker.lk.Lock()
for t, todo := range firstWindow.todo {
needRes := worker.info.Resources.ResourceSpec(todo.sector.ProofType, todo.taskType)
if worker.preparing.canHandleRequest(needRes, sw.wid, "startPreparing", worker.info) {
if worker.preparing.canHandleRequest(todo.SealTask(), needRes, sw.wid, "startPreparing", worker.info) {
tidx = t
break
}
@ -414,7 +414,7 @@ assignLoop:
}
needRes := storiface.ResourceTable[todo.taskType][todo.sector.ProofType]
if worker.active.canHandleRequest(needRes, sw.wid, "startPreparing", worker.info) {
if worker.active.canHandleRequest(todo.SealTask(), needRes, sw.wid, "startPreparing", worker.info) {
tidx = t
break
}
@ -454,7 +454,7 @@ func (sw *schedWorker) startProcessingTask(req *workerRequest) error {
needRes := w.info.Resources.ResourceSpec(req.sector.ProofType, req.taskType)
w.lk.Lock()
w.preparing.add(w.info.Resources, needRes)
w.preparing.add(req.SealTask(), w.info.Resources, needRes)
w.lk.Unlock()
go func() {
@ -465,7 +465,7 @@ func (sw *schedWorker) startProcessingTask(req *workerRequest) error {
w.lk.Lock()
if err != nil {
w.preparing.free(w.info.Resources, needRes)
w.preparing.free(req.SealTask(), w.info.Resources, needRes)
w.lk.Unlock()
select {
@ -494,8 +494,8 @@ func (sw *schedWorker) startProcessingTask(req *workerRequest) error {
}()
// wait (if needed) for resources in the 'active' window
err = w.active.withResources(sw.wid, w.info, needRes, &w.lk, func() error {
w.preparing.free(w.info.Resources, needRes)
err = w.active.withResources(sw.wid, w.info, req.SealTask(), needRes, &w.lk, func() error {
w.preparing.free(req.SealTask(), w.info.Resources, needRes)
w.lk.Unlock()
defer w.lk.Lock() // we MUST return locked from this function
@ -526,7 +526,6 @@ func (sw *schedWorker) startProcessingTask(req *workerRequest) error {
if err != nil {
log.Errorf("error executing worker (withResources): %+v", err)
}
sh.TaskReduce(req.taskType, sw.wid)
}()
return nil
@ -537,7 +536,7 @@ func (sw *schedWorker) startProcessingReadyTask(req *workerRequest) error {
needRes := w.info.Resources.ResourceSpec(req.sector.ProofType, req.taskType)
w.active.add(w.info.Resources, needRes)
w.active.add(req.SealTask(), w.info.Resources, needRes)
go func() {
// Do the work!
@ -555,8 +554,7 @@ func (sw *schedWorker) startProcessingReadyTask(req *workerRequest) error {
w.lk.Lock()
w.active.free(w.info.Resources, needRes)
sh.TaskReduce(req.taskType, sw.wid)
w.active.free(req.SealTask(), w.info.Resources, needRes)
select {
case sw.taskDone <- struct{}{}:

View File

@ -1,5 +1,14 @@
package sealtasks
import (
"fmt"
"golang.org/x/xerrors"
"strconv"
"strings"
"github.com/filecoin-project/go-state-types/abi"
)
type TaskType string
const (
@ -104,3 +113,37 @@ func (a TaskType) Short() string {
return n
}
type SealTaskType struct {
TaskType
abi.RegisteredSealProof
}
func (a TaskType) SealTask(spt abi.RegisteredSealProof) SealTaskType {
return SealTaskType{
TaskType: a,
RegisteredSealProof: spt,
}
}
func SttFromString(s string) (SealTaskType, error) {
var res SealTaskType
sub := strings.SplitN(s, ":", 2)
if len(sub) != 2 {
return res, xerrors.Errorf("seal task type string invalid")
}
res.TaskType = TaskType(sub[1])
spt, err := strconv.ParseInt(sub[0], 10, 64)
if err != nil {
return SealTaskType{}, err
}
res.RegisteredSealProof = abi.RegisteredSealProof(spt)
return res, nil
}
func (a SealTaskType) String() string {
return fmt.Sprintf("%d:%s", a.RegisteredSealProof, a.TaskType)
}
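The scheduler keys its counters by SealTaskType, and WorkerStats exposes them as strings produced by String() above; the workers CLI parses them back with SttFromString. A small round-trip example (import paths as used by this tree; the numeric proof value in the comment is illustrative):

package main

import (
	"fmt"

	"github.com/filecoin-project/go-state-types/abi"
	"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
)

func main() {
	// Build the key the scheduler counts by: task type + registered seal proof.
	stt := sealtasks.TTPreCommit1.SealTask(abi.RegisteredSealProof_StackedDrg32GiBV1_1)

	s := stt.String() // proof enum, then task type, e.g. "8:seal/v0/precommit/1"
	fmt.Println(s)

	// WorkerStats.TaskCounts is keyed by this string; the workers CLI parses it
	// back to look up the matching resource spec and its MaxConcurrent.
	back, err := sealtasks.SttFromString(s)
	if err != nil {
		panic(err)
	}
	fmt.Println(back.TaskType, back.RegisteredSealProof)
}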

View File

@ -39,7 +39,14 @@ func (m *Manager) WorkerStats(ctx context.Context) map[uuid.UUID]storiface.Worke
MemUsedMax: handle.active.memUsedMax,
GpuUsed: handle.active.gpuUsed,
CpuUse: handle.active.cpuUse,
TaskCounts: map[string]int{},
}
for tt, count := range handle.active.taskCounters {
out[uuid.UUID(id)].TaskCounts[tt.String()] = count
}
handle.lk.Unlock()
}

View File

@ -26,6 +26,8 @@ type Resources struct {
MaxParallelismGPU int `envname:"MAX_PARALLELISM_GPU"` // when 0, inherits MaxParallelism
BaseMinMemory uint64 `envname:"BASE_MIN_MEMORY"` // What Must be in RAM for decent perf (shared between threads)
MaxConcurrent int `envname:"MAX_CONCURRENT"` // Maximum number of tasks of this type that can be scheduled on a worker (0=default, no limit)
}
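MaxConcurrent is read through the same envname override machinery as the other Resources fields, so it can in principle be set per task type and sector size before the worker starts. The variable name below (PC1_32G_MAX_CONCURRENT) is an assumption based on that existing naming convention, not something this diff spells out:

package main

import "os"

func main() {
	// Assumed env var name, following the per-task / per-sector-size prefix
	// used by the other resource overrides; 0 (or unset) means no limit.
	os.Setenv("PC1_32G_MAX_CONCURRENT", "2")

	// ... start the worker in this environment; ResourceSpec() for 32G PC1
	// tasks would then report MaxConcurrent=2, and the scheduler would refuse
	// to queue a third concurrent PC1 on this worker.
}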
/*

View File

@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/google/uuid"
@ -32,14 +31,6 @@ type WorkerInfo struct {
// Default should be false (zero value, i.e. resources taken into account).
IgnoreResources bool
Resources WorkerResources
TaskLimits map[sealtasks.TaskType]*LimitConfig
TaskLimitLk sync.Mutex
}
type LimitConfig struct {
LimitCount int
RunCount int
}
type WorkerResources struct {
@ -84,6 +75,8 @@ type WorkerStats struct {
MemUsedMax uint64
GpuUsed float64 // nolint
CpuUse uint64 // nolint
TaskCounts map[string]int
}
const (

View File

@ -7,7 +7,6 @@ import (
"os"
"reflect"
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"
@ -798,26 +797,9 @@ func (l *LocalWorker) Info(context.Context) (storiface.WorkerInfo, error) {
return storiface.WorkerInfo{}, xerrors.Errorf("interpreting resource env vars: %w", err)
}
// parallel-p1-limit
p1Limit := -1
if limit, ok := os.LookupEnv("PARALLEL_P1_LIMIT"); ok {
li, err := strconv.Atoi(limit)
if err != nil {
log.Errorf("failed to parse PARALLEL_P1_LIMIT env var, default=-1")
} else {
p1Limit = li
}
}
taskLimits := make(map[sealtasks.TaskType]*storiface.LimitConfig)
taskLimits[sealtasks.TTPreCommit1] = &storiface.LimitConfig{
LimitCount: p1Limit,
RunCount: 0,
}
return storiface.WorkerInfo{
Hostname: hostname,
IgnoreResources: l.ignoreResources,
TaskLimits: taskLimits,
Resources: storiface.WorkerResources{
MemPhysical: memPhysical,
MemUsed: memUsed,