sectorstorage: Make trySched much less slow

This commit is contained in:
Łukasz Magiera 2020-08-24 19:13:36 +02:00
parent ee79855536
commit d9796cd25c
2 changed files with 149 additions and 61 deletions

View File

@ -287,72 +287,93 @@ func (sh *scheduler) trySched() {
sh.workersLk.RLock() sh.workersLk.RLock()
defer sh.workersLk.RUnlock() defer sh.workersLk.RUnlock()
if len(sh.openWindows) == 0 {
// nothing to schedule on
return
}
// Step 1 // Step 1
for sqi := 0; sqi < sh.schedQueue.Len(); sqi++ { concurrency := len(sh.openWindows)
task := (*sh.schedQueue)[sqi] throttle := make(chan struct{}, concurrency)
needRes := ResourceTable[task.taskType][sh.spt]
task.indexHeap = sqi var wg sync.WaitGroup
for wnd, windowRequest := range sh.openWindows { wg.Add(sh.schedQueue.Len())
worker, ok := sh.workers[windowRequest.worker]
if !ok { for i := 0; i < sh.schedQueue.Len(); i++ {
log.Errorf("worker referenced by windowRequest not found (worker: %d)", windowRequest.worker) throttle <- struct{}{}
// TODO: How to move forward here?
continue go func(sqi int) {
defer wg.Done()
defer func() {
<-throttle
}()
task := (*sh.schedQueue)[sqi]
needRes := ResourceTable[task.taskType][sh.spt]
task.indexHeap = sqi
for wnd, windowRequest := range sh.openWindows {
worker, ok := sh.workers[windowRequest.worker]
if !ok {
log.Errorf("worker referenced by windowRequest not found (worker: %d)", windowRequest.worker)
// TODO: How to move forward here?
continue
}
// TODO: allow bigger windows
if !windows[wnd].allocated.canHandleRequest(needRes, windowRequest.worker, worker.info.Resources) {
continue
}
rpcCtx, cancel := context.WithTimeout(task.ctx, SelectorTimeout)
ok, err := task.sel.Ok(rpcCtx, task.taskType, sh.spt, worker)
cancel()
if err != nil {
log.Errorf("trySched(1) req.sel.Ok error: %+v", err)
continue
}
if !ok {
continue
}
acceptableWindows[sqi] = append(acceptableWindows[sqi], wnd)
} }
// TODO: allow bigger windows if len(acceptableWindows[sqi]) == 0 {
if !windows[wnd].allocated.canHandleRequest(needRes, windowRequest.worker, worker.info.Resources) { return
continue
} }
rpcCtx, cancel := context.WithTimeout(task.ctx, SelectorTimeout) // Pick best worker (shuffle in case some workers are equally as good)
ok, err := task.sel.Ok(rpcCtx, task.taskType, sh.spt, worker) rand.Shuffle(len(acceptableWindows[sqi]), func(i, j int) {
cancel() acceptableWindows[sqi][i], acceptableWindows[sqi][j] = acceptableWindows[sqi][j], acceptableWindows[sqi][i] // nolint:scopelint
if err != nil { })
log.Errorf("trySched(1) req.sel.Ok error: %+v", err) sort.SliceStable(acceptableWindows[sqi], func(i, j int) bool {
continue wii := sh.openWindows[acceptableWindows[sqi][i]].worker // nolint:scopelint
} wji := sh.openWindows[acceptableWindows[sqi][j]].worker // nolint:scopelint
if !ok { if wii == wji {
continue // for the same worker prefer older windows
} return acceptableWindows[sqi][i] < acceptableWindows[sqi][j] // nolint:scopelint
}
acceptableWindows[sqi] = append(acceptableWindows[sqi], wnd) wi := sh.workers[wii]
} wj := sh.workers[wji]
if len(acceptableWindows[sqi]) == 0 { rpcCtx, cancel := context.WithTimeout(task.ctx, SelectorTimeout)
continue defer cancel()
}
// Pick best worker (shuffle in case some workers are equally as good) r, err := task.sel.Cmp(rpcCtx, task.taskType, wi, wj)
rand.Shuffle(len(acceptableWindows[sqi]), func(i, j int) { if err != nil {
acceptableWindows[sqi][i], acceptableWindows[sqi][j] = acceptableWindows[sqi][j], acceptableWindows[sqi][i] // nolint:scopelint log.Error("selecting best worker: %s", err)
}) }
sort.SliceStable(acceptableWindows[sqi], func(i, j int) bool { return r
wii := sh.openWindows[acceptableWindows[sqi][i]].worker // nolint:scopelint })
wji := sh.openWindows[acceptableWindows[sqi][j]].worker // nolint:scopelint }(i)
if wii == wji {
// for the same worker prefer older windows
return acceptableWindows[sqi][i] < acceptableWindows[sqi][j] // nolint:scopelint
}
wi := sh.workers[wii]
wj := sh.workers[wji]
rpcCtx, cancel := context.WithTimeout(task.ctx, SelectorTimeout)
defer cancel()
r, err := task.sel.Cmp(rpcCtx, task.taskType, wi, wj)
if err != nil {
log.Error("selecting best worker: %s", err)
}
return r
})
} }
wg.Wait()
log.Debugf("SCHED windows: %+v", windows) log.Debugf("SCHED windows: %+v", windows)
log.Debugf("SCHED Acceptable win: %+v", acceptableWindows) log.Debugf("SCHED Acceptable win: %+v", acceptableWindows)

View File

@ -10,6 +10,7 @@ import (
"time" "time"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
@ -100,16 +101,18 @@ func (s *schedTestWorker) Paths(ctx context.Context) ([]stores.StoragePath, erro
return s.paths, nil return s.paths, nil
} }
// decentWorkerResources describes a reasonably capable test worker:
// 32 CPU cores, one GPU, and ample physical memory and swap.
var decentWorkerResources = storiface.WorkerResources{
	CPUs:        32,
	GPUs:        []string{"a GPU"},
	MemPhysical: 128 << 30,
	MemSwap:     200 << 30,
	MemReserved: 2 << 30,
}
func (s *schedTestWorker) Info(ctx context.Context) (storiface.WorkerInfo, error) { func (s *schedTestWorker) Info(ctx context.Context) (storiface.WorkerInfo, error) {
return storiface.WorkerInfo{ return storiface.WorkerInfo{
Hostname: s.name, Hostname: s.name,
Resources: storiface.WorkerResources{ Resources: decentWorkerResources,
MemPhysical: 128 << 30,
MemSwap: 200 << 30,
MemReserved: 2 << 30,
CPUs: 32,
GPUs: []string{"a GPU"},
},
}, nil }, nil
} }
@ -451,3 +454,67 @@ func TestSched(t *testing.T) {
})) }))
} }
} }
// slowishSelector is a WorkerSelector stub for benchmarks: each Ok/Cmp
// call burns a small fixed amount of wall-clock time, emulating the
// latency of real selector calls.
type slowishSelector bool

