diff --git a/extern/sector-storage/resources.go b/extern/sector-storage/resources.go index 5ba785fc2..e8e3e60cb 100644 --- a/extern/sector-storage/resources.go +++ b/extern/sector-storage/resources.go @@ -13,9 +13,15 @@ type Resources struct { MinMemory uint64 // What Must be in RAM for decent perf MaxMemory uint64 // Memory required (swap + ram) - MaxParallelism int // -1 = multithread + // GPUUtilization specifes the number of GPUs a task can use GPUUtilization float64 + // MaxParallelism specifies the number of CPU cores when GPU is NOT in use + MaxParallelism int // -1 = multithread + + // MaxParallelismGPU specifies the number of CPU cores when GPU is in use + MaxParallelismGPU int // when 0, inherits MaxParallelism + BaseMinMemory uint64 // What Must be in RAM for decent perf (shared between threads) } @@ -35,8 +41,14 @@ var ParallelNum uint64 = 92 var ParallelDenom uint64 = 100 // TODO: Take NUMA into account -func (r Resources) Threads(wcpus uint64) uint64 { - if r.MaxParallelism == -1 { +func (r Resources) Threads(wcpus uint64, gpus int) uint64 { + mp := r.MaxParallelism + + if r.GPUUtilization > 0 && gpus > 0 && r.MaxParallelismGPU != 0 { // task can use GPUs and worker has some + mp = r.MaxParallelismGPU + } + + if mp == -1 { n := (wcpus * ParallelNum) / ParallelDenom if n == 0 { return wcpus @@ -44,7 +56,7 @@ func (r Resources) Threads(wcpus uint64) uint64 { return n } - return uint64(r.MaxParallelism) + return uint64(mp) } func (r *Resources) customizeForWorker(taskShortName string, wid WorkerID, info storiface.WorkerInfo) { @@ -81,6 +93,14 @@ func (r *Resources) customizeForWorker(taskShortName string, wid WorkerID, info r.MaxParallelism = i } } + if o, ok := info.Resources.ResourceOpts[taskShortName+"_MAX_PARALLELISM_GPU"]; ok { + i, err := strconv.Atoi(o) + if err != nil { + log.Errorf("unable to parse %s_GPU_PARALLELISM value %s: %e", taskShortName, o, err) + } else { + r.MaxParallelismGPU = i + } + } if o, ok := info.Resources.ResourceOpts[taskShortName+"_GPU_UTILIZATION"]; ok { i, err := strconv.ParseFloat(o, 64) if err != nil { @@ -182,8 +202,9 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources MaxMemory: 30 << 30, MinMemory: 30 << 30, - MaxParallelism: -1, - GPUUtilization: 1.0, + MaxParallelism: -1, + MaxParallelismGPU: 6, + GPUUtilization: 1.0, BaseMinMemory: 1 << 30, }, @@ -191,8 +212,9 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources MaxMemory: 15 << 30, MinMemory: 15 << 30, - MaxParallelism: -1, - GPUUtilization: 1.0, + MaxParallelism: -1, + MaxParallelismGPU: 6, + GPUUtilization: 1.0, BaseMinMemory: 1 << 30, }, @@ -268,8 +290,9 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources MaxMemory: 190 << 30, // TODO: Confirm MinMemory: 60 << 30, - MaxParallelism: 2, - GPUUtilization: 1.0, + MaxParallelism: -1, + MaxParallelismGPU: 6, + GPUUtilization: 1.0, BaseMinMemory: 64 << 30, // params }, @@ -277,8 +300,9 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources MaxMemory: 150 << 30, // TODO: ~30G of this should really be BaseMaxMemory MinMemory: 30 << 30, - MaxParallelism: 2, - GPUUtilization: 1.0, + MaxParallelism: -1, + MaxParallelismGPU: 6, + GPUUtilization: 1.0, BaseMinMemory: 32 << 30, // params }, diff --git a/extern/sector-storage/sched_resources.go b/extern/sector-storage/sched_resources.go index 636fc02d1..cbd8fb625 100644 --- a/extern/sector-storage/sched_resources.go +++ b/extern/sector-storage/sched_resources.go @@ -34,7 +34,7 @@ func (a *activeResources) add(wr storiface.WorkerResources, r Resources) { if r.GPUUtilization > 0 { a.gpuUsed += r.GPUUtilization } - a.cpuUse += r.Threads(wr.CPUs) + a.cpuUse += r.Threads(wr.CPUs, len(wr.GPUs)) a.memUsedMin += r.MinMemory a.memUsedMax += r.MaxMemory } @@ -43,7 +43,7 @@ func (a *activeResources) free(wr storiface.WorkerResources, r Resources) { if r.GPUUtilization > 0 { a.gpuUsed -= r.GPUUtilization } - a.cpuUse -= r.Threads(wr.CPUs) + a.cpuUse -= r.Threads(wr.CPUs, len(wr.GPUs)) a.memUsedMin -= r.MinMemory a.memUsedMax -= r.MaxMemory @@ -84,8 +84,8 @@ func (a *activeResources) canHandleRequest(needRes Resources, wid WorkerID, call return false } - if a.cpuUse+needRes.Threads(res.CPUs) > res.CPUs { - log.Debugf("sched: not scheduling on worker %s for %s; not enough threads, need %d, %d in use, target %d", wid, caller, needRes.Threads(res.CPUs), a.cpuUse, res.CPUs) + if a.cpuUse+needRes.Threads(res.CPUs, len(res.GPUs)) > res.CPUs { + log.Debugf("sched: not scheduling on worker %s for %s; not enough threads, need %d, %d in use, target %d", wid, caller, needRes.Threads(res.CPUs, len(res.GPUs)), a.cpuUse, res.CPUs) return false } diff --git a/extern/sector-storage/worker_local.go b/extern/sector-storage/worker_local.go index de69cea80..7e5ce8f57 100644 --- a/extern/sector-storage/worker_local.go +++ b/extern/sector-storage/worker_local.go @@ -549,7 +549,7 @@ func (l *LocalWorker) Info(context.Context) (storiface.WorkerInfo, error) { resourceOpts := make(map[string]string) for tt := range l.acceptTasks { ttShort := tt.Short() - for _, res_opt := range []string{"_MAX_MEMORY", "_MIN_MEMORY", "_MAX_PARALLELISM", "_BASE_MIN_MEMORY", "_GPU_UTILIZATION"} { + for _, res_opt := range []string{"_MAX_MEMORY", "_MIN_MEMORY", "_MAX_PARALLELISM", "_MAX_PARALLELISM_GPU", "_BASE_MIN_MEMORY", "_GPU_UTILIZATION"} { n := ttShort + res_opt if val, ok := os.LookupEnv(n); ok { resourceOpts[n] = val