cleanup worker resource overrides

This commit is contained in:
Łukasz Magiera 2021-11-29 14:42:20 +01:00
parent b961e1aab5
commit c9a2ff4007
11 changed files with 232 additions and 123 deletions

View File

@ -58,7 +58,7 @@ var (
FullAPIVersion1 = newVer(2, 1, 0)
MinerAPIVersion0 = newVer(1, 2, 0)
WorkerAPIVersion0 = newVer(1, 4, 0)
WorkerAPIVersion0 = newVer(1, 5, 0)
)
//nolint:varcheck,deadcode

View File

@ -51,13 +51,8 @@ type SectorManager interface {
FaultTracker
}
// WorkerID identifies a worker; it is a distinct named type over uuid.UUID.
type WorkerID uuid.UUID // worker session UUID
// ClosedWorkerID is the zero-value UUID.
// NOTE(review): presumably the session ID a worker reports once it has been
// closed — confirm against the session-check code that compares against it.
var ClosedWorkerID = uuid.UUID{}
// String returns the ID formatted by the underlying uuid.UUID's String method.
func (w WorkerID) String() string {
return uuid.UUID(w).String()
}
type Manager struct {
ls stores.LocalStorage
storage *stores.Remote

View File

@ -53,7 +53,7 @@ type WorkerSelector interface {
type scheduler struct {
workersLk sync.RWMutex
workers map[WorkerID]*workerHandle
workers map[storiface.WorkerID]*workerHandle
schedule chan *workerRequest
windowRequests chan *schedWindowRequest
@ -95,7 +95,7 @@ type workerHandle struct {
}
type schedWindowRequest struct {
worker WorkerID
worker storiface.WorkerID
done chan *schedWindow
}
@ -107,7 +107,7 @@ type schedWindow struct {
type workerDisableReq struct {
activeWindows []*schedWindow
wid WorkerID
wid storiface.WorkerID
done func()
}
@ -145,7 +145,7 @@ type workerResponse struct {
func newScheduler() *scheduler {
return &scheduler{
workers: map[WorkerID]*workerHandle{},
workers: map[storiface.WorkerID]*workerHandle{},
schedule: make(chan *workerRequest),
windowRequests: make(chan *schedWindowRequest, 20),
@ -378,7 +378,6 @@ func (sh *scheduler) trySched() {
}()
task := (*sh.schedQueue)[sqi]
needRes := ResourceTable[task.taskType][task.sector.ProofType]
task.indexHeap = sqi
for wnd, windowRequest := range sh.openWindows {
@ -394,6 +393,8 @@ func (sh *scheduler) trySched() {
continue
}
needRes := worker.info.Resources.ResourceSpec(task.sector.ProofType, task.taskType)
// TODO: allow bigger windows
if !windows[wnd].allocated.canHandleRequest(needRes, windowRequest.worker, "schedAcceptable", worker.info) {
continue
@ -457,7 +458,6 @@ func (sh *scheduler) trySched() {
for sqi := 0; sqi < queueLen; sqi++ {
task := (*sh.schedQueue)[sqi]
needRes := ResourceTable[task.taskType][task.sector.ProofType]
selectedWindow := -1
for _, wnd := range acceptableWindows[task.indexHeap] {
@ -466,6 +466,8 @@ func (sh *scheduler) trySched() {
log.Debugf("SCHED try assign sqi:%d sector %d to window %d", sqi, task.sector.ID.Number, wnd)
needRes := info.Resources.ResourceSpec(task.sector.ProofType, task.taskType)
// TODO: allow bigger windows
if !windows[wnd].allocated.canHandleRequest(needRes, wid, "schedAssign", info) {
continue

View File

@ -6,7 +6,7 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
func (a *activeResources) withResources(id WorkerID, wr storiface.WorkerInfo, r Resources, locker sync.Locker, cb func() error) error {
func (a *activeResources) withResources(id storiface.WorkerID, wr storiface.WorkerInfo, r storiface.Resources, locker sync.Locker, cb func() error) error {
for !a.canHandleRequest(r, id, "withResources", wr) {
if a.cond == nil {
a.cond = sync.NewCond(locker)
@ -30,7 +30,7 @@ func (a *activeResources) hasWorkWaiting() bool {
return a.waiting > 0
}
func (a *activeResources) add(wr storiface.WorkerResources, r Resources) {
func (a *activeResources) add(wr storiface.WorkerResources, r storiface.Resources) {
if r.GPUUtilization > 0 {
a.gpuUsed += r.GPUUtilization
}
@ -39,7 +39,7 @@ func (a *activeResources) add(wr storiface.WorkerResources, r Resources) {
a.memUsedMax += r.MaxMemory
}
func (a *activeResources) free(wr storiface.WorkerResources, r Resources) {
func (a *activeResources) free(wr storiface.WorkerResources, r storiface.Resources) {
if r.GPUUtilization > 0 {
a.gpuUsed -= r.GPUUtilization
}
@ -54,7 +54,7 @@ func (a *activeResources) free(wr storiface.WorkerResources, r Resources) {
// canHandleRequest evaluates if the worker has enough available resources to
// handle the request.
func (a *activeResources) canHandleRequest(needRes Resources, wid WorkerID, caller string, info storiface.WorkerInfo) bool {
func (a *activeResources) canHandleRequest(needRes storiface.Resources, wid storiface.WorkerID, caller string, info storiface.WorkerInfo) bool {
if info.IgnoreResources {
// shortcircuit; if this worker is ignoring resources, it can always handle the request.
return true

View File

@ -560,7 +560,7 @@ func BenchmarkTrySched(b *testing.B) {
b.StopTimer()
sched := newScheduler()
sched.workers[WorkerID{}] = &workerHandle{
sched.workers[storiface.WorkerID{}] = &workerHandle{
workerRpc: nil,
info: storiface.WorkerInfo{
Hostname: "t",
@ -572,7 +572,7 @@ func BenchmarkTrySched(b *testing.B) {
for i := 0; i < windows; i++ {
sched.openWindows = append(sched.openWindows, &schedWindowRequest{
worker: WorkerID{},
worker: storiface.WorkerID{},
done: make(chan *schedWindow, 1000),
})
}

View File

@ -4,17 +4,18 @@ import (
"context"
"time"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
type schedWorker struct {
sched *scheduler
worker *workerHandle
wid WorkerID
wid storiface.WorkerID
heartbeatTimer *time.Ticker
scheduledWindows chan *schedWindow
@ -50,7 +51,7 @@ func (sh *scheduler) runWorker(ctx context.Context, w Worker) error {
closedMgr: make(chan struct{}),
}
wid := WorkerID(sessID)
wid := storiface.WorkerID(sessID)
sh.workersLk.Lock()
_, exist := sh.workers[wid]
@ -237,7 +238,7 @@ func (sw *schedWorker) checkSession(ctx context.Context) bool {
continue
}
if WorkerID(curSes) != sw.wid {
if storiface.WorkerID(curSes) != sw.wid {
if curSes != ClosedWorkerID {
// worker restarted
log.Warnw("worker session changed (worker restarted?)", "initial", sw.wid, "current", curSes)
@ -296,8 +297,7 @@ func (sw *schedWorker) workerCompactWindows() {
var moved []int
for ti, todo := range window.todo {
needRes := ResourceTable[todo.taskType][todo.sector.ProofType]
needRes.customizeForWorker(todo.taskType.Short(), sw.wid, worker.info)
needRes := worker.info.Resources.ResourceSpec(todo.sector.ProofType, todo.taskType)
if !lower.allocated.canHandleRequest(needRes, sw.wid, "compactWindows", worker.info) {
continue
}
@ -358,8 +358,7 @@ assignLoop:
worker.lk.Lock()
for t, todo := range firstWindow.todo {
needRes := ResourceTable[todo.taskType][todo.sector.ProofType]
needRes.customizeForWorker(todo.taskType.Short(), sw.wid, worker.info)
needRes := worker.info.Resources.ResourceSpec(todo.sector.ProofType, todo.taskType)
if worker.preparing.canHandleRequest(needRes, sw.wid, "startPreparing", worker.info) {
tidx = t
break
@ -420,7 +419,7 @@ assignLoop:
continue
}
needRes := ResourceTable[todo.taskType][todo.sector.ProofType]
needRes := storiface.ResourceTable[todo.taskType][todo.sector.ProofType]
if worker.active.canHandleRequest(needRes, sw.wid, "startPreparing", worker.info) {
tidx = t
break
@ -458,8 +457,7 @@ assignLoop:
func (sw *schedWorker) startProcessingTask(req *workerRequest) error {
w, sh := sw.worker, sw.sched
needRes := ResourceTable[req.taskType][req.sector.ProofType]
needRes.customizeForWorker(req.taskType.Short(), sw.wid, w.info)
needRes := w.info.Resources.ResourceSpec(req.sector.ProofType, req.taskType)
w.lk.Lock()
w.preparing.add(w.info.Resources, needRes)
@ -542,7 +540,7 @@ func (sw *schedWorker) startProcessingTask(req *workerRequest) error {
func (sw *schedWorker) startProcessingReadyTask(req *workerRequest) error {
w, sh := sw.worker, sw.sched
needRes := ResourceTable[req.taskType][req.sector.ProofType]
needRes := w.info.Resources.ResourceSpec(req.sector.ProofType, req.taskType)
w.active.add(w.info.Resources, needRes)
@ -582,7 +580,7 @@ func (sw *schedWorker) startProcessingReadyTask(req *workerRequest) error {
return nil
}
func (sh *scheduler) workerCleanup(wid WorkerID, w *workerHandle) {
func (sh *scheduler) workerCleanup(wid storiface.WorkerID, w *workerHandle) {
select {
case <-w.closingMgr:
default:

View File

@ -1,28 +1,31 @@
package sectorstorage
package storiface
import (
"fmt"
"reflect"
"strconv"
"strings"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
type Resources struct {
MinMemory uint64 // What Must be in RAM for decent perf
MaxMemory uint64 // Memory required (swap + ram)
MinMemory uint64 `envname:"MIN_MEMORY"` // What Must be in RAM for decent perf
MaxMemory uint64 `envname:"MAX_MEMORY"` // Memory required (swap + ram)
// GPUUtilization specifies the number of GPUs a task can use
GPUUtilization float64
GPUUtilization float64 `envname:"GPU_UTILIZATION"`
// MaxParallelism specifies the number of CPU cores when GPU is NOT in use
MaxParallelism int // -1 = multithread
MaxParallelism int `envname:"MAX_PARALLELISM"` // -1 = multithread
// MaxParallelismGPU specifies the number of CPU cores when GPU is in use
MaxParallelismGPU int // when 0, inherits MaxParallelism
MaxParallelismGPU int `envname:"MAX_PARALLELISM_GPU"` // when 0, inherits MaxParallelism
BaseMinMemory uint64 // What Must be in RAM for decent perf (shared between threads)
BaseMinMemory uint64 `envname:"BASE_MIN_MEMORY"` // What Must be in RAM for decent perf (shared between threads)
}
/*
@ -59,59 +62,6 @@ func (r Resources) Threads(wcpus uint64, gpus int) uint64 {
return uint64(mp)
}
// customizeForWorker overrides fields of r with the per-task values found in
// info.Resources.ResourceOpts.
//
// Option keys are taskShortName followed by one of the suffixes _MAX_MEMORY,
// _MIN_MEMORY, _BASE_MIN_MEMORY, _MAX_PARALLELISM, _MAX_PARALLELISM_GPU or
// _GPU_UTILIZATION. A missing option leaves the corresponding field untouched;
// an option that fails to parse is logged and otherwise ignored. The resulting
// resources are logged at debug level together with wid and the worker
// hostname.
func (r *Resources) customizeForWorker(taskShortName string, wid WorkerID, info storiface.WorkerInfo) {
	opts := info.Resources.ResourceOpts

	// reportBad logs an unparsable option. The error is rendered with %v; the
	// float verb %e does not format error values.
	reportBad := func(suffix, raw string, err error) {
		log.Errorf("unable to parse %s%s value %s: %v", taskShortName, suffix, raw, err)
	}

	// setUint stores the option named by suffix into dst when present and valid.
	setUint := func(suffix string, dst *uint64) {
		raw, ok := opts[taskShortName+suffix]
		if !ok {
			return
		}
		v, err := strconv.ParseUint(raw, 10, 64)
		if err != nil {
			reportBad(suffix, raw, err)
			return
		}
		*dst = v
	}

	// setInt is the signed-integer counterpart of setUint.
	setInt := func(suffix string, dst *int) {
		raw, ok := opts[taskShortName+suffix]
		if !ok {
			return
		}
		v, err := strconv.Atoi(raw)
		if err != nil {
			reportBad(suffix, raw, err)
			return
		}
		*dst = v
	}

	setUint("_MAX_MEMORY", &r.MaxMemory)
	setUint("_MIN_MEMORY", &r.MinMemory)
	setUint("_BASE_MIN_MEMORY", &r.BaseMinMemory)
	setInt("_MAX_PARALLELISM", &r.MaxParallelism)
	// The log message now names the key that was actually looked up.
	setInt("_MAX_PARALLELISM_GPU", &r.MaxParallelismGPU)

	if raw, ok := opts[taskShortName+"_GPU_UTILIZATION"]; ok {
		v, err := strconv.ParseFloat(raw, 64)
		if err != nil {
			reportBad("_GPU_UTILIZATION", raw, err)
		} else {
			r.GPUUtilization = v
		}
	}

	log.Debugf("resources required for %s on %s(%s): %+v", taskShortName, wid, info.Hostname, r)
}
var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources{
sealtasks.TTAddPiece: {
abi.RegisteredSealProof_StackedDrg64GiBV1: Resources{
@ -395,3 +345,83 @@ func init() {
m[abi.RegisteredSealProof_StackedDrg64GiBV1_1] = m[abi.RegisteredSealProof_StackedDrg64GiBV1]
}
}
// ParseResources builds a resource table by applying overrides obtained from
// lookup on top of a copy of the defaults in ResourceTable.
//
// For every task type, seal proof type and Resources field, lookup is called
// with a key of the form <task short name>_<sector size>_<field envname>
// (for example PC2_2K_MAX_MEMORY) and the field's default value rendered as a
// string. lookup returns the override value and whether one exists.
//
// When no MAX_PARALLELISM override exists for the PreCommit1 or Unseal task,
// the value is taken from getSDRThreads instead.
//
// An error is returned if a sector size cannot be determined, a Resources
// field lacks an `envname` tag or has an unsupported type, or an override
// value cannot be parsed. ResourceTable itself is never modified.
func ParseResources(lookup func(key, def string) (string, bool)) (map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources, error) {
	out := map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources{}

	for taskType, defTT := range ResourceTable {
		out[taskType] = map[abi.RegisteredSealProof]Resources{}

		for spt, defRes := range defTT {
			r := defRes // copy, so the defaults are not mutated

			spsz, err := spt.SectorSize()
			if err != nil {
				return nil, xerrors.Errorf("getting sector size: %w", err)
			}
			shortSize := strings.TrimSuffix(spsz.ShortString(), "iB")

			// Addressable view of the copy so fields can be set via pointers.
			rv := reflect.ValueOf(&r).Elem()
			rt := rv.Type()

			for i := 0; i < rt.NumField(); i++ {
				f := rt.Field(i)

				envname := f.Tag.Get("envname")
				if envname == "" {
					return nil, xerrors.Errorf("no envname for field '%s'", f.Name)
				}

				key := taskType.Short() + "_" + shortSize + "_" + envname
				envval, found := lookup(key, fmt.Sprint(rv.Field(i).Interface()))
				if !found {
					// special multicore SDR handling
					if (taskType == sealtasks.TTPreCommit1 || taskType == sealtasks.TTUnseal) && envname == "MAX_PARALLELISM" {
						v, ok := rv.Field(i).Addr().Interface().(*int)
						if !ok {
							// can't happen, but let's not panic; there is no
							// underlying error to wrap at this point
							return nil, xerrors.Errorf("res.MAX_PARALLELISM is not int (!?)")
						}
						*v, err = getSDRThreads(lookup)
						if err != nil {
							return nil, err
						}
					}

					continue
				}

				switch fv := rv.Field(i).Addr().Interface().(type) {
				case *uint64:
					*fv, err = strconv.ParseUint(envval, 10, 64)
				case *int:
					*fv, err = strconv.Atoi(envval)
				case *float64:
					*fv, err = strconv.ParseFloat(envval, 64)
				default:
					return nil, xerrors.Errorf("unknown resource field type")
				}
				// Reject malformed overrides instead of silently storing the
				// zero value the strconv functions return on failure.
				if err != nil {
					return nil, xerrors.Errorf("parsing %s=%q: %w", key, envval, err)
				}
			}

			out[taskType][spt] = r
		}
	}

	return out, nil
}
// getSDRThreads reports how many threads to account for: the number of
// producers plus the one core actually doing the work.
//
// Producers are only counted when lookup yields "1" for
// FIL_PROOFS_USE_MULTICORE_SDR. In that case the count defaults to 3 and may
// be replaced by the integer value of FIL_PROOFS_MULTICORE_SDR_PRODUCERS; a
// value that does not parse as an integer results in an error.
func getSDRThreads(lookup func(key, def string) (string, bool)) (_ int, err error) {
	const worker = 1 // the core doing the actual work

	if use, _ := lookup("FIL_PROOFS_USE_MULTICORE_SDR", ""); use != "1" {
		// multicore SDR not enabled: no producers at all
		return worker, nil
	}

	penv, found := lookup("FIL_PROOFS_MULTICORE_SDR_PRODUCERS", "")
	if !found {
		return 3 + worker, nil
	}

	count, err := strconv.Atoi(penv)
	if err != nil {
		return 0, xerrors.Errorf("parsing (atoi) FIL_PROOFS_MULTICORE_SDR_PRODUCERS: %w", err)
	}

	return count + worker, nil
}

View File

@ -0,0 +1,75 @@
package storiface
import (
"fmt"
"testing"
stabi "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/stretchr/testify/require"
)
// TestListResourceVars prints every resource key that has a non-empty default
// as key=default and checks that parsing without any overrides succeeds.
func TestListResourceVars(t *testing.T) {
	// printDefaults never supplies an override; it only lists what it is asked.
	printDefaults := func(key, def string) (string, bool) {
		if def == "" {
			return "", false
		}

		fmt.Printf("%s=%s\n", key, def)
		return "", false
	}

	_, err := ParseResources(printDefaults)
	require.NoError(t, err)
}
// TestListResourceOverride checks that int, float and uint64 overrides end up
// in the parsed table and that the default ResourceTable stays unchanged.
func TestListResourceOverride(t *testing.T) {
	overrides := map[string]string{
		"UNS_2K_MAX_PARALLELISM": "2",
		"PC2_2K_GPU_UTILIZATION": "0.4",
		"PC2_2K_MAX_MEMORY":      "2222",
	}

	rt, err := ParseResources(func(key, def string) (string, bool) {
		// a missing key yields ("", false), i.e. "no override"
		val, ok := overrides[key]
		return val, ok
	})
	require.NoError(t, err)

	const spt = stabi.RegisteredSealProof_StackedDrg2KiBV1_1

	require.Equal(t, 2, rt[sealtasks.TTUnseal][spt].MaxParallelism)
	require.Equal(t, 0.4, rt[sealtasks.TTPreCommit2][spt].GPUUtilization)
	require.Equal(t, uint64(2222), rt[sealtasks.TTPreCommit2][spt].MaxMemory)

	// check that defaults don't get mutated
	require.Equal(t, 1, ResourceTable[sealtasks.TTUnseal][spt].MaxParallelism)
}
func TestListResourceSDRMulticoreOverride(t *testing.T) {
rt, err := ParseResources(func(key, def string) (string, bool) {
if key == "FIL_PROOFS_USE_MULTICORE_SDR" {
return "1", true
}
return "", false
})
require.NoError(t, err)
require.Equal(t, 4, rt[sealtasks.TTPreCommit1][stabi.RegisteredSealProof_StackedDrg2KiBV1_1].MaxParallelism)
require.Equal(t, 4, rt[sealtasks.TTUnseal][stabi.RegisteredSealProof_StackedDrg2KiBV1_1].MaxParallelism)
rt, err = ParseResources(func(key, def string) (string, bool) {
if key == "FIL_PROOFS_USE_MULTICORE_SDR" {
return "1", true
}
if key == "FIL_PROOFS_MULTICORE_SDR_PRODUCERS" {
return "9000", true
}
return "", false
})
require.NoError(t, err)
require.Equal(t, 9001, rt[sealtasks.TTPreCommit1][stabi.RegisteredSealProof_StackedDrg2KiBV1_1].MaxParallelism)
require.Equal(t, 9001, rt[sealtasks.TTUnseal][stabi.RegisteredSealProof_StackedDrg2KiBV1_1].MaxParallelism)
}

View File

@ -15,6 +15,12 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
)
// WorkerID identifies a worker; it is a distinct named type over uuid.UUID.
type WorkerID uuid.UUID // worker session UUID
// String returns the ID formatted by the underlying uuid.UUID's String method.
func (w WorkerID) String() string {
return uuid.UUID(w).String()
}
type WorkerInfo struct {
Hostname string
@ -34,7 +40,29 @@ type WorkerResources struct {
CPUs uint64 // Logical cores
GPUs []string
ResourceOpts map[string]string
// if nil use the default resource table
Resources map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
}
// ResourceSpec returns the resources needed for task type tt on seal proof
// type spt. An entry in the worker's own Resources table takes precedence;
// when the table is nil or has no entry for the pair, the value from the
// default ResourceTable is returned.
func (wr WorkerResources) ResourceSpec(spt abi.RegisteredSealProof, tt sealtasks.TaskType) Resources {
	// Indexing a nil map (outer or inner) is safe and reports "not present",
	// so no explicit nil checks are needed here.
	if custom, ok := wr.Resources[tt][spt]; ok {
		return custom
	}

	return ResourceTable[tt][spt]
}
type WorkerStats struct {

View File

@ -3,12 +3,10 @@ package sectorstorage
import (
"context"
"encoding/json"
"fmt"
"io"
"os"
"reflect"
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"
@ -546,28 +544,11 @@ func (l *LocalWorker) Info(context.Context) (storiface.WorkerInfo, error) {
return storiface.WorkerInfo{}, xerrors.Errorf("getting memory info: %w", err)
}
resourceOpts := make(map[string]string)
for tt := range l.acceptTasks {
ttShort := tt.Short()
for _, res_opt := range []string{"_MAX_MEMORY", "_MIN_MEMORY", "_MAX_PARALLELISM", "_MAX_PARALLELISM_GPU", "_BASE_MIN_MEMORY", "_GPU_UTILIZATION"} {
n := ttShort + res_opt
if val, ok := os.LookupEnv(n); ok {
resourceOpts[n] = val
}
}
}
if _, ok := resourceOpts["PC1_MAX_PARALLELISM"]; !ok {
if os.Getenv("FIL_PROOFS_USE_MULTICORE_SDR") == "1" {
pc1MulticoreSDRProducers := 3
if pc1MulticoreSDRProducersEnv := os.Getenv("FIL_PROOFS_MULTICORE_SDR_PRODUCERS"); pc1MulticoreSDRProducersEnv != "" {
pc1MulticoreSDRProducers, err = strconv.Atoi(pc1MulticoreSDRProducersEnv)
if err != nil {
log.Errorf("FIL_PROOFS_MULTICORE_SDR_PRODUCERS is not an integer: %+v", err)
pc1MulticoreSDRProducers = 3
}
}
resourceOpts["PC1_MAX_PARALLELISM"] = fmt.Sprintf("%d", 1+pc1MulticoreSDRProducers)
}
resEnv, err := storiface.ParseResources(func(key, def string) (string, bool) {
return os.LookupEnv(key)
})
if err != nil {
return storiface.WorkerInfo{}, xerrors.Errorf("interpreting resource env vars: %w", err)
}
return storiface.WorkerInfo{
@ -580,7 +561,7 @@ func (l *LocalWorker) Info(context.Context) (storiface.WorkerInfo, error) {
MemSwapUsed: memSwapUsed,
CPUs: uint64(runtime.NumCPU()),
GPUs: gpus,
ResourceOpts: resourceOpts,
Resources: resEnv,
},
}, nil
}

View File

@ -20,7 +20,7 @@ import (
type trackedWork struct {
job storiface.WorkerJob
worker WorkerID
worker storiface.WorkerID
workerHostname string
}
@ -58,7 +58,7 @@ func (wt *workTracker) onDone(ctx context.Context, callID storiface.CallID) {
delete(wt.running, callID)
}
func (wt *workTracker) track(ctx context.Context, ready chan struct{}, wid WorkerID, wi storiface.WorkerInfo, sid storage.SectorRef, task sealtasks.TaskType, cb func() (storiface.CallID, error)) (storiface.CallID, error) {
func (wt *workTracker) track(ctx context.Context, ready chan struct{}, wid storiface.WorkerID, wi storiface.WorkerInfo, sid storage.SectorRef, task sealtasks.TaskType, cb func() (storiface.CallID, error)) (storiface.CallID, error) {
tracked := func(rw int, callID storiface.CallID) trackedWork {
return trackedWork{
job: storiface.WorkerJob{
@ -122,7 +122,7 @@ func (wt *workTracker) track(ctx context.Context, ready chan struct{}, wid Worke
return callID, err
}
func (wt *workTracker) worker(wid WorkerID, wi storiface.WorkerInfo, w Worker) *trackedWorker {
func (wt *workTracker) worker(wid storiface.WorkerID, wi storiface.WorkerInfo, w Worker) *trackedWorker {
return &trackedWorker{
Worker: w,
wid: wid,
@ -152,7 +152,7 @@ func (wt *workTracker) Running() ([]trackedWork, []trackedWork) {
type trackedWorker struct {
Worker
wid WorkerID
wid storiface.WorkerID
workerInfo storiface.WorkerInfo
execute chan struct{} // channel blocking execution in case we're waiting for resources but the task is ready to execute