2020-03-23 11:40:02 +00:00
package sectorstorage
import (
2020-05-07 23:38:05 +00:00
"container/heap"
2020-04-27 18:37:31 +00:00
"context"
"sort"
"sync"
"github.com/hashicorp/go-multierror"
2020-03-23 11:40:02 +00:00
"golang.org/x/xerrors"
2020-03-27 20:08:06 +00:00
"github.com/filecoin-project/specs-actors/actors/abi"
2020-03-27 23:21:36 +00:00
"github.com/filecoin-project/sector-storage/sealtasks"
2020-04-23 22:16:21 +00:00
"github.com/filecoin-project/sector-storage/storiface"
2020-03-23 11:40:02 +00:00
)
// mib is the number of bytes in one mebibyte (2^20); it is used to scale
// byte counts down to megabyte figures in scheduler debug logs.
const mib = 1 << 20
2020-04-27 18:37:31 +00:00
// WorkerAction is a callback executed against a specific Worker. The scheduler
// invokes such callbacks for both the prepare and the work phase of a request;
// a non-nil error is passed back to the caller of Schedule.
type WorkerAction func ( ctx context . Context , w Worker ) error
type WorkerSelector interface {
2020-06-15 12:32:17 +00:00
Ok ( ctx context . Context , task sealtasks . TaskType , spt abi . RegisteredSealProof , a * workerHandle ) ( bool , error ) // true if worker is acceptable for performing a task
2020-04-27 18:37:31 +00:00
Cmp ( ctx context . Context , task sealtasks . TaskType , a , b * workerHandle ) ( bool , error ) // true if a is preferred over b
}
type scheduler struct {
2020-06-15 12:32:17 +00:00
spt abi . RegisteredSealProof
2020-04-27 18:37:31 +00:00
workersLk sync . Mutex
nextWorker WorkerID
workers map [ WorkerID ] * workerHandle
newWorkers chan * workerHandle
2020-05-01 18:00:17 +00:00
watchClosing chan WorkerID
workerClosing chan WorkerID
2020-04-27 18:37:31 +00:00
schedule chan * workerRequest
workerFree chan WorkerID
closing chan struct { }
2020-05-07 23:38:05 +00:00
schedQueue * requestQueue
2020-04-27 18:37:31 +00:00
}
2020-06-15 12:32:17 +00:00
func newScheduler ( spt abi . RegisteredSealProof ) * scheduler {
2020-04-27 18:37:31 +00:00
return & scheduler {
spt : spt ,
nextWorker : 0 ,
workers : map [ WorkerID ] * workerHandle { } ,
2020-05-01 18:04:21 +00:00
newWorkers : make ( chan * workerHandle ) ,
2020-05-01 18:00:17 +00:00
watchClosing : make ( chan WorkerID ) ,
workerClosing : make ( chan WorkerID ) ,
2020-05-01 18:04:21 +00:00
schedule : make ( chan * workerRequest ) ,
workerFree : make ( chan WorkerID ) ,
closing : make ( chan struct { } ) ,
2020-04-27 18:37:31 +00:00
2020-05-07 23:38:05 +00:00
schedQueue : & requestQueue { } ,
2020-04-27 18:37:31 +00:00
}
}
2020-05-13 23:56:21 +00:00
func ( sh * scheduler ) Schedule ( ctx context . Context , sector abi . SectorID , taskType sealtasks . TaskType , sel WorkerSelector , prepare WorkerAction , work WorkerAction ) error {
2020-04-27 18:37:31 +00:00
ret := make ( chan workerResponse )
select {
case sh . schedule <- & workerRequest {
2020-05-13 23:56:21 +00:00
sector : sector ,
2020-04-27 18:37:31 +00:00
taskType : taskType ,
sel : sel ,
prepare : prepare ,
work : work ,
ret : ret ,
ctx : ctx ,
} :
case <- sh . closing :
return xerrors . New ( "closing" )
case <- ctx . Done ( ) :
return ctx . Err ( )
}
select {
case resp := <- ret :
return resp . err
case <- sh . closing :
return xerrors . New ( "closing" )
case <- ctx . Done ( ) :
return ctx . Err ( )
}
}
2020-03-23 11:40:02 +00:00
type workerRequest struct {
2020-05-13 23:56:21 +00:00
sector abi . SectorID
2020-03-23 11:40:02 +00:00
taskType sealtasks . TaskType
2020-04-27 18:37:31 +00:00
sel WorkerSelector
prepare WorkerAction
work WorkerAction
2020-03-23 11:40:02 +00:00
2020-05-07 23:38:05 +00:00
index int // The index of the item in the heap.
2020-04-27 18:37:31 +00:00
ret chan <- workerResponse
ctx context . Context
2020-03-23 11:40:02 +00:00
}
// workerResponse is the result delivered to a Schedule caller; err is nil
// when the request completed successfully.
type workerResponse struct {
err error
}
2020-04-27 18:37:31 +00:00
func ( r * workerRequest ) respond ( err error ) {
2020-03-23 11:40:02 +00:00
select {
2020-04-27 18:37:31 +00:00
case r . ret <- workerResponse { err : err } :
case <- r . ctx . Done ( ) :
2020-03-23 11:40:02 +00:00
log . Warnf ( "request got cancelled before we could respond" )
}
}
2020-04-27 20:43:42 +00:00
// activeResources tracks the resources currently claimed on one worker.
// The counters are adjusted by add and free and read by canHandleRequest.
type activeResources struct {
// memUsedMin is the sum of MinMemory over all claimed resources.
memUsedMin uint64
// memUsedMax is the sum of MaxMemory over all claimed resources.
memUsedMax uint64
// gpuUsed is true while a GPU-capable task holds a claim.
gpuUsed bool
// cpuUse is the number of CPU threads currently claimed.
cpuUse uint64
// cond is created lazily by withResources and is broadcast whenever a claim is released.
cond * sync . Cond
}
2020-03-23 11:40:02 +00:00
type workerHandle struct {
w Worker
2020-04-23 22:16:21 +00:00
info storiface . WorkerInfo
2020-03-23 11:40:02 +00:00
2020-04-27 20:43:42 +00:00
preparing * activeResources
active * activeResources
2020-03-23 11:40:02 +00:00
}
2020-04-27 18:37:31 +00:00
func ( sh * scheduler ) runSched ( ) {
2020-05-01 18:00:17 +00:00
go sh . runWorkerWatcher ( )
2020-03-23 11:40:02 +00:00
for {
select {
2020-04-27 18:37:31 +00:00
case w := <- sh . newWorkers :
2020-05-01 18:00:17 +00:00
sh . schedNewWorker ( w )
case wid := <- sh . workerClosing :
sh . schedDropWorker ( wid )
2020-04-27 18:37:31 +00:00
case req := <- sh . schedule :
scheduled , err := sh . maybeSchedRequest ( req )
2020-03-23 11:40:02 +00:00
if err != nil {
2020-04-27 18:37:31 +00:00
req . respond ( err )
2020-03-23 11:40:02 +00:00
continue
}
2020-04-27 18:37:31 +00:00
if scheduled {
2020-03-23 11:40:02 +00:00
continue
}
2020-05-07 23:38:05 +00:00
heap . Push ( sh . schedQueue , req )
2020-04-27 18:37:31 +00:00
case wid := <- sh . workerFree :
sh . onWorkerFreed ( wid )
case <- sh . closing :
sh . schedClose ( )
2020-03-24 23:49:45 +00:00
return
2020-03-23 11:40:02 +00:00
}
}
}
2020-04-27 18:37:31 +00:00
func ( sh * scheduler ) onWorkerFreed ( wid WorkerID ) {
2020-05-01 18:00:17 +00:00
sh . workersLk . Lock ( )
w , ok := sh . workers [ wid ]
sh . workersLk . Unlock ( )
if ! ok {
log . Warnf ( "onWorkerFreed on invalid worker %d" , wid )
return
}
2020-05-07 23:38:05 +00:00
for i := 0 ; i < sh . schedQueue . Len ( ) ; i ++ {
req := ( * sh . schedQueue ) [ i ]
2020-04-27 18:37:31 +00:00
2020-05-08 16:54:06 +00:00
ok , err := req . sel . Ok ( req . ctx , req . taskType , sh . spt , w )
2020-04-27 18:37:31 +00:00
if err != nil {
log . Errorf ( "onWorkerFreed req.sel.Ok error: %+v" , err )
continue
2020-03-23 11:40:02 +00:00
}
2020-04-27 18:37:31 +00:00
2020-03-23 11:40:02 +00:00
if ! ok {
continue
}
2020-04-27 18:37:31 +00:00
scheduled , err := sh . maybeSchedRequest ( req )
2020-03-23 11:40:02 +00:00
if err != nil {
2020-04-27 18:37:31 +00:00
req . respond ( err )
2020-03-23 11:40:02 +00:00
continue
}
2020-04-27 18:37:31 +00:00
if scheduled {
2020-05-07 23:38:05 +00:00
heap . Remove ( sh . schedQueue , i )
i --
2020-03-23 11:40:02 +00:00
continue
}
}
}
2020-04-27 18:37:31 +00:00
func ( sh * scheduler ) maybeSchedRequest ( req * workerRequest ) ( bool , error ) {
sh . workersLk . Lock ( )
defer sh . workersLk . Unlock ( )
2020-03-23 11:40:02 +00:00
tried := 0
2020-04-27 18:37:31 +00:00
var acceptable [ ] WorkerID
2020-04-29 14:04:05 +00:00
needRes := ResourceTable [ req . taskType ] [ sh . spt ]
2020-04-27 18:37:31 +00:00
for wid , worker := range sh . workers {
2020-05-08 16:54:06 +00:00
ok , err := req . sel . Ok ( req . ctx , req . taskType , sh . spt , worker )
2020-04-27 18:37:31 +00:00
if err != nil {
return false , err
}
2020-03-23 11:40:02 +00:00
if ! ok {
2020-04-27 18:37:31 +00:00
continue
2020-03-23 11:40:02 +00:00
}
tried ++
2020-04-27 20:43:42 +00:00
if ! canHandleRequest ( needRes , sh . spt , wid , worker . info . Resources , worker . preparing ) {
2020-03-23 11:40:02 +00:00
continue
}
2020-04-27 18:37:31 +00:00
acceptable = append ( acceptable , wid )
}
if len ( acceptable ) > 0 {
{
var serr error
sort . SliceStable ( acceptable , func ( i , j int ) bool {
r , err := req . sel . Cmp ( req . ctx , req . taskType , sh . workers [ acceptable [ i ] ] , sh . workers [ acceptable [ j ] ] )
if err != nil {
serr = multierror . Append ( serr , err )
}
return r
} )
if serr != nil {
return false , xerrors . Errorf ( "error(s) selecting best worker: %w" , serr )
}
}
return true , sh . assignWorker ( acceptable [ 0 ] , sh . workers [ acceptable [ 0 ] ] , req )
2020-03-23 11:40:02 +00:00
}
if tried == 0 {
2020-04-27 18:37:31 +00:00
return false , xerrors . New ( "maybeSchedRequest didn't find any good workers" )
2020-03-23 11:40:02 +00:00
}
2020-04-27 18:37:31 +00:00
return false , nil // put in waiting queue
2020-03-23 11:40:02 +00:00
}
2020-04-27 18:37:31 +00:00
func ( sh * scheduler ) assignWorker ( wid WorkerID , w * workerHandle , req * workerRequest ) error {
needRes := ResourceTable [ req . taskType ] [ sh . spt ]
2020-03-23 11:40:02 +00:00
2020-04-27 20:43:42 +00:00
w . preparing . add ( w . info . Resources , needRes )
2020-03-23 11:40:02 +00:00
2020-04-27 18:37:31 +00:00
go func ( ) {
2020-04-27 20:43:42 +00:00
err := req . prepare ( req . ctx , w . w )
sh . workersLk . Lock ( )
2020-04-27 20:59:17 +00:00
if err != nil {
w . preparing . free ( w . info . Resources , needRes )
sh . workersLk . Unlock ( )
select {
case sh . workerFree <- wid :
case <- sh . closing :
log . Warnf ( "scheduler closed while sending response (prepare error: %+v)" , err )
}
select {
case req . ret <- workerResponse { err : err } :
case <- req . ctx . Done ( ) :
log . Warnf ( "request got cancelled before we could respond (prepare error: %+v)" , err )
case <- sh . closing :
log . Warnf ( "scheduler closed while sending response (prepare error: %+v)" , err )
}
return
}
2020-04-27 20:43:42 +00:00
err = w . active . withResources ( sh . spt , wid , w . info . Resources , needRes , & sh . workersLk , func ( ) error {
w . preparing . free ( w . info . Resources , needRes )
2020-04-27 18:37:31 +00:00
sh . workersLk . Unlock ( )
2020-04-27 20:43:42 +00:00
defer sh . workersLk . Lock ( ) // we MUST return locked from this function
2020-03-23 11:40:02 +00:00
select {
2020-04-27 18:37:31 +00:00
case sh . workerFree <- wid :
case <- sh . closing :
2020-03-23 11:40:02 +00:00
}
2020-04-27 18:37:31 +00:00
err = req . work ( req . ctx , w . w )
2020-04-27 20:43:42 +00:00
select {
case req . ret <- workerResponse { err : err } :
case <- req . ctx . Done ( ) :
log . Warnf ( "request got cancelled before we could respond" )
case <- sh . closing :
log . Warnf ( "scheduler closed while sending response" )
}
return nil
} )
sh . workersLk . Unlock ( )
2020-04-28 10:31:08 +00:00
// This error should always be nil, since nothing is setting it, but just to be safe:
if err != nil {
log . Errorf ( "error executing worker (withResources): %+v" , err )
}
2020-04-27 18:37:31 +00:00
} ( )
return nil
2020-03-23 11:40:02 +00:00
}
2020-06-15 12:32:17 +00:00
func ( a * activeResources ) withResources ( spt abi . RegisteredSealProof , id WorkerID , wr storiface . WorkerResources , r Resources , locker sync . Locker , cb func ( ) error ) error {
2020-04-27 20:43:42 +00:00
for ! canHandleRequest ( r , spt , id , wr , a ) {
if a . cond == nil {
a . cond = sync . NewCond ( locker )
}
a . cond . Wait ( )
}
a . add ( wr , r )
err := cb ( )
a . free ( wr , r )
if a . cond != nil {
a . cond . Broadcast ( )
2020-03-23 11:40:02 +00:00
}
2020-04-27 20:43:42 +00:00
return err
}
// add records a claim on resources r against a worker whose total resources
// are wr. It is the counterpart of free.
//
// A multi-threaded task claims every CPU of the worker; any other task claims
// r.Threads. The GPU flag is only ever set here, never cleared: adding a
// task that does not use the GPU must not mark the GPU as free while another
// task is still using it.
func (a *activeResources) add(wr storiface.WorkerResources, r Resources) {
	if r.CanGPU {
		a.gpuUsed = true
	}

	if r.MultiThread() {
		a.cpuUse += wr.CPUs
	} else {
		a.cpuUse += uint64(r.Threads)
	}

	a.memUsedMin += r.MinMemory
	a.memUsedMax += r.MaxMemory
}
// free releases a claim on resources r previously recorded with add, for a
// worker whose total resources are wr.
func (a *activeResources) free(wr storiface.WorkerResources, r Resources) {
	// A multi-threaded task held every CPU; anything else held r.Threads.
	released := uint64(r.Threads)
	if r.MultiThread() {
		released = wr.CPUs
	}
	a.cpuUse -= released

	a.memUsedMin -= r.MinMemory
	a.memUsedMax -= r.MaxMemory

	if r.CanGPU {
		a.gpuUsed = false
	}
}
2020-06-15 12:32:17 +00:00
func canHandleRequest ( needRes Resources , spt abi . RegisteredSealProof , wid WorkerID , res storiface . WorkerResources , active * activeResources ) bool {
2020-03-23 11:40:02 +00:00
// TODO: dedupe needRes.BaseMinMemory per task type (don't add if that task is already running)
2020-04-27 20:43:42 +00:00
minNeedMem := res . MemReserved + active . memUsedMin + needRes . MinMemory + needRes . BaseMinMemory
2020-03-23 11:40:02 +00:00
if minNeedMem > res . MemPhysical {
log . Debugf ( "sched: not scheduling on worker %d; not enough physical memory - need: %dM, have %dM" , wid , minNeedMem / mib , res . MemPhysical / mib )
2020-04-27 20:43:42 +00:00
return false
2020-03-23 11:40:02 +00:00
}
2020-04-27 20:43:42 +00:00
maxNeedMem := res . MemReserved + active . memUsedMax + needRes . MaxMemory + needRes . BaseMinMemory
2020-06-15 12:32:17 +00:00
if spt == abi . RegisteredSealProof_StackedDrg32GiBV1 {
2020-03-23 11:40:02 +00:00
maxNeedMem += MaxCachingOverhead
}
2020-06-15 12:32:17 +00:00
if spt == abi . RegisteredSealProof_StackedDrg64GiBV1 {
2020-05-08 20:32:34 +00:00
maxNeedMem += MaxCachingOverhead * 2 // ewwrhmwh
}
2020-03-23 11:40:02 +00:00
if maxNeedMem > res . MemSwap + res . MemPhysical {
log . Debugf ( "sched: not scheduling on worker %d; not enough virtual memory - need: %dM, have %dM" , wid , maxNeedMem / mib , ( res . MemSwap + res . MemPhysical ) / mib )
2020-04-27 20:43:42 +00:00
return false
2020-03-23 11:40:02 +00:00
}
2020-04-27 18:37:31 +00:00
if needRes . MultiThread ( ) {
2020-04-27 20:43:42 +00:00
if active . cpuUse > 0 {
log . Debugf ( "sched: not scheduling on worker %d; multicore process needs %d threads, %d in use, target %d" , wid , res . CPUs , active . cpuUse , res . CPUs )
return false
2020-03-23 11:40:02 +00:00
}
2020-04-29 14:56:20 +00:00
} else {
2020-05-01 18:04:21 +00:00
if active . cpuUse + uint64 ( needRes . Threads ) > res . CPUs {
2020-04-29 14:56:20 +00:00
log . Debugf ( "sched: not scheduling on worker %d; not enough threads, need %d, %d in use, target %d" , wid , needRes . Threads , active . cpuUse , res . CPUs )
return false
}
2020-03-23 11:40:02 +00:00
}
if len ( res . GPUs ) > 0 && needRes . CanGPU {
2020-04-27 20:43:42 +00:00
if active . gpuUsed {
2020-03-23 11:40:02 +00:00
log . Debugf ( "sched: not scheduling on worker %d; GPU in use" , wid )
2020-04-27 20:43:42 +00:00
return false
2020-03-23 11:40:02 +00:00
}
}
2020-04-27 20:43:42 +00:00
return true
2020-03-23 11:40:02 +00:00
}
2020-04-29 14:04:05 +00:00
func ( a * activeResources ) utilization ( wr storiface . WorkerResources ) float64 {
var max float64
cpu := float64 ( a . cpuUse ) / float64 ( wr . CPUs )
max = cpu
2020-05-01 18:04:21 +00:00
memMin := float64 ( a . memUsedMin + wr . MemReserved ) / float64 ( wr . MemPhysical )
2020-04-29 14:04:05 +00:00
if memMin > max {
max = memMin
}
2020-05-01 18:04:21 +00:00
memMax := float64 ( a . memUsedMax + wr . MemReserved ) / float64 ( wr . MemPhysical + wr . MemSwap )
2020-04-29 14:04:05 +00:00
if memMax > max {
max = memMax
}
return max
}
2020-05-01 18:00:17 +00:00
func ( sh * scheduler ) schedNewWorker ( w * workerHandle ) {
2020-04-27 18:37:31 +00:00
sh . workersLk . Lock ( )
2020-03-23 11:40:02 +00:00
2020-04-27 18:37:31 +00:00
id := sh . nextWorker
sh . workers [ id ] = w
sh . nextWorker ++
2020-05-01 15:29:27 +00:00
2020-05-01 18:00:17 +00:00
sh . workersLk . Unlock ( )
select {
case sh . watchClosing <- id :
case <- sh . closing :
return
}
sh . onWorkerFreed ( id )
}
func ( sh * scheduler ) schedDropWorker ( wid WorkerID ) {
sh . workersLk . Lock ( )
defer sh . workersLk . Unlock ( )
w := sh . workers [ wid ]
delete ( sh . workers , wid )
go func ( ) {
if err := w . w . Close ( ) ; err != nil {
log . Warnf ( "closing worker %d: %+v" , err )
}
} ( )
2020-03-23 11:40:02 +00:00
}
2020-03-24 23:49:45 +00:00
2020-04-27 18:37:31 +00:00
func ( sh * scheduler ) schedClose ( ) {
sh . workersLk . Lock ( )
defer sh . workersLk . Unlock ( )
2020-03-24 23:49:45 +00:00
2020-04-27 18:37:31 +00:00
for i , w := range sh . workers {
2020-03-24 23:49:45 +00:00
if err := w . w . Close ( ) ; err != nil {
log . Errorf ( "closing worker %d: %+v" , i , err )
}
}
}
2020-04-27 18:37:31 +00:00
// Close signals shutdown by closing the closing channel; runSched reacts by
// closing all workers and returning, and blocked Schedule calls return a
// "closing" error. It always returns nil.
//
// NOTE(review): the channel is closed unconditionally, so a second call to
// Close would panic — confirm callers invoke it at most once.
func ( sh * scheduler ) Close ( ) error {
close ( sh . closing )
return nil
}