Merge branch 'raulk/worker-disable-resource-filtering' into raulk/itests-refactor-kit
Commit: b7a5e3cd0f

(Two binary files in this commit are not shown.)
@@ -2205,6 +2205,7 @@ Response:
     "ef8d99a2-6865-4189-8ffa-9fef0f806eee": {
       "Info": {
         "Hostname": "host",
+        "IgnoreResources": false,
         "Resources": {
           "MemPhysical": 274877906944,
           "MemSwap": 128849018880,
@@ -89,6 +89,7 @@ Response:
 ```json
 {
   "Hostname": "string value",
+  "IgnoreResources": true,
   "Resources": {
     "MemPhysical": 42,
     "MemSwap": 42,
extern/sector-storage/manager.go (vendored, 29 changes)

@@ -87,6 +87,20 @@ type result struct {
     err error
 }
 
+// ResourceFilteringStrategy is an enum indicating the kinds of resource
+// filtering strategies that can be configured for workers.
+type ResourceFilteringStrategy string
+
+const (
+    // ResourceFilteringHardware specifies that available hardware resources
+    // should be evaluated when scheduling a task against the worker.
+    ResourceFilteringHardware = ResourceFilteringStrategy("hardware")
+
+    // ResourceFilteringDisabled disables resource filtering against this
+    // worker. The scheduler may assign any task to this worker.
+    ResourceFilteringDisabled = ResourceFilteringStrategy("disabled")
+)
+
 type SealerConfig struct {
     ParallelFetchLimit int
 
@@ -96,6 +110,11 @@ type SealerConfig struct {
     AllowPreCommit2 bool
     AllowCommit     bool
     AllowUnseal     bool
+
+    // ResourceFiltering instructs the system which resource filtering strategy
+    // to use when evaluating tasks against this worker. An empty value defaults
+    // to "hardware".
+    ResourceFiltering ResourceFilteringStrategy
 }
 
 type StorageAuth http.Header
@@ -104,7 +123,6 @@ type WorkerStateStore *statestore.StateStore
 type ManagerStateStore *statestore.StateStore
 
 func New(ctx context.Context, lstor *stores.Local, stor *stores.Remote, ls stores.LocalStorage, si stores.SectorIndex, sc SealerConfig, wss WorkerStateStore, mss ManagerStateStore) (*Manager, error) {
-
     prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, index: si})
     if err != nil {
         return nil, xerrors.Errorf("creating prover instance: %w", err)
@@ -151,9 +169,12 @@ func New(ctx context.Context, lstor *stores.Local, stor *stores.Remote, ls store
         localTasks = append(localTasks, sealtasks.TTUnseal)
     }
 
-    err = m.AddWorker(ctx, NewLocalWorker(WorkerConfig{
-        TaskTypes: localTasks,
-    }, stor, lstor, si, m, wss))
+    wcfg := WorkerConfig{
+        IgnoreResourceFiltering: sc.ResourceFiltering == ResourceFilteringDisabled,
+        TaskTypes:               localTasks,
+    }
+    worker := NewLocalWorker(wcfg, stor, lstor, si, m, wss)
+    err = m.AddWorker(ctx, worker)
     if err != nil {
         return nil, xerrors.Errorf("adding local worker: %w", err)
     }
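The net effect of the manager.go hunks is that the operator-facing SealerConfig.ResourceFiltering knob is translated into the local worker's IgnoreResourceFiltering flag. The following minimal sketch is not part of the patch; it only uses names introduced above and assumes the lotus module (and its FFI build dependencies) is available:

```go
package main

import (
	"fmt"

	sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
)

func main() {
	// Operator-facing sealer config with resource filtering turned off.
	sc := sectorstorage.SealerConfig{
		ParallelFetchLimit: 10,
		ResourceFiltering:  sectorstorage.ResourceFilteringDisabled,
	}

	// Manager.New now derives the local worker's flag from that setting.
	wcfg := sectorstorage.WorkerConfig{
		IgnoreResourceFiltering: sc.ResourceFiltering == sectorstorage.ResourceFilteringDisabled,
	}
	fmt.Println("ignore resource filtering:", wcfg.IgnoreResourceFiltering) // true
}
```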
extern/sector-storage/sched.go (vendored, 24 changes)

@@ -349,24 +349,24 @@ func (sh *scheduler) trySched() {
     defer sh.workersLk.RUnlock()
 
     windowsLen := len(sh.openWindows)
-    queuneLen := sh.schedQueue.Len()
+    queueLen := sh.schedQueue.Len()
 
-    log.Debugf("SCHED %d queued; %d open windows", queuneLen, windowsLen)
+    log.Debugf("SCHED %d queued; %d open windows", queueLen, windowsLen)
 
-    if windowsLen == 0 || queuneLen == 0 {
+    if windowsLen == 0 || queueLen == 0 {
         // nothing to schedule on
         return
     }
 
     windows := make([]schedWindow, windowsLen)
-    acceptableWindows := make([][]int, queuneLen)
+    acceptableWindows := make([][]int, queueLen)
 
     // Step 1
     throttle := make(chan struct{}, windowsLen)
 
     var wg sync.WaitGroup
-    wg.Add(queuneLen)
-    for i := 0; i < queuneLen; i++ {
+    wg.Add(queueLen)
+    for i := 0; i < queueLen; i++ {
         throttle <- struct{}{}
 
         go func(sqi int) {
@@ -393,7 +393,7 @@ func (sh *scheduler) trySched() {
             }
 
             // TODO: allow bigger windows
-            if !windows[wnd].allocated.canHandleRequest(needRes, windowRequest.worker, "schedAcceptable", worker.info.Resources) {
+            if !windows[wnd].allocated.canHandleRequest(needRes, windowRequest.worker, "schedAcceptable", worker.info) {
                 continue
             }
 
@@ -451,27 +451,27 @@ func (sh *scheduler) trySched() {
 
     // Step 2
     scheduled := 0
-    rmQueue := make([]int, 0, queuneLen)
+    rmQueue := make([]int, 0, queueLen)
 
-    for sqi := 0; sqi < queuneLen; sqi++ {
+    for sqi := 0; sqi < queueLen; sqi++ {
         task := (*sh.schedQueue)[sqi]
         needRes := ResourceTable[task.taskType][task.sector.ProofType]
 
         selectedWindow := -1
         for _, wnd := range acceptableWindows[task.indexHeap] {
             wid := sh.openWindows[wnd].worker
-            wr := sh.workers[wid].info.Resources
+            info := sh.workers[wid].info
 
             log.Debugf("SCHED try assign sqi:%d sector %d to window %d", sqi, task.sector.ID.Number, wnd)
 
             // TODO: allow bigger windows
-            if !windows[wnd].allocated.canHandleRequest(needRes, wid, "schedAssign", wr) {
+            if !windows[wnd].allocated.canHandleRequest(needRes, wid, "schedAssign", info) {
                 continue
             }
 
             log.Debugf("SCHED ASSIGNED sqi:%d sector %d task %s to window %d", sqi, task.sector.ID.Number, task.taskType, wnd)
 
-            windows[wnd].allocated.add(wr, needRes)
+            windows[wnd].allocated.add(info.Resources, needRes)
             // TODO: We probably want to re-sort acceptableWindows here based on new
             // workerHandle.utilization + windows[wnd].allocated.utilization (workerHandle.utilization is used in all
             // task selectors, but not in the same way, so need to figure out how to do that in a non-O(n^2 way), and
extern/sector-storage/sched_resources.go (vendored, 15 changes)

@@ -6,7 +6,7 @@ import (
     "github.com/filecoin-project/lotus/extern/sector-storage/storiface"
 )
 
-func (a *activeResources) withResources(id WorkerID, wr storiface.WorkerResources, r Resources, locker sync.Locker, cb func() error) error {
+func (a *activeResources) withResources(id WorkerID, wr storiface.WorkerInfo, r Resources, locker sync.Locker, cb func() error) error {
     for !a.canHandleRequest(r, id, "withResources", wr) {
         if a.cond == nil {
             a.cond = sync.NewCond(locker)
@@ -14,11 +14,11 @@ func (a *activeResources) withResources(id WorkerID, wr storiface.WorkerResource
         a.cond.Wait()
     }
 
-    a.add(wr, r)
+    a.add(wr.Resources, r)
 
     err := cb()
 
-    a.free(wr, r)
+    a.free(wr.Resources, r)
     if a.cond != nil {
         a.cond.Broadcast()
     }
@@ -44,8 +44,15 @@ func (a *activeResources) free(wr storiface.WorkerResources, r Resources) {
     a.memUsedMax -= r.MaxMemory
 }
 
-func (a *activeResources) canHandleRequest(needRes Resources, wid WorkerID, caller string, res storiface.WorkerResources) bool {
+// canHandleRequest evaluates if the worker has enough available resources to
+// handle the request.
+func (a *activeResources) canHandleRequest(needRes Resources, wid WorkerID, caller string, info storiface.WorkerInfo) bool {
+    if info.IgnoreResources {
+        // shortcircuit; if this worker is ignoring resources, it can always handle the request.
+        return true
+    }
+
+    res := info.Resources
     // TODO: dedupe needRes.BaseMinMemory per task type (don't add if that task is already running)
     minNeedMem := res.MemReserved + a.memUsedMin + needRes.MinMemory + needRes.BaseMinMemory
     if minNeedMem > res.MemPhysical {
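To make the shortcircuit concrete: with the constrained worker used later in sched_test.go (MemPhysical = 1 GiB, MemReserved = 2 GiB), the memory check can never pass, so only a worker with IgnoreResources set receives tasks. Below is a simplified standalone sketch of that check, not the real implementation; field names follow the hunk above, and the actual canHandleRequest also evaluates swap, CPU and GPU usage:

```go
package main

import "fmt"

// canHandle mirrors, in simplified form, the minimum-memory portion of
// activeResources.canHandleRequest after this patch.
func canHandle(ignoreResources bool, memPhysical, memReserved, memUsedMin, minMemory, baseMinMemory uint64) bool {
	if ignoreResources {
		// shortcircuit; resources are not considered for this worker.
		return true
	}
	minNeedMem := memReserved + memUsedMin + minMemory + baseMinMemory
	return minNeedMem <= memPhysical
}

func main() {
	const gib = uint64(1) << 30
	// Constrained worker: 2 GiB reserved already exceeds 1 GiB physical memory.
	fmt.Println(canHandle(false, 1*gib, 2*gib, 0, 0, 0)) // false
	// The same worker with resource filtering ignored always accepts.
	fmt.Println(canHandle(true, 1*gib, 2*gib, 0, 0, 0)) // true
}
```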
extern/sector-storage/sched_test.go (vendored, 71 changes)

@@ -38,6 +38,20 @@ func TestWithPriority(t *testing.T) {
     require.Equal(t, 2222, getPriority(ctx))
 }
 
+var decentWorkerResources = storiface.WorkerResources{
+    MemPhysical: 128 << 30,
+    MemSwap:     200 << 30,
+    MemReserved: 2 << 30,
+    CPUs:        32,
+    GPUs:        []string{"a GPU"},
+}
+
+var constrainedWorkerResources = storiface.WorkerResources{
+    MemPhysical: 1 << 30,
+    MemReserved: 2 << 30,
+    CPUs:        1,
+}
+
 type schedTestWorker struct {
     name      string
     taskTypes map[sealtasks.TaskType]struct{}
@@ -45,6 +59,9 @@ type schedTestWorker struct {
 
     closed  bool
     session uuid.UUID
+
+    resources       storiface.WorkerResources
+    ignoreResources bool
 }
 
 func (s *schedTestWorker) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) {
@@ -107,18 +124,11 @@ func (s *schedTestWorker) Paths(ctx context.Context) ([]stores.StoragePath, erro
     return s.paths, nil
 }
 
-var decentWorkerResources = storiface.WorkerResources{
-    MemPhysical: 128 << 30,
-    MemSwap:     200 << 30,
-    MemReserved: 2 << 30,
-    CPUs:        32,
-    GPUs:        []string{"a GPU"},
-}
-
 func (s *schedTestWorker) Info(ctx context.Context) (storiface.WorkerInfo, error) {
     return storiface.WorkerInfo{
         Hostname: s.name,
-        Resources: decentWorkerResources,
+        IgnoreResources: s.ignoreResources,
+        Resources:       s.resources,
     }, nil
 }
 
@@ -137,13 +147,16 @@ func (s *schedTestWorker) Close() error {
 
 var _ Worker = &schedTestWorker{}
 
-func addTestWorker(t *testing.T, sched *scheduler, index *stores.Index, name string, taskTypes map[sealtasks.TaskType]struct{}) {
+func addTestWorker(t *testing.T, sched *scheduler, index *stores.Index, name string, taskTypes map[sealtasks.TaskType]struct{}, resources storiface.WorkerResources, ignoreResources bool) {
     w := &schedTestWorker{
         name:      name,
         taskTypes: taskTypes,
         paths:     []stores.StoragePath{{ID: "bb-8", Weight: 2, LocalPath: "<octopus>food</octopus>", CanSeal: true, CanStore: true}},
 
         session: uuid.New(),
+
+        resources:       resources,
+        ignoreResources: ignoreResources,
     }
 
     for _, path := range w.paths {
@@ -169,7 +182,7 @@ func TestSchedStartStop(t *testing.T) {
     sched := newScheduler()
     go sched.runSched()
 
-    addTestWorker(t, sched, stores.NewIndex(), "fred", nil)
+    addTestWorker(t, sched, stores.NewIndex(), "fred", nil, decentWorkerResources, false)
 
     require.NoError(t, sched.Close(context.TODO()))
 }
@@ -183,6 +196,9 @@ func TestSched(t *testing.T) {
     type workerSpec struct {
         name      string
         taskTypes map[sealtasks.TaskType]struct{}
+
+        resources       storiface.WorkerResources
+        ignoreResources bool
     }
 
     noopAction := func(ctx context.Context, w Worker) error {
@@ -295,7 +311,7 @@ func TestSched(t *testing.T) {
     go sched.runSched()
 
     for _, worker := range workers {
-        addTestWorker(t, sched, index, worker.name, worker.taskTypes)
+        addTestWorker(t, sched, index, worker.name, worker.taskTypes, worker.resources, worker.ignoreResources)
     }
 
     rm := runMeta{
@@ -322,31 +338,42 @@ func TestSched(t *testing.T) {
         }
     }
 
+    // checks behaviour with workers with constrained resources
+    // the first one is not ignoring resource constraints, so we assign to the second worker, who is
+    t.Run("constrained-resources", testFunc([]workerSpec{
+        {name: "fred1", resources: constrainedWorkerResources, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
+        {name: "fred2", resources: constrainedWorkerResources, ignoreResources: true, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
+    }, []task{
+        sched("pc1-1", "fred2", 8, sealtasks.TTPreCommit1),
+        taskStarted("pc1-1"),
+        taskDone("pc1-1"),
+    }))
+
     t.Run("one-pc1", testFunc([]workerSpec{
-        {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
+        {name: "fred", resources: decentWorkerResources, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
     }, []task{
         sched("pc1-1", "fred", 8, sealtasks.TTPreCommit1),
         taskDone("pc1-1"),
     }))
 
     t.Run("pc1-2workers-1", testFunc([]workerSpec{
-        {name: "fred2", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit2: {}}},
-        {name: "fred1", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
+        {name: "fred2", resources: decentWorkerResources, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit2: {}}},
+        {name: "fred1", resources: decentWorkerResources, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
     }, []task{
         sched("pc1-1", "fred1", 8, sealtasks.TTPreCommit1),
         taskDone("pc1-1"),
     }))
 
     t.Run("pc1-2workers-2", testFunc([]workerSpec{
-        {name: "fred1", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
-        {name: "fred2", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit2: {}}},
+        {name: "fred1", resources: decentWorkerResources, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
+        {name: "fred2", resources: decentWorkerResources, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit2: {}}},
     }, []task{
         sched("pc1-1", "fred1", 8, sealtasks.TTPreCommit1),
         taskDone("pc1-1"),
     }))
 
     t.Run("pc1-block-pc2", testFunc([]workerSpec{
-        {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}},
+        {name: "fred", resources: decentWorkerResources, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}},
     }, []task{
         sched("pc1", "fred", 8, sealtasks.TTPreCommit1),
         taskStarted("pc1"),
@@ -359,7 +386,7 @@ func TestSched(t *testing.T) {
     }))
 
     t.Run("pc2-block-pc1", testFunc([]workerSpec{
-        {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}},
+        {name: "fred", resources: decentWorkerResources, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}},
     }, []task{
         sched("pc2", "fred", 8, sealtasks.TTPreCommit2),
         taskStarted("pc2"),
@@ -372,7 +399,7 @@ func TestSched(t *testing.T) {
     }))
 
     t.Run("pc1-batching", testFunc([]workerSpec{
-        {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
+        {name: "fred", resources: decentWorkerResources, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
     }, []task{
         sched("t1", "fred", 8, sealtasks.TTPreCommit1),
         taskStarted("t1"),
@@ -459,7 +486,7 @@ func TestSched(t *testing.T) {
     // run this one a bunch of times, it had a very annoying tendency to fail randomly
     for i := 0; i < 40; i++ {
         t.Run("pc1-pc2-prio", testFunc([]workerSpec{
-            {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}},
+            {name: "fred", resources: decentWorkerResources, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}},
         }, []task{
             // fill queues
             twoPC1("w0", 0, taskStarted),
extern/sector-storage/sched_worker.go (vendored, 6 changes)

@@ -296,7 +296,7 @@ func (sw *schedWorker) workerCompactWindows() {
 
         for ti, todo := range window.todo {
             needRes := ResourceTable[todo.taskType][todo.sector.ProofType]
-            if !lower.allocated.canHandleRequest(needRes, sw.wid, "compactWindows", worker.info.Resources) {
+            if !lower.allocated.canHandleRequest(needRes, sw.wid, "compactWindows", worker.info) {
                 continue
             }
 
@@ -352,7 +352,7 @@ assignLoop:
         worker.lk.Lock()
         for t, todo := range firstWindow.todo {
             needRes := ResourceTable[todo.taskType][todo.sector.ProofType]
-            if worker.preparing.canHandleRequest(needRes, sw.wid, "startPreparing", worker.info.Resources) {
+            if worker.preparing.canHandleRequest(needRes, sw.wid, "startPreparing", worker.info) {
                 tidx = t
                 break
             }
@@ -424,7 +424,7 @@ func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRe
     }
 
     // wait (if needed) for resources in the 'active' window
-    err = w.active.withResources(sw.wid, w.info.Resources, needRes, &sh.workersLk, func() error {
+    err = w.active.withResources(sw.wid, w.info, needRes, &sh.workersLk, func() error {
         w.lk.Lock()
         w.preparing.free(w.info.Resources, needRes)
         w.lk.Unlock()
extern/sector-storage/storiface/worker.go (vendored, 7 changes)

@@ -18,7 +18,12 @@ import (
 type WorkerInfo struct {
     Hostname string
 
-    Resources WorkerResources
+    // IgnoreResources indicates whether the worker's available resources should
+    // be ignored (true) or used (false) for the purposes of scheduling and
+    // task assignment. Only supported on local workers. Used for testing.
+    // Default should be false (zero value, i.e. resources taken into account).
+    IgnoreResources bool
+    Resources       WorkerResources
 }
 
 type WorkerResources struct {
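The new WorkerInfo field is what surfaces in the API documentation hunks at the top of this diff. A small sketch (assuming the storiface import path shown above; not part of the patch) of the marshalled shape:

```go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)

func main() {
	// Mirrors the documented WorkerStats response: IgnoreResources now sits
	// alongside Hostname and Resources.
	info := storiface.WorkerInfo{
		Hostname:        "host",
		IgnoreResources: false,
		Resources: storiface.WorkerResources{
			MemPhysical: 274877906944,
			MemSwap:     128849018880,
		},
	}
	out, err := json.MarshalIndent(info, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```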
extern/sector-storage/worker_local.go (vendored, 25 changes)

@@ -20,7 +20,7 @@ import (
     ffi "github.com/filecoin-project/filecoin-ffi"
     "github.com/filecoin-project/go-state-types/abi"
     "github.com/filecoin-project/go-statestore"
-    storage "github.com/filecoin-project/specs-storage/storage"
+    "github.com/filecoin-project/specs-storage/storage"
 
     "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
     "github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
@@ -33,6 +33,11 @@ var pathTypes = []storiface.SectorFileType{storiface.FTUnsealed, storiface.FTSea
 type WorkerConfig struct {
     TaskTypes []sealtasks.TaskType
     NoSwap    bool
+
+    // IgnoreResourceFiltering enables task distribution to happen on this
+    // worker regardless of its currently available resources. Used in testing
+    // with the local worker.
+    IgnoreResourceFiltering bool
 }
 
 // used do provide custom proofs impl (mostly used in testing)
@@ -46,6 +51,9 @@ type LocalWorker struct {
     executor ExecutorFunc
     noSwap   bool
 
+    // see equivalent field on WorkerConfig.
+    ignoreResources bool
+
     ct          *workerCallTracker
     acceptTasks map[sealtasks.TaskType]struct{}
     running     sync.WaitGroup
@@ -71,12 +79,12 @@ func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, store stores.Store
         ct: &workerCallTracker{
             st: cst,
         },
         acceptTasks: acceptTasks,
         executor:    executor,
         noSwap:      wcfg.NoSwap,
+        ignoreResources: wcfg.IgnoreResourceFiltering,
         session:     uuid.New(),
         closing:     make(chan struct{}),
     }
 
     if w.executor == nil {
@@ -501,7 +509,8 @@ func (l *LocalWorker) Info(context.Context) (storiface.WorkerInfo, error) {
     }
 
     return storiface.WorkerInfo{
         Hostname: hostname,
+        IgnoreResources: l.ignoreResources,
         Resources: storiface.WorkerResources{
             MemPhysical: mem.Total,
             MemSwap:     memSwap,
|
@ -34,6 +34,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/genesis"
|
"github.com/filecoin-project/lotus/genesis"
|
||||||
lotusminer "github.com/filecoin-project/lotus/miner"
|
lotusminer "github.com/filecoin-project/lotus/miner"
|
||||||
"github.com/filecoin-project/lotus/node"
|
"github.com/filecoin-project/lotus/node"
|
||||||
|
"github.com/filecoin-project/lotus/node/config"
|
||||||
"github.com/filecoin-project/lotus/node/modules"
|
"github.com/filecoin-project/lotus/node/modules"
|
||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
testing2 "github.com/filecoin-project/lotus/node/modules/testing"
|
testing2 "github.com/filecoin-project/lotus/node/modules/testing"
|
||||||
@ -440,6 +441,14 @@ func (n *Ensemble) Start() *Ensemble {
|
|||||||
|
|
||||||
node.Override(new(v1api.FullNode), m.FullNode),
|
node.Override(new(v1api.FullNode), m.FullNode),
|
||||||
node.Override(new(*lotusminer.Miner), lotusminer.NewTestMiner(mineBlock, m.ActorAddr)),
|
node.Override(new(*lotusminer.Miner), lotusminer.NewTestMiner(mineBlock, m.ActorAddr)),
|
||||||
|
|
||||||
|
// disable resource filtering so that local worker gets assigned tasks
|
||||||
|
// regardless of system pressure.
|
||||||
|
node.Override(new(*sectorstorage.SealerConfig), func() *sectorstorage.SealerConfig {
|
||||||
|
scfg := config.DefaultStorageMiner()
|
||||||
|
scfg.Storage.ResourceFiltering = sectorstorage.ResourceFilteringDisabled
|
||||||
|
return &scfg.Storage
|
||||||
|
}),
|
||||||
}
|
}
|
||||||
|
|
||||||
// append any node builder options.
|
// append any node builder options.
|
||||||
|
@@ -342,6 +342,9 @@ func DefaultStorageMiner() *StorageMiner {
             // Default to 10 - tcp should still be able to figure this out, and
             // it's the ratio between 10gbit / 1gbit
             ParallelFetchLimit: 10,
+
+            // By default use the hardware resource filtering strategy.
+            ResourceFiltering: sectorstorage.ResourceFilteringHardware,
         },
 
         Dealmaking: DealmakingConfig{