worker: Test resource table overrides

Łukasz Magiera 2021-11-29 15:14:57 +01:00
parent 6d52d8552b
commit f25efecb74
7 changed files with 142 additions and 18 deletions
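Summary: newLocalWorker now takes an environment-lookup callback (the new EnvFunc type) instead of reading the process environment directly, ParseResources is renamed to ParseResourceEnv, and the sealing resource symbols (ResourceTable, ParallelNum, ParallelDenom) are referenced through the storiface package. Two new tests, TestResUse and TestResOverride, check that a worker reports the table default as MemUsedMax, and that an injected AP_2K_MAX_MEMORY value overrides it.

A minimal sketch of the new injection point (fakeEnv and the surrounding variables are hypothetical test fixtures, mirroring the tests below):

	// fakeEnv stands in for os.LookupEnv; returning ("99999", true) for
	// AP_2K_MAX_MEMORY overrides the AddPiece/2K MaxMemory entry that the
	// worker would otherwise take from storiface.ResourceTable.
	fakeEnv := func(key string) (string, bool) {
		if key == "AP_2K_MAX_MEMORY" {
			return "99999", true
		}
		return "", false // anything else: fall back to table defaults
	}

	w := newLocalWorker(executor, WorkerConfig{TaskTypes: localTasks},
		fakeEnv, stor, lstor, idx, m, statestore.New(wds))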

View File

@@ -332,7 +332,7 @@ func TestRestartWorker(t *testing.T) {
 		return &testExec{apch: arch}, nil
 	}, WorkerConfig{
 		TaskTypes: localTasks,
-	}, stor, lstor, idx, m, statestore.New(wds))
+	}, os.LookupEnv, stor, lstor, idx, m, statestore.New(wds))
 
 	err := m.AddWorker(ctx, w)
 	require.NoError(t, err)
@@ -368,7 +368,7 @@ func TestRestartWorker(t *testing.T) {
 		return &testExec{apch: arch}, nil
 	}, WorkerConfig{
 		TaskTypes: localTasks,
-	}, stor, lstor, idx, m, statestore.New(wds))
+	}, os.LookupEnv, stor, lstor, idx, m, statestore.New(wds))
 
 	err = m.AddWorker(ctx, w)
 	require.NoError(t, err)
@@ -404,7 +404,7 @@ func TestReenableWorker(t *testing.T) {
 		return &testExec{apch: arch}, nil
 	}, WorkerConfig{
 		TaskTypes: localTasks,
-	}, stor, lstor, idx, m, statestore.New(wds))
+	}, os.LookupEnv, stor, lstor, idx, m, statestore.New(wds))
 
 	err := m.AddWorker(ctx, w)
 	require.NoError(t, err)
@@ -453,3 +453,123 @@ func TestReenableWorker(t *testing.T) {
 	i, _ = m.sched.Info(ctx)
 	require.Len(t, i.(SchedDiagInfo).OpenWindows, 2)
 }
+
+func TestResUse(t *testing.T) {
+	logging.SetAllLoggers(logging.LevelDebug)
+
+	ctx, done := context.WithCancel(context.Background())
+	defer done()
+
+	ds := datastore.NewMapDatastore()
+
+	m, lstor, stor, idx, cleanup := newTestMgr(ctx, t, ds)
+	defer cleanup()
+
+	localTasks := []sealtasks.TaskType{
+		sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTFetch,
+	}
+
+	wds := datastore.NewMapDatastore()
+
+	arch := make(chan chan apres)
+	w := newLocalWorker(func() (ffiwrapper.Storage, error) {
+		return &testExec{apch: arch}, nil
+	}, WorkerConfig{
+		TaskTypes: localTasks,
+	}, func(s string) (string, bool) {
+		return "", false
+	}, stor, lstor, idx, m, statestore.New(wds))
+
+	err := m.AddWorker(ctx, w)
+	require.NoError(t, err)
+
+	sid := storage.SectorRef{
+		ID:        abi.SectorID{Miner: 1000, Number: 1},
+		ProofType: abi.RegisteredSealProof_StackedDrg2KiBV1,
+	}
+
+	go func() {
+		_, err := m.AddPiece(ctx, sid, nil, 1016, strings.NewReader(strings.Repeat("testthis", 127)))
+		require.Error(t, err)
+	}()
+
+l:
+	for {
+		st := m.WorkerStats()
+		require.Len(t, st, 1)
+		for _, w := range st {
+			if w.MemUsedMax > 0 {
+				break l
+			}
+			time.Sleep(time.Millisecond)
+		}
+	}
+
+	st := m.WorkerStats()
+	require.Len(t, st, 1)
+	for _, w := range st {
+		require.Equal(t, storiface.ResourceTable[sealtasks.TTAddPiece][abi.RegisteredSealProof_StackedDrg2KiBV1].MaxMemory, w.MemUsedMax)
+	}
+}
+
+func TestResOverride(t *testing.T) {
+	logging.SetAllLoggers(logging.LevelDebug)
+
+	ctx, done := context.WithCancel(context.Background())
+	defer done()
+
+	ds := datastore.NewMapDatastore()
+
+	m, lstor, stor, idx, cleanup := newTestMgr(ctx, t, ds)
+	defer cleanup()
+
+	localTasks := []sealtasks.TaskType{
+		sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTFetch,
+	}
+
+	wds := datastore.NewMapDatastore()
+
+	arch := make(chan chan apres)
+	w := newLocalWorker(func() (ffiwrapper.Storage, error) {
+		return &testExec{apch: arch}, nil
+	}, WorkerConfig{
+		TaskTypes: localTasks,
+	}, func(s string) (string, bool) {
+		if s == "AP_2K_MAX_MEMORY" {
+			return "99999", true
+		}
+		return "", false
+	}, stor, lstor, idx, m, statestore.New(wds))
+
+	err := m.AddWorker(ctx, w)
+	require.NoError(t, err)
+
+	sid := storage.SectorRef{
+		ID:        abi.SectorID{Miner: 1000, Number: 1},
+		ProofType: abi.RegisteredSealProof_StackedDrg2KiBV1,
+	}
+
+	go func() {
+		_, err := m.AddPiece(ctx, sid, nil, 1016, strings.NewReader(strings.Repeat("testthis", 127)))
+		require.Error(t, err)
+	}()
+
+l:
+	for {
+		st := m.WorkerStats()
+		require.Len(t, st, 1)
+		for _, w := range st {
+			if w.MemUsedMax > 0 {
+				break l
+			}
+			time.Sleep(time.Millisecond)
+		}
+	}
+
+	st := m.WorkerStats()
+	require.Len(t, st, 1)
+	for _, w := range st {
+		require.Equal(t, uint64(99999), w.MemUsedMax)
+	}
+}
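The override keys exercised by these tests appear to follow a TASK_SIZE_FIELD naming pattern: AP_2K_MAX_MEMORY (AddPiece, 2KiB proof, MaxMemory) here, plus UNS_2K_MAX_PARALLELISM (Unseal) and FIL_PROOFS_USE_MULTICORE_SDR in the resources tests further down. A sketch of reading one override back through the parser (assumes access to the storiface package; mirrors TestListResourceOverride below):

	rt, err := storiface.ParseResourceEnv(func(key, def string) (string, bool) {
		if key == "AP_2K_MAX_MEMORY" {
			return "99999", true // pretend the operator exported this
		}
		return "", false
	})
	require.NoError(t, err)
	require.Equal(t, uint64(99999),
		rt[sealtasks.TTAddPiece][abi.RegisteredSealProof_StackedDrg2KiBV1].MaxMemory)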

View File

@@ -7,6 +7,7 @@ import (
 	"math/rand"
 	"net"
 	"net/http"
+	"os"
 	"testing"
 
 	"github.com/filecoin-project/go-state-types/abi"
@@ -286,7 +287,7 @@ func (p *pieceProviderTestHarness) addRemoteWorker(t *testing.T, tasks []sealtas
 	worker := newLocalWorker(nil, WorkerConfig{
 		TaskTypes: tasks,
-	}, remote, localStore, p.index, p.mgr, csts)
+	}, os.LookupEnv, remote, localStore, p.index, p.mgr, csts)
 
 	p.servers = append(p.servers, svc)
 	p.localStores = append(p.localStores, localStore)

View File

@@ -301,8 +301,8 @@ func TestSched(t *testing.T) {
 	}
 
 	testFunc := func(workers []workerSpec, tasks []task) func(t *testing.T) {
-		ParallelNum = 1
-		ParallelDenom = 1
+		storiface.ParallelNum = 1
+		storiface.ParallelDenom = 1
 
 		return func(t *testing.T) {
 			index := stores.NewIndex()
@@ -618,7 +618,7 @@ func TestWindowCompact(t *testing.T) {
 				taskType: task,
 				sector:   storage.SectorRef{ProofType: spt},
 			})
 
-			window.allocated.add(wh.info.Resources, ResourceTable[task][spt])
+			window.allocated.add(wh.info.Resources, storiface.ResourceTable[task][spt])
 		}
 
 		wh.activeWindows = append(wh.activeWindows, window)
@@ -637,7 +637,7 @@ func TestWindowCompact(t *testing.T) {
 		for ti, task := range tasks {
 			require.Equal(t, task, wh.activeWindows[wi].todo[ti].taskType, "%d, %d", wi, ti)
 
-			expectRes.add(wh.info.Resources, ResourceTable[task][spt])
+			expectRes.add(wh.info.Resources, storiface.ResourceTable[task][spt])
 		}
 
 		require.Equal(t, expectRes.cpuUse, wh.activeWindows[wi].allocated.cpuUse, "%d", wi)
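These scheduler-test hunks are mechanical: ParallelNum, ParallelDenom, and ResourceTable are now referenced through the storiface package, so their call sites gain a storiface. prefix; the testworker hunk further down is the same rename.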

View File

@@ -346,7 +346,7 @@ func init() {
 	}
 }
 
-func ParseResources(lookup func(key, def string) (string, bool)) (map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources, error) {
+func ParseResourceEnv(lookup func(key, def string) (string, bool)) (map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources, error) {
 	out := map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources{}
 
 	for taskType, defTT := range ResourceTable {
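Note the lookup signature: the callback receives both the variable name and its default rendered as a string, so a caller can enumerate every overridable KEY=default pair without overriding anything. A sketch (same package as ParseResourceEnv, error handling elided; this is the trick TestListResourceVars below relies on):

	// Enumerate the supported override variables and their defaults.
	_, _ = ParseResourceEnv(func(key, def string) (string, bool) {
		fmt.Printf("%s=%s\n", key, def)
		return "", false // never override; we only want the listing
	})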

View File

@@ -10,7 +10,7 @@ import (
 )
 
 func TestListResourceVars(t *testing.T) {
-	_, err := ParseResources(func(key, def string) (string, bool) {
+	_, err := ParseResourceEnv(func(key, def string) (string, bool) {
 		if def != "" {
 			fmt.Printf("%s=%s\n", key, def)
 		}
@@ -22,7 +22,7 @@ func TestListResourceVars(t *testing.T) {
 }
 
 func TestListResourceOverride(t *testing.T) {
-	rt, err := ParseResources(func(key, def string) (string, bool) {
+	rt, err := ParseResourceEnv(func(key, def string) (string, bool) {
 		if key == "UNS_2K_MAX_PARALLELISM" {
 			return "2", true
 		}
@@ -46,7 +46,7 @@ func TestListResourceOverride(t *testing.T) {
 }
 
 func TestListResourceSDRMulticoreOverride(t *testing.T) {
-	rt, err := ParseResources(func(key, def string) (string, bool) {
+	rt, err := ParseResourceEnv(func(key, def string) (string, bool) {
 		if key == "FIL_PROOFS_USE_MULTICORE_SDR" {
 			return "1", true
 		}
@@ -58,7 +58,7 @@ func TestListResourceSDRMulticoreOverride(t *testing.T) {
 	require.Equal(t, 4, rt[sealtasks.TTPreCommit1][stabi.RegisteredSealProof_StackedDrg2KiBV1_1].MaxParallelism)
 	require.Equal(t, 4, rt[sealtasks.TTUnseal][stabi.RegisteredSealProof_StackedDrg2KiBV1_1].MaxParallelism)
 
-	rt, err = ParseResources(func(key, def string) (string, bool) {
+	rt, err = ParseResourceEnv(func(key, def string) (string, bool) {
 		if key == "FIL_PROOFS_USE_MULTICORE_SDR" {
 			return "1", true
 		}

View File

@@ -102,7 +102,7 @@ func (t *testWorker) Paths(ctx context.Context) ([]stores.StoragePath, error) {
 }
 
 func (t *testWorker) Info(ctx context.Context) (storiface.WorkerInfo, error) {
-	res := ResourceTable[sealtasks.TTPreCommit2][abi.RegisteredSealProof_StackedDrg2KiBV1]
+	res := storiface.ResourceTable[sealtasks.TTPreCommit2][abi.RegisteredSealProof_StackedDrg2KiBV1]
 
 	return storiface.WorkerInfo{
 		Hostname: "testworkerer",

View File

@@ -42,6 +42,7 @@ type WorkerConfig struct {
 
 // used do provide custom proofs impl (mostly used in testing)
 type ExecutorFunc func() (ffiwrapper.Storage, error)
+type EnvFunc func(string) (string, bool)
 
 type LocalWorker struct {
 	storage    stores.Store
@@ -50,6 +51,7 @@ type LocalWorker struct {
 	ret        storiface.WorkerReturn
 	executor   ExecutorFunc
 	noSwap     bool
+	envLookup  EnvFunc
 
 	// see equivalent field on WorkerConfig.
 	ignoreResources bool
@@ -64,7 +66,7 @@ type LocalWorker struct {
 	closing chan struct{}
 }
 
-func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, store stores.Store, local *stores.Local, sindex stores.SectorIndex, ret storiface.WorkerReturn, cst *statestore.StateStore) *LocalWorker {
+func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, envLookup EnvFunc, store stores.Store, local *stores.Local, sindex stores.SectorIndex, ret storiface.WorkerReturn, cst *statestore.StateStore) *LocalWorker {
 	acceptTasks := map[sealtasks.TaskType]struct{}{}
 	for _, taskType := range wcfg.TaskTypes {
 		acceptTasks[taskType] = struct{}{}
@@ -82,6 +84,7 @@ func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, store stores.Store
 		acceptTasks: acceptTasks,
 		executor:    executor,
 		noSwap:      wcfg.NoSwap,
+		envLookup:   envLookup,
 		ignoreResources: wcfg.IgnoreResourceFiltering,
 		session:     uuid.New(),
 		closing:     make(chan struct{}),
@@ -115,7 +118,7 @@ func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, store stores.Store
 }
 
 func NewLocalWorker(wcfg WorkerConfig, store stores.Store, local *stores.Local, sindex stores.SectorIndex, ret storiface.WorkerReturn, cst *statestore.StateStore) *LocalWorker {
-	return newLocalWorker(nil, wcfg, store, local, sindex, ret, cst)
+	return newLocalWorker(nil, wcfg, os.LookupEnv, store, local, sindex, ret, cst)
 }
 
 type localWorkerPathProvider struct {
@@ -544,8 +547,8 @@ func (l *LocalWorker) Info(context.Context) (storiface.WorkerInfo, error) {
 		return storiface.WorkerInfo{}, xerrors.Errorf("getting memory info: %w", err)
 	}
 
-	resEnv, err := storiface.ParseResources(func(key, def string) (string, bool) {
-		return os.LookupEnv(key)
+	resEnv, err := storiface.ParseResourceEnv(func(key, def string) (string, bool) {
+		return l.envLookup(key)
 	})
 	if err != nil {
 		return storiface.WorkerInfo{}, xerrors.Errorf("interpreting resource env vars: %w", err)
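With this last hunk, Info derives the worker's advertised resource limits from l.envLookup rather than from os.LookupEnv directly, so the same parsing path serves both the real environment (wired in by the exported NewLocalWorker above, keeping production behavior unchanged) and the stub lookups injected by the new tests. The def argument is discarded by this adapter; an unset variable simply falls back to the storiface.ResourceTable default.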