diff --git a/extern/sector-storage/manager.go b/extern/sector-storage/manager.go
index b883a174f..5bd372db0 100644
--- a/extern/sector-storage/manager.go
+++ b/extern/sector-storage/manager.go
@@ -87,6 +87,20 @@ type result struct {
 	err error
 }
 
+// ResourceFilteringStrategy is an enum indicating the kinds of resource
+// filtering strategies that can be configured for workers.
+type ResourceFilteringStrategy string
+
+const (
+	// ResourceFilteringHardware specifies that available hardware resources
+	// should be evaluated when scheduling a task against the worker.
+	ResourceFilteringHardware = ResourceFilteringStrategy("hardware")
+
+	// ResourceFilteringDisabled disables resource filtering against this
+	// worker. The scheduler may assign any task to this worker.
+	ResourceFilteringDisabled = ResourceFilteringStrategy("disabled")
+)
+
 type SealerConfig struct {
 	ParallelFetchLimit int
 
@@ -97,9 +111,10 @@ type SealerConfig struct {
 	AllowCommit     bool
 	AllowUnseal     bool
 
-	// IgnoreResourceFiltering instructs the system to ignore available
-	// resources when assigning tasks to the local worker.
-	IgnoreResourceFiltering bool
+	// ResourceFiltering instructs the system which resource filtering strategy
+	// to use when evaluating tasks against this worker. An empty value defaults
+	// to "hardware".
+	ResourceFiltering ResourceFilteringStrategy
 }
 
 type StorageAuth http.Header
@@ -108,7 +123,6 @@ type WorkerStateStore *statestore.StateStore
 type ManagerStateStore *statestore.StateStore
 
 func New(ctx context.Context, lstor *stores.Local, stor *stores.Remote, ls stores.LocalStorage, si stores.SectorIndex, sc SealerConfig, wss WorkerStateStore, mss ManagerStateStore) (*Manager, error) {
-
 	prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, index: si})
 	if err != nil {
 		return nil, xerrors.Errorf("creating prover instance: %w", err)
@@ -155,9 +169,12 @@ func New(ctx context.Context, lstor *stores.Local, stor *stores.Remote, ls store
 		localTasks = append(localTasks, sealtasks.TTUnseal)
 	}
 
-	err = m.AddWorker(ctx, NewLocalWorker(WorkerConfig{
-		TaskTypes: localTasks,
-	}, stor, lstor, si, m, wss))
+	wcfg := WorkerConfig{
+		IgnoreResourceFiltering: sc.ResourceFiltering == ResourceFilteringDisabled,
+		TaskTypes:               localTasks,
+	}
+	worker := NewLocalWorker(wcfg, stor, lstor, si, m, wss)
+	err = m.AddWorker(ctx, worker)
 	if err != nil {
 		return nil, xerrors.Errorf("adding local worker: %w", err)
 	}
diff --git a/extern/sector-storage/sched_test.go b/extern/sector-storage/sched_test.go
index 63f3de64d..38b3efda9 100644
--- a/extern/sector-storage/sched_test.go
+++ b/extern/sector-storage/sched_test.go
@@ -38,6 +38,20 @@ func TestWithPriority(t *testing.T) {
 	require.Equal(t, 2222, getPriority(ctx))
 }
 
+var decentWorkerResources = storiface.WorkerResources{
+	MemPhysical: 128 << 30,
+	MemSwap:     200 << 30,
+	MemReserved: 2 << 30,
+	CPUs:        32,
+	GPUs:        []string{"a GPU"},
+}
+
+var constrainedWorkerResources = storiface.WorkerResources{
+	MemPhysical: 1 << 30,
+	MemReserved: 2 << 30,
+	CPUs:        1,
+}
+
 type schedTestWorker struct {
 	name      string
 	taskTypes map[sealtasks.TaskType]struct{}
@@ -45,6 +59,9 @@ type schedTestWorker struct {
 	paths     []stores.StoragePath
 	closed    bool
 	session   uuid.UUID
+
+	resources       storiface.WorkerResources
+	ignoreResources bool
 }
 
 func (s *schedTestWorker) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) {
@@ -107,18 +124,11 @@ func (s *schedTestWorker) Paths(ctx context.Context) ([]stores.StoragePath, erro
 	return s.paths, nil
 }
 
-var decentWorkerResources = storiface.WorkerResources{
-	MemPhysical: 128 << 30,
-	MemSwap:     200 << 30,
-	MemReserved: 2 << 30,
-	CPUs:        32,
-	GPUs:        []string{"a GPU"},
-}
-
 func (s *schedTestWorker) Info(ctx context.Context) (storiface.WorkerInfo, error) {
 	return storiface.WorkerInfo{
-		Hostname:  s.name,
-		Resources: decentWorkerResources,
+		Hostname:        s.name,
+		IgnoreResources: s.ignoreResources,
+		Resources:       s.resources,
 	}, nil
 }
 
@@ -137,13 +147,16 @@ func (s *schedTestWorker) Close() error {
 
 var _ Worker = &schedTestWorker{}
 
-func addTestWorker(t *testing.T, sched *scheduler, index *stores.Index, name string, taskTypes map[sealtasks.TaskType]struct{}) {
+func addTestWorker(t *testing.T, sched *scheduler, index *stores.Index, name string, taskTypes map[sealtasks.TaskType]struct{}, resources storiface.WorkerResources, ignoreResources bool) {
 	w := &schedTestWorker{
 		name:      name,
 		taskTypes: taskTypes,
 		paths:     []stores.StoragePath{{ID: "bb-8", Weight: 2, LocalPath: "food", CanSeal: true, CanStore: true}},
 		session:   uuid.New(),
+
+		resources:       resources,
+		ignoreResources: ignoreResources,
 	}
 
 	for _, path := range w.paths {
@@ -169,7 +182,7 @@ func TestSchedStartStop(t *testing.T) {
 	sched := newScheduler()
 	go sched.runSched()
 
-	addTestWorker(t, sched, stores.NewIndex(), "fred", nil)
+	addTestWorker(t, sched, stores.NewIndex(), "fred", nil, decentWorkerResources, false)
 
 	require.NoError(t, sched.Close(context.TODO()))
 }
@@ -183,6 +196,9 @@ func TestSched(t *testing.T) {
 	type workerSpec struct {
 		name      string
 		taskTypes map[sealtasks.TaskType]struct{}
+
+		resources       storiface.WorkerResources
+		ignoreResources bool
 	}
 
 	noopAction := func(ctx context.Context, w Worker) error {
@@ -295,7 +311,7 @@ func TestSched(t *testing.T) {
 		go sched.runSched()
 
 		for _, worker := range workers {
-			addTestWorker(t, sched, index, worker.name, worker.taskTypes)
+			addTestWorker(t, sched, index, worker.name, worker.taskTypes, worker.resources, worker.ignoreResources)
 		}
 
 		rm := runMeta{
@@ -322,31 +338,45 @@ func TestSched(t *testing.T) {
 		}
 	}
 
+	t.Run("constrained-resources-not-scheduled", testFunc([]workerSpec{
+		{name: "fred", resources: constrainedWorkerResources, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
+	}, []task{
+		sched("pc1-1", "fred", 8, sealtasks.TTPreCommit1),
+		taskNotScheduled("pc1-1"),
+	}))
+
+	t.Run("constrained-resources-ignored-scheduled", testFunc([]workerSpec{
+		{name: "fred", resources: constrainedWorkerResources, ignoreResources: true, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
+	}, []task{
+		sched("pc1-1", "fred", 8, sealtasks.TTPreCommit1),
+		taskStarted("pc1-1"),
+	}))
+
 	t.Run("one-pc1", testFunc([]workerSpec{
-		{name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
+		{name: "fred", resources: decentWorkerResources, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
 	}, []task{
 		sched("pc1-1", "fred", 8, sealtasks.TTPreCommit1),
 		taskDone("pc1-1"),
 	}))
 
 	t.Run("pc1-2workers-1", testFunc([]workerSpec{
-		{name: "fred2", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit2: {}}},
-		{name: "fred1", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
+		{name: "fred2", resources: decentWorkerResources, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit2: {}}},
+		{name: "fred1", resources: decentWorkerResources, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
 	}, []task{
 		sched("pc1-1", "fred1", 8, sealtasks.TTPreCommit1),
 		taskDone("pc1-1"),
})) t.Run("pc1-2workers-2", testFunc([]workerSpec{ - {name: "fred1", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}}, - {name: "fred2", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit2: {}}}, + {name: "fred1", resources: decentWorkerResources, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}}, + {name: "fred2", resources: decentWorkerResources, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit2: {}}}, }, []task{ sched("pc1-1", "fred1", 8, sealtasks.TTPreCommit1), taskDone("pc1-1"), })) t.Run("pc1-block-pc2", testFunc([]workerSpec{ - {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}}, + {name: "fred", resources: decentWorkerResources, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}}, }, []task{ sched("pc1", "fred", 8, sealtasks.TTPreCommit1), taskStarted("pc1"), @@ -359,7 +389,7 @@ func TestSched(t *testing.T) { })) t.Run("pc2-block-pc1", testFunc([]workerSpec{ - {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}}, + {name: "fred", resources: decentWorkerResources, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}}, }, []task{ sched("pc2", "fred", 8, sealtasks.TTPreCommit2), taskStarted("pc2"), @@ -372,7 +402,7 @@ func TestSched(t *testing.T) { })) t.Run("pc1-batching", testFunc([]workerSpec{ - {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}}, + {name: "fred", resources: decentWorkerResources, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}}, }, []task{ sched("t1", "fred", 8, sealtasks.TTPreCommit1), taskStarted("t1"), @@ -459,7 +489,7 @@ func TestSched(t *testing.T) { // run this one a bunch of times, it had a very annoying tendency to fail randomly for i := 0; i < 40; i++ { t.Run("pc1-pc2-prio", testFunc([]workerSpec{ - {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}}, + {name: "fred", resources: decentWorkerResources, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}}, }, []task{ // fill queues twoPC1("w0", 0, taskStarted), diff --git a/itests/kit/node_builder.go b/itests/kit/node_builder.go index 75cebf6bf..3306d427f 100644 --- a/itests/kit/node_builder.go +++ b/itests/kit/node_builder.go @@ -130,7 +130,7 @@ func CreateTestStorageNode(ctx context.Context, t *testing.T, waddr address.Addr node.Override(new(*sectorstorage.SealerConfig), func() *sectorstorage.SealerConfig { scfg := config.DefaultStorageMiner() - scfg.Storage.IgnoreResourceFiltering = true + scfg.Storage.ResourceFiltering = sectorstorage.ResourceFilteringDisabled return &scfg.Storage }), @@ -541,7 +541,7 @@ func mockMinerBuilderOpts(t *testing.T, fullOpts []FullNodeOpts, storage []Stora node.Override(new(sectorstorage.PieceProvider), node.From(new(*mock.SectorMgr))), node.Override(new(*sectorstorage.SealerConfig), func() *sectorstorage.SealerConfig { scfg := config.DefaultStorageMiner() - scfg.Storage.IgnoreResourceFiltering = true + scfg.Storage.ResourceFiltering = sectorstorage.ResourceFilteringDisabled return &scfg.Storage }), diff --git a/node/config/def.go b/node/config/def.go index 700a3f94f..a00ea7307 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -342,6 +342,9 @@ func DefaultStorageMiner() *StorageMiner { // Default to 10 - tcp 
 			// Default to 10 - tcp should still be able to figure this out, and
 			// it's the ratio between 10gbit / 1gbit
 			ParallelFetchLimit: 10,
+
+			// By default use the hardware resource filtering strategy.
+			ResourceFiltering: sectorstorage.ResourceFilteringHardware,
 		},
 
 		Dealmaking: DealmakingConfig{
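Note for reviewers: below is a minimal, standalone sketch of how the new `ResourceFiltering` setting is intended to map onto the pre-existing `IgnoreResourceFiltering` worker flag. The `ignoreFiltering` helper and the `main` harness are illustrative only (they are not part of this patch); the mapping itself mirrors the manager.go change above: only the "disabled" strategy turns filtering off, while an empty value behaves like the "hardware" default.

```go
package main

import "fmt"

// ResourceFilteringStrategy mirrors the enum added to manager.go in this patch.
type ResourceFilteringStrategy string

const (
	ResourceFilteringHardware = ResourceFilteringStrategy("hardware")
	ResourceFilteringDisabled = ResourceFilteringStrategy("disabled")
)

// ignoreFiltering is a hypothetical helper showing the mapping used when the
// manager builds the local WorkerConfig: available resources are ignored only
// when the strategy is explicitly "disabled"; "" and "hardware" keep the
// scheduler's hardware-based filtering on.
func ignoreFiltering(s ResourceFilteringStrategy) bool {
	return s == ResourceFilteringDisabled
}

func main() {
	for _, s := range []ResourceFilteringStrategy{"", ResourceFilteringHardware, ResourceFilteringDisabled} {
		fmt.Printf("ResourceFiltering=%q -> IgnoreResourceFiltering=%v\n", s, ignoreFiltering(s))
	}
}
```

This is also why the itests switch from `IgnoreResourceFiltering = true` to `ResourceFiltering = sectorstorage.ResourceFilteringDisabled`: the CI machines are resource-constrained, and disabling filtering keeps tasks schedulable there while production miners keep the hardware-based default. Assuming the usual TOML field mapping for `SealerConfig`, the setting should surface in the miner's config under the `[Storage]` section as `ResourceFiltering = "hardware"` or `"disabled"`.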