// lotus/extern/sector-storage/sched.go

package sectorstorage
import (
"context"
"sync"
"time"
"github.com/google/uuid"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
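
// schedPrioCtxKey is the context key type used to carry a scheduling
// priority in a context.Context; see WithPriority and getPriority.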
type schedPrioCtxKey int
var SchedPriorityKey schedPrioCtxKey
var DefaultSchedPriority = 0
var SelectorTimeout = 5 * time.Second
var InitWait = 3 * time.Second
var (
SchedWindows = 2
)
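
// getPriority returns the scheduling priority stored in ctx by WithPriority,
// or DefaultSchedPriority if none is set.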
func getPriority(ctx context.Context) int {
sp := ctx.Value(SchedPriorityKey)
if p, ok := sp.(int); ok {
return p
}
return DefaultSchedPriority
}
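
// WithPriority returns a copy of ctx carrying the given scheduling priority.
// Requests scheduled with such a context sort ahead of lower-priority ones in
// the request queue, e.g.:
//
//	ctx = WithPriority(ctx, 10)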
func WithPriority(ctx context.Context, priority int) context.Context {
return context.WithValue(ctx, SchedPriorityKey, priority)
}
const mib = 1 << 20
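
// WorkerAction is a single step of a scheduled task, executed against a
// specific worker; WorkerRequest uses it for both the prepare and work phases.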
type WorkerAction func(ctx context.Context, w Worker) error
type WorkerSelector interface {
Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *WorkerHandle) (bool, error) // true if worker is acceptable for performing a task
Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) // true if a is preferred over b
}
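
// Scheduler dispatches sealing tasks to workers. Requests submitted through
// Schedule are queued in SchedQueue and matched against workers' open
// scheduling windows by the configured Assigner inside the runSched loop.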
type Scheduler struct {
assigner Assigner
workersLk sync.RWMutex
Workers map[storiface.WorkerID]*WorkerHandle
schedule chan *WorkerRequest
windowRequests chan *SchedWindowRequest
	workerChange chan struct{} // a worker was added, or its resources were changed/freed
workerDisable chan workerDisableReq
// owned by the sh.runSched goroutine
SchedQueue *RequestQueue
OpenWindows []*SchedWindowRequest
workTracker *workTracker
info chan func(interface{})
closing chan struct{}
closed chan struct{}
testSync chan struct{} // used for testing
}
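
// WorkerHandle tracks a single connected worker: its RPC client, advertised
// info, cached supported task types, and the resources reserved by preparing
// and active tasks.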
type WorkerHandle struct {
workerRpc Worker
tasksCache map[sealtasks.TaskType]struct{}
tasksUpdate time.Time
tasksLk sync.Mutex
Info storiface.WorkerInfo
preparing *activeResources // use with WorkerHandle.lk
active *activeResources // use with WorkerHandle.lk
lk sync.Mutex // can be taken inside sched.workersLk.RLock
wndLk sync.Mutex // can be taken inside sched.workersLk.RLock
activeWindows []*SchedWindow
Enabled bool
// for sync manager goroutine closing
cleanupStarted bool
closedMgr chan struct{}
closingMgr chan struct{}
}
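
// SchedWindowRequest is a worker's request for a new scheduling window; the
// scheduler replies on Done with a SchedWindow filled with assigned requests.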
type SchedWindowRequest struct {
Worker storiface.WorkerID
Done chan *SchedWindow
}
type SchedWindow struct {
Allocated activeResources
Todo []*WorkerRequest
}
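
// workerDisableReq asks the scheduler to disable a worker: requests from its
// active windows are requeued and its open window requests are dropped.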
type workerDisableReq struct {
activeWindows []*SchedWindow
wid storiface.WorkerID
done func()
}
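
// activeResources tracks the memory, CPU and GPU currently reserved on a
// worker; cond and waiting let callers block until resources are released.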
type activeResources struct {
memUsedMin uint64
memUsedMax uint64
gpuUsed float64
cpuUse uint64
cond *sync.Cond
waiting int
}
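
// WorkerRequest is a single queued task: the sector and task type it applies
// to, the selector that picks eligible workers, and the prepare/work actions
// to run once a worker is assigned.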
type WorkerRequest struct {
Sector storage.SectorRef
TaskType sealtasks.TaskType
Priority int // larger values more important
Sel WorkerSelector
prepare WorkerAction
work WorkerAction
start time.Time
index int // The index of the item in the heap.
IndexHeap int
ret chan<- workerResponse
Ctx context.Context
}
type workerResponse struct {
err error
}
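
// newScheduler constructs a Scheduler using the lowest-utilization Assigner
// and empty worker/request state; the caller is expected to run runSched in
// its own goroutine.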
func newScheduler() *Scheduler {
return &Scheduler{
assigner: NewLowestUtilizationAssigner(),
Workers: map[storiface.WorkerID]*WorkerHandle{},
schedule: make(chan *WorkerRequest),
windowRequests: make(chan *SchedWindowRequest, 20),
workerChange: make(chan struct{}, 20),
workerDisable: make(chan workerDisableReq),
SchedQueue: &RequestQueue{},
workTracker: &workTracker{
done: map[storiface.CallID]struct{}{},
running: map[storiface.CallID]trackedWork{},
prepared: map[uuid.UUID]trackedWork{},
},
info: make(chan func(interface{})),
closing: make(chan struct{}),
closed: make(chan struct{}),
}
}
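
// Schedule submits a task for the given sector and blocks until a worker has
// executed it, the scheduler is closed, or ctx is cancelled. prepare runs on
// the assigned worker before work. An illustrative call (the selector and
// action bodies below are placeholders, not taken from this file):
//
//	err := sched.Schedule(WithPriority(ctx, 10), sector, sealtasks.TTPreCommit1, sel,
//		func(ctx context.Context, w Worker) error { return nil }, // prepare step
//		func(ctx context.Context, w Worker) error { return nil }) // main work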
func (sh *Scheduler) Schedule(ctx context.Context, sector storage.SectorRef, taskType sealtasks.TaskType, sel WorkerSelector, prepare WorkerAction, work WorkerAction) error {
ret := make(chan workerResponse)
select {
case sh.schedule <- &WorkerRequest{
Sector: sector,
TaskType: taskType,
Priority: getPriority(ctx),
Sel: sel,
prepare: prepare,
work: work,
start: time.Now(),
ret: ret,
Ctx: ctx,
}:
case <-sh.closing:
return xerrors.New("closing")
case <-ctx.Done():
return ctx.Err()
}
select {
case resp := <-ret:
return resp.err
case <-sh.closing:
return xerrors.New("closing")
case <-ctx.Done():
return ctx.Err()
}
}
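
// respond delivers the result to the Schedule call waiting on ret, unless the
// request's context has already been cancelled.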
func (r *WorkerRequest) respond(err error) {
select {
case r.ret <- workerResponse{err: err}:
case <-r.Ctx.Done():
log.Warnf("request got cancelled before we could respond")
}
}
type SchedDiagRequestInfo struct {
Sector abi.SectorID
TaskType sealtasks.TaskType
Priority int
}
type SchedDiagInfo struct {
Requests []SchedDiagRequestInfo
OpenWindows []string
}
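
// runSched is the scheduler's main loop. It reacts to new requests, window
// requests and worker changes, drains any other pending events, handles
// worker-disable requests, and then runs a scheduling pass via trySched.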
func (sh *Scheduler) runSched() {
defer close(sh.closed)
iw := time.After(InitWait)
var initialised bool
for {
var doSched bool
var toDisable []workerDisableReq
select {
case <-sh.workerChange:
doSched = true
case dreq := <-sh.workerDisable:
toDisable = append(toDisable, dreq)
doSched = true
case req := <-sh.schedule:
sh.SchedQueue.Push(req)
doSched = true
if sh.testSync != nil {
sh.testSync <- struct{}{}
}
case req := <-sh.windowRequests:
sh.OpenWindows = append(sh.OpenWindows, req)
doSched = true
case ireq := <-sh.info:
ireq(sh.diag())
case <-iw:
initialised = true
iw = nil
doSched = true
case <-sh.closing:
sh.schedClose()
return
}
if doSched && initialised {
			// First drain any other pending events, so that a single scheduling
			// pass covers all newly added tasks instead of running once per task
loop:
for {
select {
case <-sh.workerChange:
case dreq := <-sh.workerDisable:
toDisable = append(toDisable, dreq)
case req := <-sh.schedule:
sh.SchedQueue.Push(req)
if sh.testSync != nil {
sh.testSync <- struct{}{}
}
case req := <-sh.windowRequests:
sh.OpenWindows = append(sh.OpenWindows, req)
default:
break loop
}
}
for _, req := range toDisable {
for _, window := range req.activeWindows {
for _, request := range window.Todo {
sh.SchedQueue.Push(request)
}
}
openWindows := make([]*SchedWindowRequest, 0, len(sh.OpenWindows))
for _, window := range sh.OpenWindows {
if window.Worker != req.wid {
openWindows = append(openWindows, window)
}
}
sh.OpenWindows = openWindows
sh.workersLk.Lock()
sh.Workers[req.wid].Enabled = false
sh.workersLk.Unlock()
req.done()
}
sh.trySched()
}
}
}
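
// diag collects the currently queued requests and open windows into a
// SchedDiagInfo snapshot.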
func (sh *Scheduler) diag() SchedDiagInfo {
var out SchedDiagInfo
for sqi := 0; sqi < sh.SchedQueue.Len(); sqi++ {
task := (*sh.SchedQueue)[sqi]
out.Requests = append(out.Requests, SchedDiagRequestInfo{
Sector: task.Sector.ID,
TaskType: task.TaskType,
Priority: task.Priority,
})
}
sh.workersLk.RLock()
defer sh.workersLk.RUnlock()
for _, window := range sh.OpenWindows {
out.OpenWindows = append(out.OpenWindows, uuid.UUID(window.Worker).String())
}
return out
}
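
// Assigner implements the policy for matching queued requests to open
// scheduling windows; newScheduler uses NewLowestUtilizationAssigner by
// default.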
type Assigner interface {
TrySched(sh *Scheduler)
}
func (sh *Scheduler) trySched() {
sh.workersLk.RLock()
defer sh.workersLk.RUnlock()
sh.assigner.TrySched(sh)
}
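
// schedClose runs workerCleanup for every known worker when the scheduler
// shuts down.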
func (sh *Scheduler) schedClose() {
sh.workersLk.Lock()
defer sh.workersLk.Unlock()
log.Debugf("closing scheduler")
for i, w := range sh.Workers {
sh.workerCleanup(i, w)
}
}
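
// Info returns a SchedDiagInfo snapshot produced by the runSched goroutine
// via the info channel.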
func (sh *Scheduler) Info(ctx context.Context) (interface{}, error) {
ch := make(chan interface{}, 1)
sh.info <- func(res interface{}) {
ch <- res
}
select {
case res := <-ch:
return res, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
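
// Close signals runSched to stop and waits for it to exit or for ctx to be
// cancelled.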
func (sh *Scheduler) Close(ctx context.Context) error {
close(sh.closing)
select {
case <-sh.closed:
case <-ctx.Done():
return ctx.Err()
}
return nil
}