// Ok emulates a slow selector call, then reports the fixed answer the
// selector was constructed with.
func (sel slowishSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *workerHandle) (bool, error) {
	time.Sleep(200 * time.Microsecond)
	return bool(sel), nil
}

// Cmp emulates a slow selector call; it always prefers the first worker.
func (sel slowishSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *workerHandle) (bool, error) {
	time.Sleep(100 * time.Microsecond)
	return true, nil
}

var _ WorkerSelector = slowishSelector(true)
// BenchmarkTrySched measures scheduler assignment throughput for several
// combinations of open scheduling windows and queued tasks, using a single
// fake worker and a selector with artificial per-call latency.
func BenchmarkTrySched(b *testing.B) {
	spt := abi.RegisteredSealProof_StackedDrg32GiBV1

	// Quiet the scheduler's debug logging while the benchmark runs.
	logging.SetAllLoggers(logging.LevelInfo)
	defer logging.SetAllLoggers(logging.LevelDebug)
	ctx := context.Background()

	test := func(windows, queue int) func(b *testing.B) {
		return func(b *testing.B) {
			for i := 0; i < b.N; i++ {
				// Scheduler setup is excluded from the measured time.
				b.StopTimer()

				sched := newScheduler(spt)
				sched.workers[0] = &workerHandle{
					w: nil,
					info: storiface.WorkerInfo{
						Hostname:  "t",
						Resources: decentWorkerResources,
					},
					preparing: &activeResources{},
					active:    &activeResources{},
				}

				for wnd := 0; wnd < windows; wnd++ {
					sched.openWindows = append(sched.openWindows, &schedWindowRequest{
						worker: 0,
						done:   make(chan *schedWindow, 1000),
					})
				}

				for qi := 0; qi < queue; qi++ {
					sched.schedQueue.Push(&workerRequest{
						taskType: sealtasks.TTCommit2,
						sel:      slowishSelector(true),
						ctx:      ctx,
					})
				}

				b.StartTimer()
				sched.trySched()
			}
		}
	}

	b.Run("1w-1q", test(1, 1))
	b.Run("500w-1q", test(500, 1))
	b.Run("1w-500q", test(1, 500))
	b.Run("200w-400q", test(200, 400))
}