sched: WIP Windows

This commit is contained in:
Łukasz Magiera 2020-07-09 12:58:52 +02:00
parent a445979f1a
commit ac7dc28cfb

436
sched.go
View File

@ -3,11 +3,11 @@ package sectorstorage
import (
"container/heap"
"context"
"math/rand"
"sort"
"sync"
"time"
"github.com/hashicorp/go-multierror"
"golang.org/x/xerrors"
"github.com/filecoin-project/specs-actors/actors/abi"
@ -20,6 +20,11 @@ type schedPrioCtxKey int
var SchedPriorityKey schedPrioCtxKey
var DefaultSchedPriority = 0
var SelectorTimeout = 5 * time.Second
var (
SchedWindows = 2
)
func getPriority(ctx context.Context) int {
sp := ctx.Value(SchedPriorityKey)
@ -56,11 +61,63 @@ type scheduler struct {
watchClosing chan WorkerID
workerClosing chan WorkerID
schedule chan *workerRequest
workerFree chan WorkerID
closing chan struct{}
schedule chan *workerRequest
windowRequests chan *schedWindowRequest
schedQueue *requestQueue
// owned by the sh.runSched goroutine
schedQueue *requestQueue
openWindows []*schedWindowRequest
closing chan struct{}
}
type workerHandle struct {
w Worker
info storiface.WorkerInfo
preparing *activeResources
active *activeResources
}
type schedWindowRequest struct {
worker WorkerID
done chan *schedWindow
}
type schedWindow struct {
worker WorkerID
allocated *activeResources
todo []*workerRequest
}
type activeResources struct {
memUsedMin uint64
memUsedMax uint64
gpuUsed bool
cpuUse uint64
cond *sync.Cond
}
type workerRequest struct {
sector abi.SectorID
taskType sealtasks.TaskType
priority int // larger values more important
sel WorkerSelector
prepare WorkerAction
work WorkerAction
index int // The index of the item in the heap.
ret chan<- workerResponse
ctx context.Context
}
type workerResponse struct {
err error
}
func newScheduler(spt abi.RegisteredSealProof) *scheduler {
@ -75,9 +132,8 @@ func newScheduler(spt abi.RegisteredSealProof) *scheduler {
watchClosing: make(chan WorkerID),
workerClosing: make(chan WorkerID),
schedule: make(chan *workerRequest),
workerFree: make(chan WorkerID),
closing: make(chan struct{}),
schedule: make(chan *workerRequest),
closing: make(chan struct{}),
schedQueue: &requestQueue{},
}
@ -115,25 +171,6 @@ func (sh *scheduler) Schedule(ctx context.Context, sector abi.SectorID, taskType
}
}
type workerRequest struct {
sector abi.SectorID
taskType sealtasks.TaskType
priority int // larger values more important
sel WorkerSelector
prepare WorkerAction
work WorkerAction
index int // The index of the item in the heap.
ret chan<- workerResponse
ctx context.Context
}
type workerResponse struct {
err error
}
func (r *workerRequest) respond(err error) {
select {
case r.ret <- workerResponse{err: err}:
@ -142,46 +179,25 @@ func (r *workerRequest) respond(err error) {
}
}
type activeResources struct {
memUsedMin uint64
memUsedMax uint64
gpuUsed bool
cpuUse uint64
cond *sync.Cond
}
type workerHandle struct {
w Worker
info storiface.WorkerInfo
preparing *activeResources
active *activeResources
}
func (sh *scheduler) runSched() {
go sh.runWorkerWatcher()
for {
select {
case w := <-sh.newWorkers:
sh.schedNewWorker(w)
case wid := <-sh.workerClosing:
sh.schedDropWorker(wid)
case req := <-sh.schedule:
scheduled, err := sh.maybeSchedRequest(req)
if err != nil {
req.respond(err)
continue
}
if scheduled {
continue
}
sh.newWorker(w)
case wid := <-sh.workerClosing:
sh.dropWorker(wid)
case req := <-sh.schedule:
heap.Push(sh.schedQueue, req)
case wid := <-sh.workerFree:
sh.onWorkerFreed(wid)
sh.trySched()
case req := <-sh.windowRequests:
sh.openWindows = append(sh.openWindows, req)
sh.trySched()
case <-sh.closing:
sh.schedClose()
return
@ -189,169 +205,161 @@ func (sh *scheduler) runSched() {
}
}
func (sh *scheduler) onWorkerFreed(wid WorkerID) {
sh.workersLk.Lock()
w, ok := sh.workers[wid]
sh.workersLk.Unlock()
if !ok {
log.Warnf("onWorkerFreed on invalid worker %d", wid)
func (sh *scheduler) trySched() {
/*
This assigns tasks to workers based on:
- Task priority (achieved by handling sh.schedQueue in order, since it's already sorted by priority)
- Worker resource availability
- Task-specified worker preference (acceptableWindows array below sorted by this preference)
- Window request age
1. For each task in the schedQueue find windows which can handle them
1.1. Create list of windows capable of handling a task
1.2. Sort windows according to task selector preferences
2. Going through schedQueue again, assign task to first acceptable window
with resources available
3. Submit windows with scheduled tasks to workers
*/
windows := make([]schedWindow, len(sh.openWindows))
acceptableWindows := make([][]int, sh.schedQueue.Len())
// Step 1
for sqi := 0; sqi < sh.schedQueue.Len(); sqi++ {
task := (*sh.schedQueue)[sqi]
needRes := ResourceTable[task.taskType][sh.spt]
for wnd, windowRequest := range sh.openWindows {
worker := sh.workers[windowRequest.worker]
// TODO: allow bigger windows
if !windows[wnd].allocated.canHandleRequest(needRes, windowRequest.worker, worker.info.Resources) {
continue
}
ok, err := task.sel.Ok(task.ctx, task.taskType, sh.spt, worker)
if err != nil {
log.Errorf("trySched(1) req.sel.Ok error: %+v", err)
continue
}
if !ok {
continue
}
acceptableWindows[sqi] = append(acceptableWindows[sqi], wnd)
}
if len(acceptableWindows[sqi]) == 0 {
continue
}
// Pick best worker (shuffle in case some workers are equally as good)
rand.Shuffle(len(acceptableWindows[sqi]), func(i, j int) {
acceptableWindows[sqi][i], acceptableWindows[sqi][j] = acceptableWindows[sqi][j], acceptableWindows[sqi][i]
})
sort.SliceStable(acceptableWindows, func(i, j int) bool {
wii := sh.openWindows[acceptableWindows[sqi][i]].worker
wji := sh.openWindows[acceptableWindows[sqi][j]].worker
if wii == wji {
// for the same worker prefer older windows
return acceptableWindows[sqi][i] < acceptableWindows[sqi][j]
}
wi := sh.workers[wii]
wj := sh.workers[wji]
rpcCtx, cancel := context.WithTimeout(task.ctx, SelectorTimeout)
defer cancel()
r, err := task.sel.Cmp(rpcCtx, task.taskType, wi, wj)
if err != nil {
log.Error("selecting best worker: %s", err)
}
return r
})
}
// Step 2
scheduled := 0
for sqi := 0; sqi < sh.schedQueue.Len(); sqi++ {
task := (*sh.schedQueue)[sqi]
needRes := ResourceTable[task.taskType][sh.spt]
selectedWindow := -1
for _, wnd := range acceptableWindows[sqi+scheduled] {
wid := sh.openWindows[wnd].worker
wr := sh.workers[wid].info.Resources
// TODO: allow bigger windows
if windows[wnd].allocated.canHandleRequest(needRes, wid, wr) {
continue
}
windows[wnd].allocated.add(wr, needRes)
selectedWindow = wnd
break
}
windows[selectedWindow].todo = append(windows[selectedWindow].todo, task)
heap.Remove(sh.schedQueue, sqi)
sqi--
scheduled++
}
// Step 3
if scheduled == 0 {
return
}
for i := 0; i < sh.schedQueue.Len(); i++ {
req := (*sh.schedQueue)[i]
ok, err := req.sel.Ok(req.ctx, req.taskType, sh.spt, w)
if err != nil {
log.Errorf("onWorkerFreed req.sel.Ok error: %+v", err)
scheduledWindows := map[int]struct{}{}
for wnd, window := range windows {
if len(window.todo) == 0 {
// Nothing scheduled here, keep the window open
continue
}
if !ok {
continue
}
scheduledWindows[wnd] = struct{}{}
scheduled, err := sh.maybeSchedRequest(req)
if err != nil {
req.respond(err)
continue
}
if scheduled {
heap.Remove(sh.schedQueue, i)
i--
continue
select {
case sh.openWindows[wnd].done <- &window:
default:
log.Error("expected sh.openWindows[wnd].done to be buffered")
}
}
// Rewrite sh.openWindows array, removing scheduled windows
newOpenWindows := make([]*schedWindowRequest, 0, len(sh.openWindows)-len(scheduledWindows))
for wnd, window := range sh.openWindows {
if _, scheduled := scheduledWindows[wnd]; !scheduled {
// keep unscheduled windows open
continue
}
newOpenWindows = append(newOpenWindows, window)
}
sh.openWindows = newOpenWindows
}
var selectorTimeout = 5 * time.Second
func (sh *scheduler) maybeSchedRequest(req *workerRequest) (bool, error) {
sh.workersLk.Lock()
defer sh.workersLk.Unlock()
tried := 0
var acceptable []WorkerID
needRes := ResourceTable[req.taskType][sh.spt]
for wid, worker := range sh.workers {
rpcCtx, cancel := context.WithTimeout(req.ctx, selectorTimeout)
ok, err := req.sel.Ok(rpcCtx, req.taskType, sh.spt, worker)
cancel()
if err != nil {
return false, err
}
if !ok {
continue
}
tried++
if !canHandleRequest(needRes, wid, worker.info.Resources, worker.preparing) {
continue
}
acceptable = append(acceptable, wid)
}
if len(acceptable) > 0 {
{
var serr error
sort.SliceStable(acceptable, func(i, j int) bool {
rpcCtx, cancel := context.WithTimeout(req.ctx, selectorTimeout)
defer cancel()
r, err := req.sel.Cmp(rpcCtx, req.taskType, sh.workers[acceptable[i]], sh.workers[acceptable[j]])
if err != nil {
serr = multierror.Append(serr, err)
}
return r
})
if serr != nil {
return false, xerrors.Errorf("error(s) selecting best worker: %w", serr)
}
}
return true, sh.assignWorker(acceptable[0], sh.workers[acceptable[0]], req)
}
if tried == 0 {
return false, xerrors.New("maybeSchedRequest didn't find any good workers")
}
return false, nil // put in waiting queue
}
func (sh *scheduler) assignWorker(wid WorkerID, w *workerHandle, req *workerRequest) error {
needRes := ResourceTable[req.taskType][sh.spt]
w.preparing.add(w.info.Resources, needRes)
func (sh *scheduler) runWorker(wid WorkerID) {
w := sh.workers[wid]
go func() {
err := req.prepare(req.ctx, w.w)
sh.workersLk.Lock()
for {
if err != nil {
w.preparing.free(w.info.Resources, needRes)
sh.workersLk.Unlock()
select {
case sh.workerFree <- wid:
case <-sh.closing:
log.Warnf("scheduler closed while sending response (prepare error: %+v)", err)
}
select {
case req.ret <- workerResponse{err: err}:
case <-req.ctx.Done():
log.Warnf("request got cancelled before we could respond (prepare error: %+v)", err)
case <-sh.closing:
log.Warnf("scheduler closed while sending response (prepare error: %+v)", err)
}
return
}
err = w.active.withResources(wid, w.info.Resources, needRes, &sh.workersLk, func() error {
w.preparing.free(w.info.Resources, needRes)
sh.workersLk.Unlock()
defer sh.workersLk.Lock() // we MUST return locked from this function
select {
case sh.workerFree <- wid:
case <-sh.closing:
}
err = req.work(req.ctx, w.w)
select {
case req.ret <- workerResponse{err: err}:
case <-req.ctx.Done():
log.Warnf("request got cancelled before we could respond")
case <-sh.closing:
log.Warnf("scheduler closed while sending response")
}
return nil
})
sh.workersLk.Unlock()
// This error should always be nil, since nothing is setting it, but just to be safe:
if err != nil {
log.Errorf("error executing worker (withResources): %+v", err)
}
}()
return nil
}
func (a *activeResources) withResources(id WorkerID, wr storiface.WorkerResources, r Resources, locker sync.Locker, cb func() error) error {
for !canHandleRequest(r, id, wr, a) {
for !a.canHandleRequest(r, id, wr) {
if a.cond == nil {
a.cond = sync.NewCond(locker)
}
@ -396,16 +404,16 @@ func (a *activeResources) free(wr storiface.WorkerResources, r Resources) {
a.memUsedMax -= r.MaxMemory
}
func canHandleRequest(needRes Resources, wid WorkerID, res storiface.WorkerResources, active *activeResources) bool {
func (a *activeResources) canHandleRequest(needRes Resources, wid WorkerID, res storiface.WorkerResources) bool {
// TODO: dedupe needRes.BaseMinMemory per task type (don't add if that task is already running)
minNeedMem := res.MemReserved + active.memUsedMin + needRes.MinMemory + needRes.BaseMinMemory
minNeedMem := res.MemReserved + a.memUsedMin + needRes.MinMemory + needRes.BaseMinMemory
if minNeedMem > res.MemPhysical {
log.Debugf("sched: not scheduling on worker %d; not enough physical memory - need: %dM, have %dM", wid, minNeedMem/mib, res.MemPhysical/mib)
return false
}
maxNeedMem := res.MemReserved + active.memUsedMax + needRes.MaxMemory + needRes.BaseMinMemory
maxNeedMem := res.MemReserved + a.memUsedMax + needRes.MaxMemory + needRes.BaseMinMemory
if maxNeedMem > res.MemSwap+res.MemPhysical {
log.Debugf("sched: not scheduling on worker %d; not enough virtual memory - need: %dM, have %dM", wid, maxNeedMem/mib, (res.MemSwap+res.MemPhysical)/mib)
@ -413,19 +421,19 @@ func canHandleRequest(needRes Resources, wid WorkerID, res storiface.WorkerResou
}
if needRes.MultiThread() {
if active.cpuUse > 0 {
log.Debugf("sched: not scheduling on worker %d; multicore process needs %d threads, %d in use, target %d", wid, res.CPUs, active.cpuUse, res.CPUs)
if a.cpuUse > 0 {
log.Debugf("sched: not scheduling on worker %d; multicore process needs %d threads, %d in use, target %d", wid, res.CPUs, a.cpuUse, res.CPUs)
return false
}
} else {
if active.cpuUse+uint64(needRes.Threads) > res.CPUs {
log.Debugf("sched: not scheduling on worker %d; not enough threads, need %d, %d in use, target %d", wid, needRes.Threads, active.cpuUse, res.CPUs)
if a.cpuUse+uint64(needRes.Threads) > res.CPUs {
log.Debugf("sched: not scheduling on worker %d; not enough threads, need %d, %d in use, target %d", wid, needRes.Threads, a.cpuUse, res.CPUs)
return false
}
}
if len(res.GPUs) > 0 && needRes.CanGPU {
if active.gpuUsed {
if a.gpuUsed {
log.Debugf("sched: not scheduling on worker %d; GPU in use", wid)
return false
}
@ -453,7 +461,7 @@ func (a *activeResources) utilization(wr storiface.WorkerResources) float64 {
return max
}
func (sh *scheduler) schedNewWorker(w *workerHandle) {
func (sh *scheduler) newWorker(w *workerHandle) {
sh.workersLk.Lock()
id := sh.nextWorker
@ -468,10 +476,10 @@ func (sh *scheduler) schedNewWorker(w *workerHandle) {
return
}
sh.onWorkerFreed(id)
sh.runWorker(id)
}
func (sh *scheduler) schedDropWorker(wid WorkerID) {
func (sh *scheduler) dropWorker(wid WorkerID) {
sh.workersLk.Lock()
defer sh.workersLk.Unlock()