diff --git a/extern/sector-storage/sched.go b/extern/sector-storage/sched.go index 7842003a5..7a4b6f9ef 100644 --- a/extern/sector-storage/sched.go +++ b/extern/sector-storage/sched.go @@ -287,72 +287,93 @@ func (sh *scheduler) trySched() { sh.workersLk.RLock() defer sh.workersLk.RUnlock() + if len(sh.openWindows) == 0 { + // nothing to schedule on + return + } // Step 1 - for sqi := 0; sqi < sh.schedQueue.Len(); sqi++ { - task := (*sh.schedQueue)[sqi] - needRes := ResourceTable[task.taskType][sh.spt] + concurrency := len(sh.openWindows) + throttle := make(chan struct{}, concurrency) - task.indexHeap = sqi - for wnd, windowRequest := range sh.openWindows { - worker, ok := sh.workers[windowRequest.worker] - if !ok { - log.Errorf("worker referenced by windowRequest not found (worker: %d)", windowRequest.worker) - // TODO: How to move forward here? - continue + var wg sync.WaitGroup + wg.Add(sh.schedQueue.Len()) + + for i := 0; i < sh.schedQueue.Len(); i++ { + throttle <- struct{}{} + + go func(sqi int) { + defer wg.Done() + defer func() { + <-throttle + }() + + task := (*sh.schedQueue)[sqi] + needRes := ResourceTable[task.taskType][sh.spt] + + task.indexHeap = sqi + for wnd, windowRequest := range sh.openWindows { + worker, ok := sh.workers[windowRequest.worker] + if !ok { + log.Errorf("worker referenced by windowRequest not found (worker: %d)", windowRequest.worker) + // TODO: How to move forward here? 
+ continue + } + + // TODO: allow bigger windows + if !windows[wnd].allocated.canHandleRequest(needRes, windowRequest.worker, worker.info.Resources) { + continue + } + + rpcCtx, cancel := context.WithTimeout(task.ctx, SelectorTimeout) + ok, err := task.sel.Ok(rpcCtx, task.taskType, sh.spt, worker) + cancel() + if err != nil { + log.Errorf("trySched(1) req.sel.Ok error: %+v", err) + continue + } + + if !ok { + continue + } + + acceptableWindows[sqi] = append(acceptableWindows[sqi], wnd) } - // TODO: allow bigger windows - if !windows[wnd].allocated.canHandleRequest(needRes, windowRequest.worker, worker.info.Resources) { - continue + if len(acceptableWindows[sqi]) == 0 { + return } - rpcCtx, cancel := context.WithTimeout(task.ctx, SelectorTimeout) - ok, err := task.sel.Ok(rpcCtx, task.taskType, sh.spt, worker) - cancel() - if err != nil { - log.Errorf("trySched(1) req.sel.Ok error: %+v", err) - continue - } + // Pick best worker (shuffle in case some workers are equally as good) + rand.Shuffle(len(acceptableWindows[sqi]), func(i, j int) { + acceptableWindows[sqi][i], acceptableWindows[sqi][j] = acceptableWindows[sqi][j], acceptableWindows[sqi][i] // nolint:scopelint + }) + sort.SliceStable(acceptableWindows[sqi], func(i, j int) bool { + wii := sh.openWindows[acceptableWindows[sqi][i]].worker // nolint:scopelint + wji := sh.openWindows[acceptableWindows[sqi][j]].worker // nolint:scopelint - if !ok { - continue - } + if wii == wji { + // for the same worker prefer older windows + return acceptableWindows[sqi][i] < acceptableWindows[sqi][j] // nolint:scopelint + } - acceptableWindows[sqi] = append(acceptableWindows[sqi], wnd) - } + wi := sh.workers[wii] + wj := sh.workers[wji] - if len(acceptableWindows[sqi]) == 0 { - continue - } + rpcCtx, cancel := context.WithTimeout(task.ctx, SelectorTimeout) + defer cancel() - // Pick best worker (shuffle in case some workers are equally as good) - rand.Shuffle(len(acceptableWindows[sqi]), func(i, j int) { - 
acceptableWindows[sqi][i], acceptableWindows[sqi][j] = acceptableWindows[sqi][j], acceptableWindows[sqi][i] // nolint:scopelint - }) - sort.SliceStable(acceptableWindows[sqi], func(i, j int) bool { - wii := sh.openWindows[acceptableWindows[sqi][i]].worker // nolint:scopelint - wji := sh.openWindows[acceptableWindows[sqi][j]].worker // nolint:scopelint - - if wii == wji { - // for the same worker prefer older windows - return acceptableWindows[sqi][i] < acceptableWindows[sqi][j] // nolint:scopelint - } - - wi := sh.workers[wii] - wj := sh.workers[wji] - - rpcCtx, cancel := context.WithTimeout(task.ctx, SelectorTimeout) - defer cancel() - - r, err := task.sel.Cmp(rpcCtx, task.taskType, wi, wj) - if err != nil { - log.Error("selecting best worker: %s", err) - } - return r - }) + r, err := task.sel.Cmp(rpcCtx, task.taskType, wi, wj) + if err != nil { + log.Errorf("selecting best worker: %s", err) + } + return r + }) + }(i) } + wg.Wait() + log.Debugf("SCHED windows: %+v", windows) log.Debugf("SCHED Acceptable win: %+v", acceptableWindows) diff --git a/extern/sector-storage/sched_test.go b/extern/sector-storage/sched_test.go index 5c9bc44ee..fcfe891e7 100644 --- a/extern/sector-storage/sched_test.go +++ b/extern/sector-storage/sched_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" "github.com/stretchr/testify/require" "github.com/filecoin-project/specs-actors/actors/abi" @@ -100,16 +101,18 @@ func (s *schedTestWorker) Paths(ctx context.Context) ([]stores.StoragePath, erro return s.paths, nil } +var decentWorkerResources = storiface.WorkerResources{ + MemPhysical: 128 << 30, + MemSwap: 200 << 30, + MemReserved: 2 << 30, + CPUs: 32, + GPUs: []string{"a GPU"}, +} + func (s *schedTestWorker) Info(ctx context.Context) (storiface.WorkerInfo, error) { return storiface.WorkerInfo{ - Hostname: s.name, - Resources: storiface.WorkerResources{ - MemPhysical: 128 << 30, - MemSwap: 200 << 30, - MemReserved: 2 << 30, - CPUs: 32, - 
GPUs: []string{"a GPU"}, - }, + Hostname: s.name, + Resources: decentWorkerResources, }, nil } @@ -451,3 +454,67 @@ func TestSched(t *testing.T) { })) } } + +type slowishSelector bool + +func (s slowishSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *workerHandle) (bool, error) { + time.Sleep(200 * time.Microsecond) + return bool(s), nil +} + +func (s slowishSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *workerHandle) (bool, error) { + time.Sleep(100 * time.Microsecond) + return true, nil +} + +var _ WorkerSelector = slowishSelector(true) + +func BenchmarkTrySched(b *testing.B) { + spt := abi.RegisteredSealProof_StackedDrg32GiBV1 + logging.SetAllLoggers(logging.LevelInfo) + defer logging.SetAllLoggers(logging.LevelDebug) + ctx := context.Background() + + test := func(windows, queue int) func(b *testing.B) { + return func(b *testing.B) { + for i := 0; i < b.N; i++ { + b.StopTimer() + + sched := newScheduler(spt) + sched.workers[0] = &workerHandle{ + w: nil, + info: storiface.WorkerInfo{ + Hostname: "t", + Resources: decentWorkerResources, + }, + preparing: &activeResources{}, + active: &activeResources{}, + } + + for i := 0; i < windows; i++ { + sched.openWindows = append(sched.openWindows, &schedWindowRequest{ + worker: 0, + done: make(chan *schedWindow, 1000), + }) + } + + for i := 0; i < queue; i++ { + sched.schedQueue.Push(&workerRequest{ + taskType: sealtasks.TTCommit2, + sel: slowishSelector(true), + ctx: ctx, + }) + } + + b.StartTimer() + + sched.trySched() + } + } + } + + b.Run("1w-1q", test(1, 1)) + b.Run("500w-1q", test(500, 1)) + b.Run("1w-500q", test(1, 500)) + b.Run("200w-400q", test(200, 400)) +}