diff --git a/extern/sector-storage/manager_test.go b/extern/sector-storage/manager_test.go index d4044bbae..cb03b2ef4 100644 --- a/extern/sector-storage/manager_test.go +++ b/extern/sector-storage/manager_test.go @@ -332,7 +332,7 @@ func TestRestartWorker(t *testing.T) { return &testExec{apch: arch}, nil }, WorkerConfig{ TaskTypes: localTasks, - }, stor, lstor, idx, m, statestore.New(wds)) + }, os.LookupEnv, stor, lstor, idx, m, statestore.New(wds)) err := m.AddWorker(ctx, w) require.NoError(t, err) @@ -368,7 +368,7 @@ func TestRestartWorker(t *testing.T) { return &testExec{apch: arch}, nil }, WorkerConfig{ TaskTypes: localTasks, - }, stor, lstor, idx, m, statestore.New(wds)) + }, os.LookupEnv, stor, lstor, idx, m, statestore.New(wds)) err = m.AddWorker(ctx, w) require.NoError(t, err) @@ -404,7 +404,7 @@ func TestReenableWorker(t *testing.T) { return &testExec{apch: arch}, nil }, WorkerConfig{ TaskTypes: localTasks, - }, stor, lstor, idx, m, statestore.New(wds)) + }, os.LookupEnv, stor, lstor, idx, m, statestore.New(wds)) err := m.AddWorker(ctx, w) require.NoError(t, err) @@ -453,3 +453,123 @@ func TestReenableWorker(t *testing.T) { i, _ = m.sched.Info(ctx) require.Len(t, i.(SchedDiagInfo).OpenWindows, 2) } + +func TestResUse(t *testing.T) { + logging.SetAllLoggers(logging.LevelDebug) + + ctx, done := context.WithCancel(context.Background()) + defer done() + + ds := datastore.NewMapDatastore() + + m, lstor, stor, idx, cleanup := newTestMgr(ctx, t, ds) + defer cleanup() + + localTasks := []sealtasks.TaskType{ + sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTFetch, + } + + wds := datastore.NewMapDatastore() + + arch := make(chan chan apres) + w := newLocalWorker(func() (ffiwrapper.Storage, error) { + return &testExec{apch: arch}, nil + }, WorkerConfig{ + TaskTypes: localTasks, + }, func(s string) (string, bool) { + return "", false + }, stor, lstor, idx, m, statestore.New(wds)) + + err := m.AddWorker(ctx, w) + require.NoError(t, err) + + sid := storage.SectorRef{ + ID: abi.SectorID{Miner: 1000, Number: 1}, + ProofType: abi.RegisteredSealProof_StackedDrg2KiBV1, + } + + go func() { + _, err := m.AddPiece(ctx, sid, nil, 1016, strings.NewReader(strings.Repeat("testthis", 127))) + require.Error(t, err) + }() + +l: + for { + st := m.WorkerStats() + require.Len(t, st, 1) + for _, w := range st { + if w.MemUsedMax > 0 { + break l + } + time.Sleep(time.Millisecond) + } + } + + st := m.WorkerStats() + require.Len(t, st, 1) + for _, w := range st { + require.Equal(t, storiface.ResourceTable[sealtasks.TTAddPiece][abi.RegisteredSealProof_StackedDrg2KiBV1].MaxMemory, w.MemUsedMax) + } +} + +func TestResOverride(t *testing.T) { + logging.SetAllLoggers(logging.LevelDebug) + + ctx, done := context.WithCancel(context.Background()) + defer done() + + ds := datastore.NewMapDatastore() + + m, lstor, stor, idx, cleanup := newTestMgr(ctx, t, ds) + defer cleanup() + + localTasks := []sealtasks.TaskType{ + sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTFetch, + } + + wds := datastore.NewMapDatastore() + + arch := make(chan chan apres) + w := newLocalWorker(func() (ffiwrapper.Storage, error) { + return &testExec{apch: arch}, nil + }, WorkerConfig{ + TaskTypes: localTasks, + }, func(s string) (string, bool) { + if s == "AP_2K_MAX_MEMORY" { + return "99999", true + } + + return "", false + }, stor, lstor, idx, m, statestore.New(wds)) + + err := m.AddWorker(ctx, w) + require.NoError(t, err) + + sid := storage.SectorRef{ + ID: abi.SectorID{Miner: 1000, Number: 1}, + ProofType: abi.RegisteredSealProof_StackedDrg2KiBV1, + } + + go func() { + _, err := m.AddPiece(ctx, sid, nil, 1016, strings.NewReader(strings.Repeat("testthis", 127))) + require.Error(t, err) + }() + +l: + for { + st := m.WorkerStats() + require.Len(t, st, 1) + for _, w := range st { + if w.MemUsedMax > 0 { + break l + } + time.Sleep(time.Millisecond) + } + } + + st := m.WorkerStats() + require.Len(t, st, 1) + for _, w := range st { + require.Equal(t, uint64(99999), w.MemUsedMax) + } +} diff --git a/extern/sector-storage/piece_provider_test.go b/extern/sector-storage/piece_provider_test.go index 0abba1bd8..1aad3d2d2 100644 --- a/extern/sector-storage/piece_provider_test.go +++ b/extern/sector-storage/piece_provider_test.go @@ -7,6 +7,7 @@ import ( "math/rand" "net" "net/http" + "os" "testing" "github.com/filecoin-project/go-state-types/abi" @@ -286,7 +287,7 @@ func (p *pieceProviderTestHarness) addRemoteWorker(t *testing.T, tasks []sealtas worker := newLocalWorker(nil, WorkerConfig{ TaskTypes: tasks, - }, remote, localStore, p.index, p.mgr, csts) + }, os.LookupEnv, remote, localStore, p.index, p.mgr, csts) p.servers = append(p.servers, svc) p.localStores = append(p.localStores, localStore) diff --git a/extern/sector-storage/sched_test.go b/extern/sector-storage/sched_test.go index f64ed57e2..a2191a8e2 100644 --- a/extern/sector-storage/sched_test.go +++ b/extern/sector-storage/sched_test.go @@ -301,8 +301,8 @@ func TestSched(t *testing.T) { } testFunc := func(workers []workerSpec, tasks []task) func(t *testing.T) { - ParallelNum = 1 - ParallelDenom = 1 + storiface.ParallelNum = 1 + storiface.ParallelDenom = 1 return func(t *testing.T) { index := stores.NewIndex() @@ -618,7 +618,7 @@ func TestWindowCompact(t *testing.T) { taskType: task, sector: storage.SectorRef{ProofType: spt}, }) - window.allocated.add(wh.info.Resources, ResourceTable[task][spt]) + window.allocated.add(wh.info.Resources, storiface.ResourceTable[task][spt]) } wh.activeWindows = append(wh.activeWindows, window) @@ -637,7 +637,7 @@ func TestWindowCompact(t *testing.T) { for ti, task := range tasks { require.Equal(t, task, wh.activeWindows[wi].todo[ti].taskType, "%d, %d", wi, ti) - expectRes.add(wh.info.Resources, ResourceTable[task][spt]) + expectRes.add(wh.info.Resources, storiface.ResourceTable[task][spt]) } require.Equal(t, expectRes.cpuUse, wh.activeWindows[wi].allocated.cpuUse, "%d", wi) diff --git a/extern/sector-storage/storiface/resources.go b/extern/sector-storage/storiface/resources.go index f12d0df05..dd43a37dd 100644 --- a/extern/sector-storage/storiface/resources.go +++ b/extern/sector-storage/storiface/resources.go @@ -346,7 +346,7 @@ func init() { } } -func ParseResources(lookup func(key, def string) (string, bool)) (map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources, error) { +func ParseResourceEnv(lookup func(key, def string) (string, bool)) (map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources, error) { out := map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources{} for taskType, defTT := range ResourceTable { diff --git a/extern/sector-storage/storiface/resources_test.go b/extern/sector-storage/storiface/resources_test.go index f58f46e23..bf7425d24 100644 --- a/extern/sector-storage/storiface/resources_test.go +++ b/extern/sector-storage/storiface/resources_test.go @@ -10,7 +10,7 @@ import ( ) func TestListResourceVars(t *testing.T) { - _, err := ParseResources(func(key, def string) (string, bool) { + _, err := ParseResourceEnv(func(key, def string) (string, bool) { if def != "" { fmt.Printf("%s=%s\n", key, def) } @@ -22,7 +22,7 @@ func TestListResourceVars(t *testing.T) { } func TestListResourceOverride(t *testing.T) { - rt, err := ParseResources(func(key, def string) (string, bool) { + rt, err := ParseResourceEnv(func(key, def string) (string, bool) { if key == "UNS_2K_MAX_PARALLELISM" { return "2", true } @@ -46,7 +46,7 @@ func TestListResourceOverride(t *testing.T) { } func TestListResourceSDRMulticoreOverride(t *testing.T) { - rt, err := ParseResources(func(key, def string) (string, bool) { + rt, err := ParseResourceEnv(func(key, def string) (string, bool) { if key == "FIL_PROOFS_USE_MULTICORE_SDR" { return "1", true } @@ -58,7 +58,7 @@ func TestListResourceSDRMulticoreOverride(t *testing.T) { require.Equal(t, 4, rt[sealtasks.TTPreCommit1][stabi.RegisteredSealProof_StackedDrg2KiBV1_1].MaxParallelism) require.Equal(t, 4, rt[sealtasks.TTUnseal][stabi.RegisteredSealProof_StackedDrg2KiBV1_1].MaxParallelism) - rt, err = ParseResources(func(key, def string) (string, bool) { + rt, err = ParseResourceEnv(func(key, def string) (string, bool) { if key == "FIL_PROOFS_USE_MULTICORE_SDR" { return "1", true } diff --git a/extern/sector-storage/testworker_test.go b/extern/sector-storage/testworker_test.go index 57c3b53ee..581c60d3b 100644 --- a/extern/sector-storage/testworker_test.go +++ b/extern/sector-storage/testworker_test.go @@ -102,7 +102,7 @@ func (t *testWorker) Paths(ctx context.Context) ([]stores.StoragePath, error) { } func (t *testWorker) Info(ctx context.Context) (storiface.WorkerInfo, error) { - res := ResourceTable[sealtasks.TTPreCommit2][abi.RegisteredSealProof_StackedDrg2KiBV1] + res := storiface.ResourceTable[sealtasks.TTPreCommit2][abi.RegisteredSealProof_StackedDrg2KiBV1] return storiface.WorkerInfo{ Hostname: "testworkerer", diff --git a/extern/sector-storage/worker_local.go b/extern/sector-storage/worker_local.go index de3a04f93..3545c50c0 100644 --- a/extern/sector-storage/worker_local.go +++ b/extern/sector-storage/worker_local.go @@ -42,6 +42,7 @@ type WorkerConfig struct { // used do provide custom proofs impl (mostly used in testing) type ExecutorFunc func() (ffiwrapper.Storage, error) +type EnvFunc func(string) (string, bool) type LocalWorker struct { storage stores.Store @@ -50,6 +51,7 @@ type LocalWorker struct { ret storiface.WorkerReturn executor ExecutorFunc noSwap bool + envLookup EnvFunc // see equivalent field on WorkerConfig. ignoreResources bool @@ -64,7 +66,7 @@ type LocalWorker struct { closing chan struct{} } -func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, store stores.Store, local *stores.Local, sindex stores.SectorIndex, ret storiface.WorkerReturn, cst *statestore.StateStore) *LocalWorker { +func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, envLookup EnvFunc, store stores.Store, local *stores.Local, sindex stores.SectorIndex, ret storiface.WorkerReturn, cst *statestore.StateStore) *LocalWorker { acceptTasks := map[sealtasks.TaskType]struct{}{} for _, taskType := range wcfg.TaskTypes { acceptTasks[taskType] = struct{}{} @@ -82,6 +84,7 @@ func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, store stores.Store acceptTasks: acceptTasks, executor: executor, noSwap: wcfg.NoSwap, + envLookup: envLookup, ignoreResources: wcfg.IgnoreResourceFiltering, session: uuid.New(), closing: make(chan struct{}), @@ -115,7 +118,7 @@ func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, store stores.Store } func NewLocalWorker(wcfg WorkerConfig, store stores.Store, local *stores.Local, sindex stores.SectorIndex, ret storiface.WorkerReturn, cst *statestore.StateStore) *LocalWorker { - return newLocalWorker(nil, wcfg, store, local, sindex, ret, cst) + return newLocalWorker(nil, wcfg, os.LookupEnv, store, local, sindex, ret, cst) } type localWorkerPathProvider struct { @@ -544,8 +547,8 @@ func (l *LocalWorker) Info(context.Context) (storiface.WorkerInfo, error) { return storiface.WorkerInfo{}, xerrors.Errorf("getting memory info: %w", err) } - resEnv, err := storiface.ParseResources(func(key, def string) (string, bool) { - return os.LookupEnv(key) + resEnv, err := storiface.ParseResourceEnv(func(key, def string) (string, bool) { + return l.envLookup(key) }) if err != nil { return storiface.WorkerInfo{}, xerrors.Errorf("interpreting resource env vars: %w", err)