lotus/extern/sector-storage/sched.go

394 lines
8.1 KiB
Go
Raw Normal View History

package sectorstorage
2020-03-23 11:40:02 +00:00
import (
"context"
"sync"
2020-06-23 09:42:47 +00:00
"time"
"github.com/google/uuid"
2020-03-23 11:40:02 +00:00
"golang.org/x/xerrors"
2020-09-07 03:49:10 +00:00
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
2020-03-23 11:40:02 +00:00
)
2020-06-24 21:06:56 +00:00
type schedPrioCtxKey int
var SchedPriorityKey schedPrioCtxKey
var DefaultSchedPriority = 0
2020-07-09 10:58:52 +00:00
var SelectorTimeout = 5 * time.Second
var InitWait = 3 * time.Second
2020-07-09 10:58:52 +00:00
var (
SchedWindows = 2
)
2020-06-24 21:06:56 +00:00
func getPriority(ctx context.Context) int {
sp := ctx.Value(SchedPriorityKey)
if p, ok := sp.(int); ok {
return p
}
return DefaultSchedPriority
}
func WithPriority(ctx context.Context, priority int) context.Context {
return context.WithValue(ctx, SchedPriorityKey, priority)
}
2020-03-23 11:40:02 +00:00
const mib = 1 << 20
type WorkerAction func(ctx context.Context, w Worker) error
type WorkerSelector interface {
// Ok is true if worker is acceptable for performing a task.
// If any worker is preferred for a task, other workers won't be considered for that task.
Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *WorkerHandle) (ok, preferred bool, err error)
Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) // true if a is preferred over b
}
type Scheduler struct {
assigner Assigner
workersLk sync.RWMutex
2020-05-01 18:00:17 +00:00
Workers map[storiface.WorkerID]*WorkerHandle
schedule chan *WorkerRequest
windowRequests chan *SchedWindowRequest
workerChange chan struct{} // worker added / changed/freed resources
workerDisable chan workerDisableReq
2020-07-09 10:58:52 +00:00
// owned by the sh.runSched goroutine
SchedQueue *RequestQueue
OpenWindows []*SchedWindowRequest
2020-07-09 10:58:52 +00:00
2020-10-28 13:23:38 +00:00
workTracker *workTracker
2020-09-23 12:56:37 +00:00
info chan func(interface{})
2020-07-16 23:32:49 +00:00
closing chan struct{}
2020-07-17 10:59:12 +00:00
closed chan struct{}
2020-07-16 23:26:55 +00:00
testSync chan struct{} // used for testing
2020-07-09 10:58:52 +00:00
}
type WorkerHandle struct {
2020-10-28 13:23:38 +00:00
workerRpc Worker
2020-07-09 10:58:52 +00:00
2022-04-06 16:31:42 +00:00
tasksCache map[sealtasks.TaskType]struct{}
tasksUpdate time.Time
tasksLk sync.Mutex
Info storiface.WorkerInfo
2020-07-09 10:58:52 +00:00
2022-05-27 14:15:52 +00:00
preparing *ActiveResources // use with WorkerHandle.lk
active *ActiveResources // use with WorkerHandle.lk
2020-07-17 10:59:12 +00:00
lk sync.Mutex // can be taken inside sched.workersLk.RLock
2020-08-03 12:18:11 +00:00
wndLk sync.Mutex // can be taken inside sched.workersLk.RLock
activeWindows []*SchedWindow
Enabled bool
2020-07-21 18:01:25 +00:00
2020-07-17 10:59:12 +00:00
// for sync manager goroutine closing
cleanupStarted bool
closedMgr chan struct{}
closingMgr chan struct{}
2020-07-09 10:58:52 +00:00
}
type SchedWindowRequest struct {
Worker storiface.WorkerID
2020-07-09 10:58:52 +00:00
Done chan *SchedWindow
2020-07-09 10:58:52 +00:00
}
type SchedWindow struct {
2022-05-27 14:15:52 +00:00
Allocated ActiveResources
Todo []*WorkerRequest
2020-07-09 10:58:52 +00:00
}
type workerDisableReq struct {
activeWindows []*SchedWindow
2021-11-29 13:42:20 +00:00
wid storiface.WorkerID
done func()
}
type WorkerRequest struct {
Sector storage.SectorRef
TaskType sealtasks.TaskType
Priority int // larger values more important
Sel WorkerSelector
2020-07-09 10:58:52 +00:00
prepare WorkerAction
work WorkerAction
start time.Time
2020-07-09 10:58:52 +00:00
index int // The index of the item in the heap.
IndexHeap int
2020-07-27 11:21:36 +00:00
ret chan<- workerResponse
Ctx context.Context
2020-07-09 10:58:52 +00:00
}
2020-07-09 10:58:52 +00:00
type workerResponse struct {
err error
}
2022-05-23 14:58:43 +00:00
func newScheduler(assigner string) (*Scheduler, error) {
var a Assigner
switch assigner {
case "", "utilization":
a = NewLowestUtilizationAssigner()
case "spread":
a = NewSpreadAssigner()
default:
return nil, xerrors.Errorf("unknown assigner '%s'", assigner)
}
return &Scheduler{
2022-05-23 14:58:43 +00:00
assigner: a,
2020-05-01 18:00:17 +00:00
Workers: map[storiface.WorkerID]*WorkerHandle{},
schedule: make(chan *WorkerRequest),
windowRequests: make(chan *SchedWindowRequest, 20),
workerChange: make(chan struct{}, 20),
workerDisable: make(chan workerDisableReq),
SchedQueue: &RequestQueue{},
2020-07-09 12:40:53 +00:00
2020-10-28 13:23:38 +00:00
workTracker: &workTracker{
2021-10-15 19:04:03 +00:00
done: map[storiface.CallID]struct{}{},
running: map[storiface.CallID]trackedWork{},
prepared: map[uuid.UUID]trackedWork{},
2020-09-23 12:56:37 +00:00
},
info: make(chan func(interface{})),
2020-07-09 12:40:53 +00:00
closing: make(chan struct{}),
2020-07-17 10:59:12 +00:00
closed: make(chan struct{}),
2022-05-23 14:58:43 +00:00
}, nil
}
func (sh *Scheduler) Schedule(ctx context.Context, sector storage.SectorRef, taskType sealtasks.TaskType, sel WorkerSelector, prepare WorkerAction, work WorkerAction) error {
ret := make(chan workerResponse)
select {
case sh.schedule <- &WorkerRequest{
Sector: sector,
TaskType: taskType,
Priority: getPriority(ctx),
Sel: sel,
prepare: prepare,
work: work,
start: time.Now(),
ret: ret,
Ctx: ctx,
}:
case <-sh.closing:
return xerrors.New("closing")
case <-ctx.Done():
return ctx.Err()
}
select {
case resp := <-ret:
return resp.err
case <-sh.closing:
return xerrors.New("closing")
case <-ctx.Done():
return ctx.Err()
}
}
func (r *WorkerRequest) respond(err error) {
2020-03-23 11:40:02 +00:00
select {
case r.ret <- workerResponse{err: err}:
case <-r.Ctx.Done():
2020-03-23 11:40:02 +00:00
log.Warnf("request got cancelled before we could respond")
}
}
func (r *WorkerRequest) SealTask() sealtasks.SealTaskType {
return sealtasks.SealTaskType{
TaskType: r.TaskType,
RegisteredSealProof: r.Sector.ProofType,
}
}
type SchedDiagRequestInfo struct {
Sector abi.SectorID
TaskType sealtasks.TaskType
Priority int
}
type SchedDiagInfo struct {
2020-07-27 11:21:36 +00:00
Requests []SchedDiagRequestInfo
OpenWindows []string
}
func (sh *Scheduler) runSched() {
2020-07-17 10:59:12 +00:00
defer close(sh.closed)
iw := time.After(InitWait)
var initialised bool
2020-03-23 11:40:02 +00:00
for {
var doSched bool
var toDisable []workerDisableReq
2020-03-23 11:40:02 +00:00
select {
case <-sh.workerChange:
doSched = true
case dreq := <-sh.workerDisable:
toDisable = append(toDisable, dreq)
doSched = true
2020-07-09 10:58:52 +00:00
case req := <-sh.schedule:
sh.SchedQueue.Push(req)
doSched = true
2020-07-09 10:58:52 +00:00
2020-07-16 23:26:55 +00:00
if sh.testSync != nil {
sh.testSync <- struct{}{}
}
2020-07-09 10:58:52 +00:00
case req := <-sh.windowRequests:
sh.OpenWindows = append(sh.OpenWindows, req)
doSched = true
case ireq := <-sh.info:
ireq(sh.diag())
case <-iw:
initialised = true
iw = nil
doSched = true
case <-sh.closing:
sh.schedClose()
2020-03-24 23:49:45 +00:00
return
2020-03-23 11:40:02 +00:00
}
if doSched && initialised {
// First gather any pending tasks, so we go through the scheduling loop
// once for every added task
loop:
for {
select {
case <-sh.workerChange:
case dreq := <-sh.workerDisable:
toDisable = append(toDisable, dreq)
case req := <-sh.schedule:
sh.SchedQueue.Push(req)
if sh.testSync != nil {
sh.testSync <- struct{}{}
}
case req := <-sh.windowRequests:
sh.OpenWindows = append(sh.OpenWindows, req)
default:
break loop
}
}
for _, req := range toDisable {
for _, window := range req.activeWindows {
for _, request := range window.Todo {
sh.SchedQueue.Push(request)
}
}
openWindows := make([]*SchedWindowRequest, 0, len(sh.OpenWindows))
for _, window := range sh.OpenWindows {
if window.Worker != req.wid {
openWindows = append(openWindows, window)
}
}
sh.OpenWindows = openWindows
sh.workersLk.Lock()
sh.Workers[req.wid].Enabled = false
sh.workersLk.Unlock()
req.done()
}
sh.trySched()
}
2020-03-23 11:40:02 +00:00
}
}
func (sh *Scheduler) diag() SchedDiagInfo {
var out SchedDiagInfo
for sqi := 0; sqi < sh.SchedQueue.Len(); sqi++ {
task := (*sh.SchedQueue)[sqi]
out.Requests = append(out.Requests, SchedDiagRequestInfo{
Sector: task.Sector.ID,
TaskType: task.TaskType,
Priority: task.Priority,
})
}
sh.workersLk.RLock()
defer sh.workersLk.RUnlock()
for _, window := range sh.OpenWindows {
out.OpenWindows = append(out.OpenWindows, uuid.UUID(window.Worker).String())
}
return out
}
type Assigner interface {
TrySched(sh *Scheduler)
}
func (sh *Scheduler) trySched() {
2020-08-03 12:18:11 +00:00
sh.workersLk.RLock()
defer sh.workersLk.RUnlock()
2020-09-25 14:41:29 +00:00
sh.assigner.TrySched(sh)
2020-07-09 10:58:52 +00:00
}
func (sh *Scheduler) schedClose() {
sh.workersLk.Lock()
defer sh.workersLk.Unlock()
2020-07-17 10:59:12 +00:00
log.Debugf("closing scheduler")
2020-03-24 23:49:45 +00:00
for i, w := range sh.Workers {
2020-07-17 10:59:12 +00:00
sh.workerCleanup(i, w)
2020-03-24 23:49:45 +00:00
}
}
func (sh *Scheduler) Info(ctx context.Context) (interface{}, error) {
ch := make(chan interface{}, 1)
sh.info <- func(res interface{}) {
ch <- res
}
select {
2020-07-27 11:21:36 +00:00
case res := <-ch:
return res, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
func (sh *Scheduler) Close(ctx context.Context) error {
close(sh.closing)
2020-07-17 10:59:12 +00:00
select {
case <-sh.closed:
case <-ctx.Done():
return ctx.Err()
}
return nil
}