lotus/extern/sector-storage/sched_post.go

233 lines
4.9 KiB
Go
Raw Normal View History

package sectorstorage
import (
"context"
"math/rand"
"sync"
"time"
2022-01-18 10:37:15 +00:00
xerrors "golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
sealtasks "github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
type poStScheduler struct {
lk sync.RWMutex
workers map[storiface.WorkerID]*WorkerHandle
cond *sync.Cond
postType sealtasks.TaskType
}
func newPoStScheduler(t sealtasks.TaskType) *poStScheduler {
ps := &poStScheduler{
workers: map[storiface.WorkerID]*WorkerHandle{},
postType: t,
}
ps.cond = sync.NewCond(&ps.lk)
return ps
}
func (ps *poStScheduler) MaybeAddWorker(wid storiface.WorkerID, tasks map[sealtasks.TaskType]struct{}, w *WorkerHandle) bool {
2022-01-14 13:11:04 +00:00
if _, ok := tasks[ps.postType]; !ok {
return false
}
ps.lk.Lock()
defer ps.lk.Unlock()
ps.workers[wid] = w
go ps.watch(wid, w)
ps.cond.Broadcast()
return true
}
func (ps *poStScheduler) delWorker(wid storiface.WorkerID) *WorkerHandle {
ps.lk.Lock()
defer ps.lk.Unlock()
var w *WorkerHandle = nil
if wh, ok := ps.workers[wid]; ok {
w = wh
delete(ps.workers, wid)
}
return w
}
func (ps *poStScheduler) CanSched(ctx context.Context) bool {
ps.lk.RLock()
defer ps.lk.RUnlock()
if len(ps.workers) == 0 {
return false
}
for _, w := range ps.workers {
if w.Enabled {
return true
}
}
return false
}
func (ps *poStScheduler) Schedule(ctx context.Context, primary bool, spt abi.RegisteredSealProof, work WorkerAction) error {
ps.lk.Lock()
defer ps.lk.Unlock()
if len(ps.workers) == 0 {
return xerrors.Errorf("can't find %s post worker", ps.postType)
}
// Get workers by resource
canDo, candidates := ps.readyWorkers(spt)
for !canDo {
//if primary is true, it must be dispatched to a worker
if primary {
ps.cond.Wait()
canDo, candidates = ps.readyWorkers(spt)
} else {
return xerrors.Errorf("can't find %s post worker", ps.postType)
}
}
defer func() {
if ps.cond != nil {
ps.cond.Broadcast()
}
}()
selected := candidates[0]
worker := ps.workers[selected.id]
return worker.active.withResources(selected.id, worker.Info, selected.res, &ps.lk, func() error {
ps.lk.Unlock()
defer ps.lk.Lock()
return work(ctx, worker.workerRpc)
})
}
type candidateWorker struct {
id storiface.WorkerID
res storiface.Resources
}
func (ps *poStScheduler) readyWorkers(spt abi.RegisteredSealProof) (bool, []candidateWorker) {
var accepts []candidateWorker
//if the gpus of the worker are insufficient or it's disabled, it cannot be scheduled
for wid, wr := range ps.workers {
needRes := wr.Info.Resources.ResourceSpec(spt, ps.postType)
if !wr.active.CanHandleRequest(needRes, wid, "post-readyWorkers", wr.Info) {
continue
}
accepts = append(accepts, candidateWorker{
id: wid,
res: needRes,
})
}
// todo: round robin or something
rand.Shuffle(len(accepts), func(i, j int) {
accepts[i], accepts[j] = accepts[j], accepts[i]
})
return len(accepts) != 0, accepts
}
func (ps *poStScheduler) disable(wid storiface.WorkerID) {
ps.lk.Lock()
defer ps.lk.Unlock()
ps.workers[wid].Enabled = false
}
func (ps *poStScheduler) enable(wid storiface.WorkerID) {
ps.lk.Lock()
defer ps.lk.Unlock()
ps.workers[wid].Enabled = true
}
func (ps *poStScheduler) watch(wid storiface.WorkerID, worker *WorkerHandle) {
heartbeatTimer := time.NewTicker(stores.HeartbeatInterval)
defer heartbeatTimer.Stop()
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
defer close(worker.closedMgr)
defer func() {
log.Warnw("Worker closing", "WorkerID", wid)
ps.delWorker(wid)
}()
for {
sctx, scancel := context.WithTimeout(ctx, stores.HeartbeatInterval/2)
curSes, err := worker.workerRpc.Session(sctx)
scancel()
if err != nil {
// Likely temporary error
log.Warnw("failed to check worker session", "error", err)
ps.disable(wid)
select {
case <-heartbeatTimer.C:
continue
case <-worker.closingMgr:
return
}
}
if storiface.WorkerID(curSes) != wid {
if curSes != ClosedWorkerID {
// worker restarted
log.Warnw("worker session changed (worker restarted?)", "initial", wid, "current", curSes)
}
return
}
ps.enable(wid)
}
}
func (ps *poStScheduler) workerCleanup(wid storiface.WorkerID, w *WorkerHandle) {
select {
case <-w.closingMgr:
default:
close(w.closingMgr)
}
ps.lk.Unlock()
select {
case <-w.closedMgr:
case <-time.After(time.Second):
2022-01-14 13:11:04 +00:00
log.Errorf("timeout closing worker manager goroutine %s", wid)
}
ps.lk.Lock()
}
func (ps *poStScheduler) schedClose() {
ps.lk.Lock()
defer ps.lk.Unlock()
log.Debugf("closing scheduler")
for i, w := range ps.workers {
ps.workerCleanup(i, w)
}
}
func (ps *poStScheduler) WorkerStats(ctx context.Context, cb func(ctx context.Context, wid storiface.WorkerID, worker *WorkerHandle)) {
ps.lk.RLock()
defer ps.lk.RUnlock()
for id, w := range ps.workers {
cb(ctx, id, w)
}
}