2020-08-17 13:26:18 +00:00
package sectorstorage
2020-03-23 11:40:02 +00:00
import (
2020-04-27 18:37:31 +00:00
"context"
"sync"
2020-06-23 09:42:47 +00:00
"time"
2020-04-27 18:37:31 +00:00
2020-10-30 17:32:16 +00:00
"github.com/google/uuid"
2020-03-23 11:40:02 +00:00
"golang.org/x/xerrors"
2020-09-07 03:49:10 +00:00
"github.com/filecoin-project/go-state-types/abi"
2020-11-04 20:29:08 +00:00
"github.com/filecoin-project/specs-storage/storage"
2020-03-27 20:08:06 +00:00
2020-08-17 13:26:18 +00:00
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
2020-03-23 11:40:02 +00:00
)
2020-06-24 21:06:56 +00:00
type schedPrioCtxKey int
var SchedPriorityKey schedPrioCtxKey
var DefaultSchedPriority = 0
2020-07-09 10:58:52 +00:00
var SelectorTimeout = 5 * time . Second
2020-08-27 21:58:37 +00:00
var InitWait = 3 * time . Second
2020-07-09 10:58:52 +00:00
var (
SchedWindows = 2
)
2020-06-24 21:06:56 +00:00
func getPriority ( ctx context . Context ) int {
sp := ctx . Value ( SchedPriorityKey )
if p , ok := sp . ( int ) ; ok {
return p
}
return DefaultSchedPriority
}
func WithPriority ( ctx context . Context , priority int ) context . Context {
return context . WithValue ( ctx , SchedPriorityKey , priority )
}
2020-03-23 11:40:02 +00:00
const mib = 1 << 20
2020-04-27 18:37:31 +00:00
type WorkerAction func ( ctx context . Context , w Worker ) error
type WorkerSelector interface {
2022-05-18 13:47:08 +00:00
Ok ( ctx context . Context , task sealtasks . TaskType , spt abi . RegisteredSealProof , a * WorkerHandle ) ( bool , error ) // true if worker is acceptable for performing a task
2020-04-27 18:37:31 +00:00
2022-05-18 13:47:08 +00:00
Cmp ( ctx context . Context , task sealtasks . TaskType , a , b * WorkerHandle ) ( bool , error ) // true if a is preferred over b
2020-04-27 18:37:31 +00:00
}
2022-05-18 13:47:08 +00:00
type Scheduler struct {
assigner Assigner
2020-10-18 10:35:44 +00:00
workersLk sync . RWMutex
2020-05-01 18:00:17 +00:00
2022-05-18 13:47:08 +00:00
Workers map [ storiface . WorkerID ] * WorkerHandle
schedule chan * WorkerRequest
windowRequests chan * SchedWindowRequest
2020-10-18 10:35:44 +00:00
workerChange chan struct { } // worker added / changed/freed resources
workerDisable chan workerDisableReq
2020-07-09 10:58:52 +00:00
// owned by the sh.runSched goroutine
2022-05-18 13:47:08 +00:00
SchedQueue * RequestQueue
OpenWindows [ ] * SchedWindowRequest
2020-07-09 10:58:52 +00:00
2020-10-28 13:23:38 +00:00
workTracker * workTracker
2020-09-23 12:56:37 +00:00
2020-07-27 10:17:09 +00:00
info chan func ( interface { } )
2020-07-16 23:32:49 +00:00
closing chan struct { }
2020-07-17 10:59:12 +00:00
closed chan struct { }
2020-07-16 23:26:55 +00:00
testSync chan struct { } // used for testing
2020-07-09 10:58:52 +00:00
}
2022-05-18 13:47:08 +00:00
type WorkerHandle struct {
2020-10-28 13:23:38 +00:00
workerRpc Worker
2020-07-09 10:58:52 +00:00
2022-04-06 16:31:42 +00:00
tasksCache map [ sealtasks . TaskType ] struct { }
tasksUpdate time . Time
tasksLk sync . Mutex
2022-05-18 13:47:08 +00:00
Info storiface . WorkerInfo
2020-07-09 10:58:52 +00:00
2022-05-18 13:47:08 +00:00
preparing * activeResources // use with WorkerHandle.lk
active * activeResources // use with WorkerHandle.lk
2020-07-17 10:59:12 +00:00
2021-09-15 13:34:50 +00:00
lk sync . Mutex // can be taken inside sched.workersLk.RLock
2020-08-03 12:18:11 +00:00
2021-09-15 13:34:50 +00:00
wndLk sync . Mutex // can be taken inside sched.workersLk.RLock
2022-05-18 13:47:08 +00:00
activeWindows [ ] * SchedWindow
2020-08-27 21:14:33 +00:00
2022-05-18 13:47:08 +00:00
Enabled bool
2020-07-21 18:01:25 +00:00
2020-07-17 10:59:12 +00:00
// for sync manager goroutine closing
cleanupStarted bool
closedMgr chan struct { }
closingMgr chan struct { }
2020-07-09 10:58:52 +00:00
}
2022-05-18 13:47:08 +00:00
type SchedWindowRequest struct {
Worker storiface . WorkerID
2020-07-09 10:58:52 +00:00
2022-05-18 13:47:08 +00:00
Done chan * SchedWindow
2020-07-09 10:58:52 +00:00
}
2022-05-18 13:47:08 +00:00
type SchedWindow struct {
Allocated activeResources
Todo [ ] * WorkerRequest
2020-07-09 10:58:52 +00:00
}
2020-10-18 10:35:44 +00:00
type workerDisableReq struct {
2022-05-18 13:47:08 +00:00
activeWindows [ ] * SchedWindow
2021-11-29 13:42:20 +00:00
wid storiface . WorkerID
2020-10-18 10:35:44 +00:00
done func ( )
}
2020-07-09 10:58:52 +00:00
type activeResources struct {
memUsedMin uint64
memUsedMax uint64
2021-09-01 01:59:25 +00:00
gpuUsed float64
2020-07-09 10:58:52 +00:00
cpuUse uint64
2021-09-15 14:37:27 +00:00
cond * sync . Cond
waiting int
2020-07-09 10:58:52 +00:00
}
2022-05-18 13:47:08 +00:00
type WorkerRequest struct {
Sector storage . SectorRef
TaskType sealtasks . TaskType
Priority int // larger values more important
Sel WorkerSelector
2020-07-09 10:58:52 +00:00
prepare WorkerAction
work WorkerAction
2020-08-27 21:14:33 +00:00
start time . Time
2020-07-09 10:58:52 +00:00
index int // The index of the item in the heap.
2022-05-18 13:47:08 +00:00
IndexHeap int
2020-07-27 11:21:36 +00:00
ret chan <- workerResponse
2022-05-18 13:47:08 +00:00
Ctx context . Context
2020-07-09 10:58:52 +00:00
}
2020-04-27 18:37:31 +00:00
2020-07-09 10:58:52 +00:00
type workerResponse struct {
err error
2020-04-27 18:37:31 +00:00
}
2022-05-23 14:58:43 +00:00
func newScheduler ( assigner string ) ( * Scheduler , error ) {
var a Assigner
switch assigner {
case "" , "utilization" :
a = NewLowestUtilizationAssigner ( )
case "spread" :
a = NewSpreadAssigner ( )
default :
return nil , xerrors . Errorf ( "unknown assigner '%s'" , assigner )
}
2022-05-18 13:47:08 +00:00
return & Scheduler {
2022-05-23 14:58:43 +00:00
assigner : a ,
2020-05-01 18:00:17 +00:00
2022-05-18 13:47:08 +00:00
Workers : map [ storiface . WorkerID ] * WorkerHandle { } ,
schedule : make ( chan * WorkerRequest ) ,
windowRequests : make ( chan * SchedWindowRequest , 20 ) ,
2020-10-18 10:35:44 +00:00
workerChange : make ( chan struct { } , 20 ) ,
workerDisable : make ( chan workerDisableReq ) ,
2020-04-27 18:37:31 +00:00
2022-05-18 13:47:08 +00:00
SchedQueue : & RequestQueue { } ,
2020-07-09 12:40:53 +00:00
2020-10-28 13:23:38 +00:00
workTracker : & workTracker {
2021-10-15 19:04:03 +00:00
done : map [ storiface . CallID ] struct { } { } ,
running : map [ storiface . CallID ] trackedWork { } ,
2021-10-18 14:27:25 +00:00
prepared : map [ uuid . UUID ] trackedWork { } ,
2020-09-23 12:56:37 +00:00
} ,
2020-07-27 10:17:09 +00:00
info : make ( chan func ( interface { } ) ) ,
2020-07-09 12:40:53 +00:00
closing : make ( chan struct { } ) ,
2020-07-17 10:59:12 +00:00
closed : make ( chan struct { } ) ,
2022-05-23 14:58:43 +00:00
} , nil
2020-04-27 18:37:31 +00:00
}
2022-05-18 13:47:08 +00:00
func ( sh * Scheduler ) Schedule ( ctx context . Context , sector storage . SectorRef , taskType sealtasks . TaskType , sel WorkerSelector , prepare WorkerAction , work WorkerAction ) error {
2020-04-27 18:37:31 +00:00
ret := make ( chan workerResponse )
select {
2022-05-18 13:47:08 +00:00
case sh . schedule <- & WorkerRequest {
Sector : sector ,
TaskType : taskType ,
Priority : getPriority ( ctx ) ,
Sel : sel ,
2020-04-27 18:37:31 +00:00
prepare : prepare ,
work : work ,
2020-08-27 21:14:33 +00:00
start : time . Now ( ) ,
2020-04-27 18:37:31 +00:00
ret : ret ,
2022-05-18 13:47:08 +00:00
Ctx : ctx ,
2020-04-27 18:37:31 +00:00
} :
case <- sh . closing :
return xerrors . New ( "closing" )
case <- ctx . Done ( ) :
return ctx . Err ( )
}
select {
case resp := <- ret :
return resp . err
case <- sh . closing :
return xerrors . New ( "closing" )
case <- ctx . Done ( ) :
return ctx . Err ( )
}
}
2022-05-18 13:47:08 +00:00
func ( r * WorkerRequest ) respond ( err error ) {
2020-03-23 11:40:02 +00:00
select {
2020-04-27 18:37:31 +00:00
case r . ret <- workerResponse { err : err } :
2022-05-18 13:47:08 +00:00
case <- r . Ctx . Done ( ) :
2020-03-23 11:40:02 +00:00
log . Warnf ( "request got cancelled before we could respond" )
}
}
2020-07-27 10:17:09 +00:00
type SchedDiagRequestInfo struct {
Sector abi . SectorID
TaskType sealtasks . TaskType
Priority int
}
type SchedDiagInfo struct {
2020-07-27 11:21:36 +00:00
Requests [ ] SchedDiagRequestInfo
2020-10-30 17:32:16 +00:00
OpenWindows [ ] string
2020-07-27 10:17:09 +00:00
}
2022-05-18 13:47:08 +00:00
func ( sh * Scheduler ) runSched ( ) {
2020-07-17 10:59:12 +00:00
defer close ( sh . closed )
2020-08-27 21:58:37 +00:00
iw := time . After ( InitWait )
var initialised bool
2020-03-23 11:40:02 +00:00
for {
2020-08-27 22:03:42 +00:00
var doSched bool
2020-10-18 10:35:44 +00:00
var toDisable [ ] workerDisableReq
2020-08-27 22:03:42 +00:00
2020-03-23 11:40:02 +00:00
select {
2020-10-18 10:35:44 +00:00
case <- sh . workerChange :
doSched = true
case dreq := <- sh . workerDisable :
toDisable = append ( toDisable , dreq )
doSched = true
2020-07-09 10:58:52 +00:00
case req := <- sh . schedule :
2022-05-18 13:47:08 +00:00
sh . SchedQueue . Push ( req )
2020-08-27 22:03:42 +00:00
doSched = true
2020-07-09 10:58:52 +00:00
2020-07-16 23:26:55 +00:00
if sh . testSync != nil {
sh . testSync <- struct { } { }
}
2020-07-09 10:58:52 +00:00
case req := <- sh . windowRequests :
2022-05-18 13:47:08 +00:00
sh . OpenWindows = append ( sh . OpenWindows , req )
2020-08-27 22:03:42 +00:00
doSched = true
2020-07-27 10:17:09 +00:00
case ireq := <- sh . info :
ireq ( sh . diag ( ) )
2020-08-27 21:58:37 +00:00
case <- iw :
initialised = true
iw = nil
2020-08-27 22:03:42 +00:00
doSched = true
2020-04-27 18:37:31 +00:00
case <- sh . closing :
sh . schedClose ( )
2020-03-24 23:49:45 +00:00
return
2020-03-23 11:40:02 +00:00
}
2020-08-27 22:03:42 +00:00
if doSched && initialised {
// First gather any pending tasks, so we go through the scheduling loop
// once for every added task
loop :
for {
select {
2020-10-18 10:35:44 +00:00
case <- sh . workerChange :
case dreq := <- sh . workerDisable :
toDisable = append ( toDisable , dreq )
2020-08-27 22:03:42 +00:00
case req := <- sh . schedule :
2022-05-18 13:47:08 +00:00
sh . SchedQueue . Push ( req )
2020-08-27 22:03:42 +00:00
if sh . testSync != nil {
sh . testSync <- struct { } { }
}
case req := <- sh . windowRequests :
2022-05-18 13:47:08 +00:00
sh . OpenWindows = append ( sh . OpenWindows , req )
2020-08-27 22:03:42 +00:00
default :
break loop
}
}
2020-10-18 10:35:44 +00:00
for _ , req := range toDisable {
for _ , window := range req . activeWindows {
2022-05-18 13:47:08 +00:00
for _ , request := range window . Todo {
sh . SchedQueue . Push ( request )
2020-10-18 10:35:44 +00:00
}
}
2022-05-18 13:47:08 +00:00
openWindows := make ( [ ] * SchedWindowRequest , 0 , len ( sh . OpenWindows ) )
for _ , window := range sh . OpenWindows {
if window . Worker != req . wid {
2020-10-18 10:35:44 +00:00
openWindows = append ( openWindows , window )
}
}
2022-05-18 13:47:08 +00:00
sh . OpenWindows = openWindows
2020-10-18 10:35:44 +00:00
sh . workersLk . Lock ( )
2022-05-18 13:47:08 +00:00
sh . Workers [ req . wid ] . Enabled = false
2020-10-18 10:35:44 +00:00
sh . workersLk . Unlock ( )
req . done ( )
}
2020-08-27 22:03:42 +00:00
sh . trySched ( )
}
2020-03-23 11:40:02 +00:00
}
}
2022-05-18 13:47:08 +00:00
func ( sh * Scheduler ) diag ( ) SchedDiagInfo {
2020-07-27 10:17:09 +00:00
var out SchedDiagInfo
2022-05-18 13:47:08 +00:00
for sqi := 0 ; sqi < sh . SchedQueue . Len ( ) ; sqi ++ {
task := ( * sh . SchedQueue ) [ sqi ]
2020-07-27 10:17:09 +00:00
out . Requests = append ( out . Requests , SchedDiagRequestInfo {
2022-05-18 13:47:08 +00:00
Sector : task . Sector . ID ,
TaskType : task . TaskType ,
Priority : task . Priority ,
2020-07-27 10:17:09 +00:00
} )
}
2020-10-18 10:35:44 +00:00
sh . workersLk . RLock ( )
defer sh . workersLk . RUnlock ( )
2022-05-18 13:47:08 +00:00
for _ , window := range sh . OpenWindows {
out . OpenWindows = append ( out . OpenWindows , uuid . UUID ( window . Worker ) . String ( ) )
2020-07-27 10:17:09 +00:00
}
return out
}
2022-05-18 13:47:08 +00:00
type Assigner interface {
TrySched ( sh * Scheduler )
}
2020-04-27 18:37:31 +00:00
2022-05-18 13:47:08 +00:00
func ( sh * Scheduler ) trySched ( ) {
2020-08-03 12:18:11 +00:00
sh . workersLk . RLock ( )
defer sh . workersLk . RUnlock ( )
2020-09-25 14:41:29 +00:00
2022-05-18 13:47:08 +00:00
sh . assigner . TrySched ( sh )
2020-07-09 10:58:52 +00:00
}
2020-04-27 20:43:42 +00:00
2022-05-18 13:47:08 +00:00
func ( sh * Scheduler ) schedClose ( ) {
2020-04-27 18:37:31 +00:00
sh . workersLk . Lock ( )
defer sh . workersLk . Unlock ( )
2020-07-17 10:59:12 +00:00
log . Debugf ( "closing scheduler" )
2020-03-24 23:49:45 +00:00
2022-05-18 13:47:08 +00:00
for i , w := range sh . Workers {
2020-07-17 10:59:12 +00:00
sh . workerCleanup ( i , w )
2020-03-24 23:49:45 +00:00
}
}
2020-04-27 18:37:31 +00:00
2022-05-18 13:47:08 +00:00
func ( sh * Scheduler ) Info ( ctx context . Context ) ( interface { } , error ) {
2020-07-27 10:17:09 +00:00
ch := make ( chan interface { } , 1 )
sh . info <- func ( res interface { } ) {
ch <- res
}
select {
2020-07-27 11:21:36 +00:00
case res := <- ch :
2020-07-27 10:17:09 +00:00
return res , nil
case <- ctx . Done ( ) :
return nil , ctx . Err ( )
}
}
2022-05-18 13:47:08 +00:00
func ( sh * Scheduler ) Close ( ctx context . Context ) error {
2020-04-27 18:37:31 +00:00
close ( sh . closing )
2020-07-17 10:59:12 +00:00
select {
case <- sh . closed :
case <- ctx . Done ( ) :
return ctx . Err ( )
}
2020-04-27 18:37:31 +00:00
return nil
}