243 lines
5.1 KiB
Go
243 lines
5.1 KiB
Go
package sectorstorage
|
|
|
|
import (
|
|
"github.com/filecoin-project/lotus/storage/sectorstorage/sealtasks"
|
|
"github.com/filecoin-project/specs-actors/actors/abi"
|
|
"golang.org/x/xerrors"
|
|
|
|
"github.com/filecoin-project/lotus/api"
|
|
)
|
|
|
|
const mib = 1 << 20
|
|
|
|
type workerRequest struct {
|
|
taskType sealtasks.TaskType
|
|
accept []workerID // ordered by preference
|
|
|
|
ret chan<- workerResponse
|
|
cancel <-chan struct{}
|
|
}
|
|
|
|
type workerResponse struct {
|
|
err error
|
|
|
|
worker Worker
|
|
done func()
|
|
}
|
|
|
|
func (r *workerRequest) respond(resp workerResponse) {
|
|
select {
|
|
case r.ret <- resp:
|
|
case <-r.cancel:
|
|
log.Warnf("request got cancelled before we could respond")
|
|
if resp.done != nil {
|
|
resp.done()
|
|
}
|
|
}
|
|
}
|
|
|
|
type workerHandle struct {
|
|
w Worker
|
|
|
|
info api.WorkerInfo
|
|
|
|
memUsedMin uint64
|
|
memUsedMax uint64
|
|
gpuUsed bool
|
|
cpuUse int // -1 - multicore thing; 0 - free; 1+ - singlecore things
|
|
}
|
|
|
|
func (m *Manager) runSched() {
|
|
for {
|
|
select {
|
|
case w := <-m.newWorkers:
|
|
m.schedNewWorker(w)
|
|
case req := <-m.schedule:
|
|
resp, err := m.maybeSchedRequest(req)
|
|
if err != nil {
|
|
req.respond(workerResponse{err: err})
|
|
continue
|
|
}
|
|
|
|
if resp != nil {
|
|
req.respond(*resp)
|
|
continue
|
|
}
|
|
|
|
m.schedQueue.PushBack(req)
|
|
case wid := <-m.workerFree:
|
|
m.onWorkerFreed(wid)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *Manager) onWorkerFreed(wid workerID) {
|
|
for e := m.schedQueue.Front(); e != nil; e = e.Next() {
|
|
req := e.Value.(*workerRequest)
|
|
var ok bool
|
|
for _, id := range req.accept {
|
|
if id == wid {
|
|
ok = true
|
|
break
|
|
}
|
|
}
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
resp, err := m.maybeSchedRequest(req)
|
|
if err != nil {
|
|
req.respond(workerResponse{err: err})
|
|
continue
|
|
}
|
|
|
|
if resp != nil {
|
|
req.respond(*resp)
|
|
|
|
pe := e.Prev()
|
|
m.schedQueue.Remove(e)
|
|
if pe == nil {
|
|
pe = m.schedQueue.Front()
|
|
}
|
|
if pe == nil {
|
|
break
|
|
}
|
|
e = pe
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *Manager) maybeSchedRequest(req *workerRequest) (*workerResponse, error) {
|
|
m.workersLk.Lock()
|
|
defer m.workersLk.Unlock()
|
|
|
|
tried := 0
|
|
|
|
for _, id := range req.accept {
|
|
w, ok := m.workers[id]
|
|
if !ok {
|
|
log.Warnf("requested worker %d is not in scheduler", id)
|
|
}
|
|
tried++
|
|
|
|
canDo, err := m.canHandleRequest(id, w, req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if !canDo {
|
|
continue
|
|
}
|
|
|
|
return m.makeResponse(id, w, req), nil
|
|
}
|
|
|
|
if tried == 0 {
|
|
return nil, xerrors.New("maybeSchedRequest didn't find any good workers")
|
|
}
|
|
|
|
return nil, nil // put in waiting queue
|
|
}
|
|
|
|
func (m *Manager) makeResponse(wid workerID, w *workerHandle, req *workerRequest) *workerResponse {
|
|
needRes := ResourceTable[req.taskType][m.scfg.SealProofType]
|
|
|
|
w.gpuUsed = needRes.CanGPU
|
|
if needRes.MultiThread {
|
|
w.cpuUse = -1
|
|
} else {
|
|
if w.cpuUse != -1 {
|
|
w.cpuUse++
|
|
} else {
|
|
log.Warnf("sched: makeResponse for worker %d: worker cpu is in multicore use, but a single core task was scheduled", wid)
|
|
}
|
|
}
|
|
|
|
w.memUsedMin += needRes.MinMemory
|
|
w.memUsedMax += needRes.MaxMemory
|
|
|
|
return &workerResponse{
|
|
err: nil,
|
|
worker: w.w,
|
|
done: func() {
|
|
m.workersLk.Lock()
|
|
|
|
if needRes.CanGPU {
|
|
w.gpuUsed = false
|
|
}
|
|
|
|
if needRes.MultiThread {
|
|
w.cpuUse = 0
|
|
} else if w.cpuUse != -1 {
|
|
w.cpuUse--
|
|
}
|
|
|
|
w.memUsedMin -= needRes.MinMemory
|
|
w.memUsedMax -= needRes.MaxMemory
|
|
|
|
m.workersLk.Unlock()
|
|
|
|
select {
|
|
case m.workerFree <- wid:
|
|
case <-m.closing:
|
|
}
|
|
},
|
|
}
|
|
}
|
|
|
|
func (m *Manager) canHandleRequest(wid workerID, w *workerHandle, req *workerRequest) (bool, error) {
|
|
needRes, ok := ResourceTable[req.taskType][m.scfg.SealProofType]
|
|
if !ok {
|
|
return false, xerrors.Errorf("canHandleRequest: missing ResourceTable entry for %s/%d", req.taskType, m.scfg.SealProofType)
|
|
}
|
|
|
|
res := w.info.Resources
|
|
|
|
// TODO: dedupe needRes.BaseMinMemory per task type (don't add if that task is already running)
|
|
minNeedMem := res.MemReserved + w.memUsedMin + needRes.MinMemory + needRes.BaseMinMemory
|
|
if minNeedMem > res.MemPhysical {
|
|
log.Debugf("sched: not scheduling on worker %d; not enough physical memory - need: %dM, have %dM", wid, minNeedMem/mib, res.MemPhysical/mib)
|
|
return false, nil
|
|
}
|
|
|
|
maxNeedMem := res.MemReserved + w.memUsedMax + needRes.MaxMemory + needRes.BaseMinMemory
|
|
if m.scfg.SealProofType == abi.RegisteredProof_StackedDRG32GiBSeal {
|
|
maxNeedMem += MaxCachingOverhead
|
|
}
|
|
if maxNeedMem > res.MemSwap+res.MemPhysical {
|
|
log.Debugf("sched: not scheduling on worker %d; not enough virtual memory - need: %dM, have %dM", wid, maxNeedMem/mib, (res.MemSwap+res.MemPhysical)/mib)
|
|
return false, nil
|
|
}
|
|
|
|
if needRes.MultiThread {
|
|
if w.cpuUse != 0 {
|
|
log.Debugf("sched: not scheduling on worker %d; multicore process needs free CPU", wid)
|
|
return false, nil
|
|
}
|
|
} else {
|
|
if w.cpuUse == -1 {
|
|
log.Debugf("sched: not scheduling on worker %d; CPU in use by a multicore process", wid)
|
|
return false, nil
|
|
}
|
|
}
|
|
|
|
if len(res.GPUs) > 0 && needRes.CanGPU {
|
|
if w.gpuUsed {
|
|
log.Debugf("sched: not scheduling on worker %d; GPU in use", wid)
|
|
return false, nil
|
|
}
|
|
}
|
|
|
|
return true, nil
|
|
}
|
|
|
|
func (m *Manager) schedNewWorker(w *workerHandle) {
|
|
m.workersLk.Lock()
|
|
defer m.workersLk.Unlock()
|
|
|
|
id := m.nextWorker
|
|
m.workers[id] = w
|
|
m.nextWorker++
|
|
}
|