From c9a2ff4007de10d03b6905c241dfaae403c4afe5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C5=81ukasz=20Magiera?=
Date: Mon, 29 Nov 2021 14:42:20 +0100
Subject: [PATCH] cleanup worker resource overrides
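
Workers previously shipped resource overrides to the scheduler as a raw
ResourceOpts map[string]string, interpreted on the scheduler side by
Resources.customizeForWorker. This moves the resource table (and
WorkerID) into the storiface package, replaces the string map with a
typed per-task/per-proof Resources table on WorkerResources, and parses
overrides on the worker from environment variables named
{TASK}_{SECTOR_SIZE}_{FIELD}, for example (illustrative values taken
from the new tests):

    UNS_2K_MAX_PARALLELISM=2
    PC2_2K_GPU_UTILIZATION=0.4
    PC2_2K_MAX_MEMORY=2222

When no explicit override is set, PC1/unseal MAX_PARALLELISM is still
derived from FIL_PROOFS_USE_MULTICORE_SDR and
FIL_PROOFS_MULTICORE_SDR_PRODUCERS. WorkerAPIVersion0 is bumped to
1.5.0 to reflect the WorkerResources change.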
log.Debugf("SCHED try assign sqi:%d sector %d to window %d", sqi, task.sector.ID.Number, wnd) + needRes := info.Resources.ResourceSpec(task.sector.ProofType, task.taskType) + // TODO: allow bigger windows if !windows[wnd].allocated.canHandleRequest(needRes, wid, "schedAssign", info) { continue diff --git a/extern/sector-storage/sched_resources.go b/extern/sector-storage/sched_resources.go index cbd8fb625..6e5d70508 100644 --- a/extern/sector-storage/sched_resources.go +++ b/extern/sector-storage/sched_resources.go @@ -6,7 +6,7 @@ import ( "github.com/filecoin-project/lotus/extern/sector-storage/storiface" ) -func (a *activeResources) withResources(id WorkerID, wr storiface.WorkerInfo, r Resources, locker sync.Locker, cb func() error) error { +func (a *activeResources) withResources(id storiface.WorkerID, wr storiface.WorkerInfo, r storiface.Resources, locker sync.Locker, cb func() error) error { for !a.canHandleRequest(r, id, "withResources", wr) { if a.cond == nil { a.cond = sync.NewCond(locker) @@ -30,7 +30,7 @@ func (a *activeResources) hasWorkWaiting() bool { return a.waiting > 0 } -func (a *activeResources) add(wr storiface.WorkerResources, r Resources) { +func (a *activeResources) add(wr storiface.WorkerResources, r storiface.Resources) { if r.GPUUtilization > 0 { a.gpuUsed += r.GPUUtilization } @@ -39,7 +39,7 @@ func (a *activeResources) add(wr storiface.WorkerResources, r Resources) { a.memUsedMax += r.MaxMemory } -func (a *activeResources) free(wr storiface.WorkerResources, r Resources) { +func (a *activeResources) free(wr storiface.WorkerResources, r storiface.Resources) { if r.GPUUtilization > 0 { a.gpuUsed -= r.GPUUtilization } @@ -54,7 +54,7 @@ func (a *activeResources) free(wr storiface.WorkerResources, r Resources) { // canHandleRequest evaluates if the worker has enough available resources to // handle the request. -func (a *activeResources) canHandleRequest(needRes Resources, wid WorkerID, caller string, info storiface.WorkerInfo) bool { +func (a *activeResources) canHandleRequest(needRes storiface.Resources, wid storiface.WorkerID, caller string, info storiface.WorkerInfo) bool { if info.IgnoreResources { // shortcircuit; if this worker is ignoring resources, it can always handle the request. 
 type Resources struct {
-	MinMemory uint64 // What Must be in RAM for decent perf
-	MaxMemory uint64 // Memory required (swap + ram)
+	MinMemory uint64 `envname:"MIN_MEMORY"` // What Must be in RAM for decent perf
+	MaxMemory uint64 `envname:"MAX_MEMORY"` // Memory required (swap + ram)
 
 	// GPUUtilization specifies the number of GPUs a task can use
-	GPUUtilization float64
+	GPUUtilization float64 `envname:"GPU_UTILIZATION"`
 
 	// MaxParallelism specifies the number of CPU cores when GPU is NOT in use
-	MaxParallelism int // -1 = multithread
+	MaxParallelism int `envname:"MAX_PARALLELISM"` // -1 = multithread
 
 	// MaxParallelismGPU specifies the number of CPU cores when GPU is in use
-	MaxParallelismGPU int // when 0, inherits MaxParallelism
+	MaxParallelismGPU int `envname:"MAX_PARALLELISM_GPU"` // when 0, inherits MaxParallelism
 
-	BaseMinMemory uint64 // What Must be in RAM for decent perf (shared between threads)
+	BaseMinMemory uint64 `envname:"BASE_MIN_MEMORY"` // What Must be in RAM for decent perf (shared between threads)
 }
 
 /*
@@ -59,59 +62,6 @@ func (r Resources) Threads(wcpus uint64, gpus int) uint64 {
 	return uint64(mp)
 }
 
-func (r *Resources) customizeForWorker(taskShortName string, wid WorkerID, info storiface.WorkerInfo) {
-	// update needed resources with worker options
-	if o, ok := info.Resources.ResourceOpts[taskShortName+"_MAX_MEMORY"]; ok {
-		i, err := strconv.ParseUint(o, 10, 64)
-		if err != nil {
-			log.Errorf("unable to parse %s_MAX_MEMORY value %s: %e", taskShortName, o, err)
-		} else {
-			r.MaxMemory = i
-		}
-	}
-	if o, ok := info.Resources.ResourceOpts[taskShortName+"_MIN_MEMORY"]; ok {
-		i, err := strconv.ParseUint(o, 10, 64)
-		if err != nil {
-			log.Errorf("unable to parse %s_MIN_MEMORY value %s: %e", taskShortName, o, err)
-		} else {
-			r.MinMemory = i
-		}
-	}
-	if o, ok := info.Resources.ResourceOpts[taskShortName+"_BASE_MIN_MEMORY"]; ok {
-		i, err := strconv.ParseUint(o, 10, 64)
-		if err != nil {
-			log.Errorf("unable to parse %s_BASE_MIN_MEMORY value %s: %e", taskShortName, o, err)
-		} else {
-			r.BaseMinMemory = i
-		}
-	}
-	if o, ok := info.Resources.ResourceOpts[taskShortName+"_MAX_PARALLELISM"]; ok {
-		i, err := strconv.Atoi(o)
-		if err != nil {
-			log.Errorf("unable to parse %s_MAX_PARALLELISM value %s: %e", taskShortName, o, err)
-		} else {
-			r.MaxParallelism = i
-		}
-	}
-	if o, ok := info.Resources.ResourceOpts[taskShortName+"_MAX_PARALLELISM_GPU"]; ok {
-		i, err := strconv.Atoi(o)
-		if err != nil {
-			log.Errorf("unable to parse %s_GPU_PARALLELISM value %s: %e", taskShortName, o, err)
-		} else {
-			r.MaxParallelismGPU = i
-		}
-	}
-	if o, ok := info.Resources.ResourceOpts[taskShortName+"_GPU_UTILIZATION"]; ok {
-		i, err := strconv.ParseFloat(o, 64)
-		if err != nil {
-			log.Errorf("unable to parse %s_GPU_UTILIZATION value %s: %e", taskShortName, o, err)
-		} else {
-			r.GPUUtilization = i
-		}
-	}
-
-	log.Debugf("resources required for %s on %s(%s): %+v", taskShortName, wid, info.Hostname, r)
-}
-
 var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources{
 	sealtasks.TTAddPiece: {
 		abi.RegisteredSealProof_StackedDrg64GiBV1: Resources{
@@ -395,3 +345,83 @@ func init() {
 		m[abi.RegisteredSealProof_StackedDrg64GiBV1_1] = m[abi.RegisteredSealProof_StackedDrg64GiBV1]
 	}
 }
+
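+// ParseResources builds a resource table by overlaying environment-variable
+// overrides onto the defaults in ResourceTable. Variables are named
+// {TASK}_{SECTOR_SIZE}_{FIELD} from the sealtasks short name, the sector
+// size with the "iB" suffix trimmed, and the field's envname tag, e.g.
+// PC2_2K_MAX_MEMORY. lookup abstracts os.LookupEnv so callers and tests can
+// supply values; the second argument is the field's default, which also
+// lets a lookup enumerate the supported variables.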
+func ParseResources(lookup func(key, def string) (string, bool)) (map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources, error) {
+	out := map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources{}
+
+	for taskType, defTT := range ResourceTable {
+		out[taskType] = map[abi.RegisteredSealProof]Resources{}
+
+		for spt, defRes := range defTT {
+			r := defRes // copy
+
+			spsz, err := spt.SectorSize()
+			if err != nil {
+				return nil, xerrors.Errorf("getting sector size: %w", err)
+			}
+			shortSize := strings.TrimSuffix(spsz.ShortString(), "iB")
+
+			rr := reflect.ValueOf(&r)
+			for i := 0; i < rr.Elem().Type().NumField(); i++ {
+				f := rr.Elem().Type().Field(i)
+
+				envname := f.Tag.Get("envname")
+				if envname == "" {
+					return nil, xerrors.Errorf("no envname for field '%s'", f.Name)
+				}
+
+				envval, found := lookup(taskType.Short()+"_"+shortSize+"_"+envname, fmt.Sprint(rr.Elem().Field(i).Interface()))
+				if !found {
+					// special multicore SDR handling
+					if (taskType == sealtasks.TTPreCommit1 || taskType == sealtasks.TTUnseal) && envname == "MAX_PARALLELISM" {
+						v, ok := rr.Elem().Field(i).Addr().Interface().(*int)
+						if !ok {
+							// can't happen, but let's not panic
+							return nil, xerrors.Errorf("res.MAX_PARALLELISM is not int (!?): %w", err)
+						}
+						*v, err = getSDRThreads(lookup)
+						if err != nil {
+							return nil, err
+						}
+					}
+
+					continue
+				}
+
+				v := rr.Elem().Field(i).Addr().Interface()
+				switch fv := v.(type) {
+				case *uint64:
+					*fv, err = strconv.ParseUint(envval, 10, 64)
+				case *int:
+					*fv, err = strconv.Atoi(envval)
+				case *float64:
+					*fv, err = strconv.ParseFloat(envval, 64)
+				default:
+					return nil, xerrors.Errorf("unknown resource field type")
+				}
+				// surface malformed override values instead of silently keeping a zero value
+				if err != nil {
+					return nil, xerrors.Errorf("parsing %s: %w", envval, err)
+				}
+			}
+
+			out[taskType][spt] = r
+		}
+	}
+
+	return out, nil
+}
+
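+// getSDRThreads returns the default PC1/unseal parallelism implied by the
+// proofs library configuration: with FIL_PROOFS_USE_MULTICORE_SDR=1 there
+// are FIL_PROOFS_MULTICORE_SDR_PRODUCERS producer threads (3 if unset) in
+// addition to the core doing the work.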
+func getSDRThreads(lookup func(key, def string) (string, bool)) (_ int, err error) {
+	producers := 0
+
+	if v, _ := lookup("FIL_PROOFS_USE_MULTICORE_SDR", ""); v == "1" {
+		producers = 3
+
+		if penv, found := lookup("FIL_PROOFS_MULTICORE_SDR_PRODUCERS", ""); found {
+			producers, err = strconv.Atoi(penv)
+			if err != nil {
+				return 0, xerrors.Errorf("parsing (atoi) FIL_PROOFS_MULTICORE_SDR_PRODUCERS: %w", err)
+			}
+		}
+	}
+
+	// producers + the one core actually doing the work
+	return producers + 1, nil
+}
diff --git a/extern/sector-storage/storiface/resources_test.go b/extern/sector-storage/storiface/resources_test.go
new file mode 100644
index 000000000..f58f46e23
--- /dev/null
+++ b/extern/sector-storage/storiface/resources_test.go
@@ -0,0 +1,75 @@
+package storiface
+
+import (
+	"fmt"
+	"testing"
+
+	stabi "github.com/filecoin-project/go-state-types/abi"
+	"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
+	"github.com/stretchr/testify/require"
+)
+
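+// TestListResourceVars prints every supported override variable with its
+// default value by reporting nothing as found from the lookup.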
+func TestListResourceVars(t *testing.T) {
+	_, err := ParseResources(func(key, def string) (string, bool) {
+		if def != "" {
+			fmt.Printf("%s=%s\n", key, def)
+		}
+
+		return "", false
+	})
+
+	require.NoError(t, err)
+}
+
+func TestListResourceOverride(t *testing.T) {
+	rt, err := ParseResources(func(key, def string) (string, bool) {
+		if key == "UNS_2K_MAX_PARALLELISM" {
+			return "2", true
+		}
+		if key == "PC2_2K_GPU_UTILIZATION" {
+			return "0.4", true
+		}
+		if key == "PC2_2K_MAX_MEMORY" {
+			return "2222", true
+		}
+
+		return "", false
+	})
+
+	require.NoError(t, err)
+	require.Equal(t, 2, rt[sealtasks.TTUnseal][stabi.RegisteredSealProof_StackedDrg2KiBV1_1].MaxParallelism)
+	require.Equal(t, 0.4, rt[sealtasks.TTPreCommit2][stabi.RegisteredSealProof_StackedDrg2KiBV1_1].GPUUtilization)
+	require.Equal(t, uint64(2222), rt[sealtasks.TTPreCommit2][stabi.RegisteredSealProof_StackedDrg2KiBV1_1].MaxMemory)
+
+	// check that defaults don't get mutated
+	require.Equal(t, 1, ResourceTable[sealtasks.TTUnseal][stabi.RegisteredSealProof_StackedDrg2KiBV1_1].MaxParallelism)
+}
+
+func TestListResourceSDRMulticoreOverride(t *testing.T) {
+	rt, err := ParseResources(func(key, def string) (string, bool) {
+		if key == "FIL_PROOFS_USE_MULTICORE_SDR" {
+			return "1", true
+		}
+
+		return "", false
+	})
+
+	require.NoError(t, err)
+	require.Equal(t, 4, rt[sealtasks.TTPreCommit1][stabi.RegisteredSealProof_StackedDrg2KiBV1_1].MaxParallelism)
+	require.Equal(t, 4, rt[sealtasks.TTUnseal][stabi.RegisteredSealProof_StackedDrg2KiBV1_1].MaxParallelism)
+
+	rt, err = ParseResources(func(key, def string) (string, bool) {
+		if key == "FIL_PROOFS_USE_MULTICORE_SDR" {
+			return "1", true
+		}
+		if key == "FIL_PROOFS_MULTICORE_SDR_PRODUCERS" {
+			return "9000", true
+		}
+
+		return "", false
+	})
+
+	require.NoError(t, err)
+	require.Equal(t, 9001, rt[sealtasks.TTPreCommit1][stabi.RegisteredSealProof_StackedDrg2KiBV1_1].MaxParallelism)
+	require.Equal(t, 9001, rt[sealtasks.TTUnseal][stabi.RegisteredSealProof_StackedDrg2KiBV1_1].MaxParallelism)
+}
diff --git a/extern/sector-storage/storiface/worker.go b/extern/sector-storage/storiface/worker.go
index f28f106b1..380e968e1 100644
--- a/extern/sector-storage/storiface/worker.go
+++ b/extern/sector-storage/storiface/worker.go
@@ -15,6 +15,12 @@ import (
 	"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
 )
 
+type WorkerID uuid.UUID // worker session UUID
+
+func (w WorkerID) String() string {
+	return uuid.UUID(w).String()
+}
+
 type WorkerInfo struct {
 	Hostname string
 
@@ -34,7 +40,29 @@ type WorkerResources struct {
 	CPUs uint64 // Logical cores
 	GPUs []string
 
-	ResourceOpts map[string]string
+	// if nil use the default resource table
+	Resources map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
+}
+
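+// ResourceSpec returns the Resources entry for the given proof/task pair,
+// preferring the worker's own table (built from its environment by
+// ParseResources) and falling back to the shared defaults in ResourceTable.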
+func (wr WorkerResources) ResourceSpec(spt abi.RegisteredSealProof, tt sealtasks.TaskType) Resources {
+	res := ResourceTable[tt][spt]
+
+	// if the worker specifies a custom resource table, prefer that
+	if wr.Resources != nil {
+		tr, ok := wr.Resources[tt]
+		if !ok {
+			return res
+		}
+
+		r, ok := tr[spt]
+		if ok {
+			return r
+		}
+	}
+
+	// otherwise, use the default resource table
+	return res
 }
 
 type WorkerStats struct {
diff --git a/extern/sector-storage/worker_local.go b/extern/sector-storage/worker_local.go
index 7e5ce8f57..50bf6ada0 100644
--- a/extern/sector-storage/worker_local.go
+++ b/extern/sector-storage/worker_local.go
@@ -3,12 +3,10 @@ package sectorstorage
 import (
 	"context"
 	"encoding/json"
-	"fmt"
 	"io"
 	"os"
 	"reflect"
 	"runtime"
-	"strconv"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -546,28 +544,11 @@ func (l *LocalWorker) Info(context.Context) (storiface.WorkerInfo, error) {
 		return storiface.WorkerInfo{}, xerrors.Errorf("getting memory info: %w", err)
 	}
 
-	resourceOpts := make(map[string]string)
-	for tt := range l.acceptTasks {
-		ttShort := tt.Short()
-		for _, res_opt := range []string{"_MAX_MEMORY", "_MIN_MEMORY", "_MAX_PARALLELISM", "_MAX_PARALLELISM_GPU", "_BASE_MIN_MEMORY", "_GPU_UTILIZATION"} {
-			n := ttShort + res_opt
-			if val, ok := os.LookupEnv(n); ok {
-				resourceOpts[n] = val
-			}
-		}
-	}
-	if _, ok := resourceOpts["PC1_MAX_PARALLELISM"]; !ok {
-		if os.Getenv("FIL_PROOFS_USE_MULTICORE_SDR") == "1" {
-			pc1MulticoreSDRProducers := 3
-			if pc1MulticoreSDRProducersEnv := os.Getenv("FIL_PROOFS_MULTICORE_SDR_PRODUCERS"); pc1MulticoreSDRProducersEnv != "" {
-				pc1MulticoreSDRProducers, err = strconv.Atoi(pc1MulticoreSDRProducersEnv)
-				if err != nil {
-					log.Errorf("FIL_PROOFS_MULTICORE_SDR_PRODUCERS is not an integer: %+v", err)
-					pc1MulticoreSDRProducers = 3
-				}
-			}
-			resourceOpts["PC1_MAX_PARALLELISM"] = fmt.Sprintf("%d", 1+pc1MulticoreSDRProducers)
-		}
-	}
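+	// Build this worker's resource table from its environment;
+	// os.LookupEnv matches the lookup signature ParseResources expects.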
+	resEnv, err := storiface.ParseResources(func(key, def string) (string, bool) {
+		return os.LookupEnv(key)
+	})
+	if err != nil {
+		return storiface.WorkerInfo{}, xerrors.Errorf("interpreting resource env vars: %w", err)
+	}
 
 	return storiface.WorkerInfo{
@@ -580,7 +561,7 @@ func (l *LocalWorker) Info(context.Context) (storiface.WorkerInfo, error) {
 			MemSwapUsed: memSwapUsed,
 			CPUs:        uint64(runtime.NumCPU()),
 			GPUs:        gpus,
-			ResourceOpts: resourceOpts,
+			Resources:    resEnv,
 		},
 	}, nil
 }
diff --git a/extern/sector-storage/worker_tracked.go b/extern/sector-storage/worker_tracked.go
index 5702426c3..7a88d9bd4 100644
--- a/extern/sector-storage/worker_tracked.go
+++ b/extern/sector-storage/worker_tracked.go
@@ -20,7 +20,7 @@ import (
 
 type trackedWork struct {
 	job            storiface.WorkerJob
-	worker         WorkerID
+	worker         storiface.WorkerID
 	workerHostname string
 }
 
@@ -58,7 +58,7 @@ func (wt *workTracker) onDone(ctx context.Context, callID storiface.CallID) {
 	delete(wt.running, callID)
 }
 
-func (wt *workTracker) track(ctx context.Context, ready chan struct{}, wid WorkerID, wi storiface.WorkerInfo, sid storage.SectorRef, task sealtasks.TaskType, cb func() (storiface.CallID, error)) (storiface.CallID, error) {
+func (wt *workTracker) track(ctx context.Context, ready chan struct{}, wid storiface.WorkerID, wi storiface.WorkerInfo, sid storage.SectorRef, task sealtasks.TaskType, cb func() (storiface.CallID, error)) (storiface.CallID, error) {
 	tracked := func(rw int, callID storiface.CallID) trackedWork {
 		return trackedWork{
 			job: storiface.WorkerJob{
@@ -122,7 +122,7 @@ func (wt *workTracker) track(ctx context.Context, ready chan struct{}, wid Worke
 	return callID, err
 }
 
-func (wt *workTracker) worker(wid WorkerID, wi storiface.WorkerInfo, w Worker) *trackedWorker {
+func (wt *workTracker) worker(wid storiface.WorkerID, wi storiface.WorkerInfo, w Worker) *trackedWorker {
 	return &trackedWorker{
 		Worker: w,
 		wid:    wid,
@@ -152,7 +152,7 @@ func (wt *workTracker) Running() ([]trackedWork, []trackedWork) {
 
 type trackedWorker struct {
 	Worker
-	wid        WorkerID
+	wid        storiface.WorkerID
 	workerInfo storiface.WorkerInfo
 
 	execute chan struct{} // channel blocking execution in case we're waiting for resources but the task is ready to execute