lotus/extern/sector-storage/sched_worker.go

package sectorstorage

import (
	"context"
	"time"

	"golang.org/x/xerrors"

	"github.com/filecoin-project/lotus/extern/sector-storage/stores"
)
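
// schedWorker holds the per-worker scheduling state: the worker handle and
// its session-derived ID, the heartbeat ticker, the channel on which the main
// scheduler delivers assigned windows, and a count of windows requested but
// not yet consumed.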
type schedWorker struct {
	sched            *scheduler
	worker           *workerHandle
	wid              WorkerID
	heartbeatTimer   *time.Ticker
	scheduledWindows chan *schedWindow
	taskDone         chan struct{}
	windowsRequested int
}
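
// runWorker registers a newly connected worker under its session ID and
// starts the per-worker handleWorker goroutine; a session that is already
// being handled is ignored.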
// context only used for startup
func (sh *scheduler) runWorker(ctx context.Context, w Worker) error {
	info, err := w.Info(ctx)
	if err != nil {
		return xerrors.Errorf("getting worker info: %w", err)
	}

	sessID, err := w.Session(ctx)
	if err != nil {
		return xerrors.Errorf("getting worker session: %w", err)
	}
	if sessID == ClosedWorkerID {
		return xerrors.Errorf("worker already closed")
	}

	worker := &workerHandle{
		workerRpc: w,
		info:      info,

		preparing: &activeResources{},
		active:    &activeResources{},
		enabled:   true,

		closingMgr: make(chan struct{}),
		closedMgr:  make(chan struct{}),
	}

	wid := WorkerID(sessID)

	sh.workersLk.Lock()
	_, exist := sh.workers[wid]
	if exist {
		log.Warnw("duplicated worker added", "id", wid)

		// this is ok, we're already handling this worker in a different goroutine
		return nil
	}

	sh.workers[wid] = worker
	sh.workersLk.Unlock()

	sw := &schedWorker{
		sched:  sh,
		worker: worker,

		wid: wid,

		heartbeatTimer:   time.NewTicker(stores.HeartbeatInterval),
		scheduledWindows: make(chan *schedWindow, SchedWindows),
		taskDone:         make(chan struct{}, 1),

		windowsRequested: 0,
	}

	go sw.handleWorker()

	return nil
}
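
// handleWorker is the long-running event loop for a single worker. It keeps
// window requests outstanding with the main scheduler, monitors the worker
// session via heartbeats, and dispatches tasks from assigned windows. On
// exit it disables the worker and removes it from the scheduler.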
func (sw *schedWorker) handleWorker() {
	worker, sched := sw.worker, sw.sched

	ctx, cancel := context.WithCancel(context.TODO())
	defer cancel()

	defer close(worker.closedMgr)

	defer func() {
		log.Warnw("Worker closing", "workerid", sw.wid)

		if err := sw.disable(ctx); err != nil {
			log.Warnw("failed to disable worker", "worker", sw.wid, "error", err)
		}

		sched.workersLk.Lock()
		delete(sched.workers, sw.wid)
		sched.workersLk.Unlock()
	}()

	defer sw.heartbeatTimer.Stop()

	for {
		{
			sched.workersLk.Lock()
			enabled := worker.enabled
			sched.workersLk.Unlock()

			// ask for more windows if we need them (non-blocking)
			if enabled {
				if !sw.requestWindows() {
					return // graceful shutdown
				}
			}
		}

		// wait for more windows to come in, or for tasks to get finished (blocking)
		for {
			// ping the worker and check session
			if !sw.checkSession(ctx) {
				return // invalid session / exiting
			}

			// session looks good
			{
				sched.workersLk.Lock()
				enabled := worker.enabled
				worker.enabled = true
				sched.workersLk.Unlock()

				if !enabled {
					// go send window requests
					break
				}
			}

			// wait for more tasks to be assigned by the main scheduler or for the worker
			// to finish processing a task
			update, ok := sw.waitForUpdates()
			if !ok {
				return
			}
			if update {
				break
			}
		}

		// process assigned windows (non-blocking)
		sched.workersLk.RLock()
		worker.wndLk.Lock()

		sw.workerCompactWindows()

		// send tasks to the worker
		sw.processAssignedWindows()

		worker.wndLk.Unlock()
		sched.workersLk.RUnlock()
	}
}
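
// disable asks the main scheduler goroutine to take back this worker's
// queued windows and mark it disabled, waits for that cleanup to complete,
// and then resets the local window state.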
func (sw *schedWorker) disable(ctx context.Context) error {
	done := make(chan struct{})

	// request cleanup in the main scheduler goroutine
	select {
	case sw.sched.workerDisable <- workerDisableReq{
		activeWindows: sw.worker.activeWindows,
		wid:           sw.wid,
		done: func() {
			close(done)
		},
	}:
	case <-ctx.Done():
		return ctx.Err()
	case <-sw.sched.closing:
		return nil
	}

	// wait for cleanup to complete
	select {
	case <-done:
	case <-ctx.Done():
		return ctx.Err()
	case <-sw.sched.closing:
		return nil
	}

	sw.worker.activeWindows = sw.worker.activeWindows[:0]
	sw.windowsRequested = 0
	return nil
}
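
// checkSession pings the worker over RPC and verifies that its session ID
// still matches the one this goroutine was started for. While the worker is
// unreachable it is kept disabled and re-pinged on every heartbeat tick.
// It returns false when the session changed, the worker closed, or the
// scheduler is shutting down.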
func (sw *schedWorker) checkSession(ctx context.Context) bool {
	for {
		sctx, scancel := context.WithTimeout(ctx, stores.HeartbeatInterval/2)
		curSes, err := sw.worker.workerRpc.Session(sctx)
		scancel()
		if err != nil {
			// Likely temporary error
			log.Warnw("failed to check worker session", "error", err)

			if err := sw.disable(ctx); err != nil {
				log.Warnw("failed to disable worker with session error", "worker", sw.wid, "error", err)
			}

			select {
			case <-sw.heartbeatTimer.C:
				continue
			case w := <-sw.scheduledWindows:
				// was in flight when initially disabled, return
				sw.worker.wndLk.Lock()
				sw.worker.activeWindows = append(sw.worker.activeWindows, w)
				sw.worker.wndLk.Unlock()

				if err := sw.disable(ctx); err != nil {
					log.Warnw("failed to disable worker with session error", "worker", sw.wid, "error", err)
				}
			case <-sw.sched.closing:
				return false
			case <-sw.worker.closingMgr:
				return false
			}

			continue
		}

		if WorkerID(curSes) != sw.wid {
			if curSes != ClosedWorkerID {
				// worker restarted
				log.Warnw("worker session changed (worker restarted?)", "initial", sw.wid, "current", curSes)
			}

			return false
		}

		return true
	}
}
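
// requestWindows tops the worker up to SchedWindows outstanding window
// requests with the main scheduler. It returns false if the scheduler or
// this worker is shutting down.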
func (sw *schedWorker) requestWindows() bool {
	for ; sw.windowsRequested < SchedWindows; sw.windowsRequested++ {
		select {
		case sw.sched.windowRequests <- &schedWindowRequest{
			worker: sw.wid,
			done:   sw.scheduledWindows,
		}:
		case <-sw.sched.closing:
			return false
		case <-sw.worker.closingMgr:
			return false
		}
	}
	return true
}
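
// waitForUpdates blocks until something interesting happens: a heartbeat tick
// (update=false), a newly scheduled window or a finished task (update=true),
// or shutdown (ok=false).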
func (sw *schedWorker) waitForUpdates() (update bool, ok bool) {
	select {
	case <-sw.heartbeatTimer.C:
		return false, true
	case w := <-sw.scheduledWindows:
		sw.worker.wndLk.Lock()
		sw.worker.activeWindows = append(sw.worker.activeWindows, w)
		sw.worker.wndLk.Unlock()
		return true, true
	case <-sw.taskDone:
		log.Debugw("task done", "workerid", sw.wid)
		return true, true
	case <-sw.sched.closing:
	case <-sw.worker.closingMgr:
	}

	return false, false
}
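
// workerCompactWindows moves queued tasks from newer scheduling windows into
// older ones that still have room for them, then drops windows which have
// become empty so that replacement windows can be requested.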
func (sw *schedWorker) workerCompactWindows() {
	worker := sw.worker

	// move tasks from newer windows to older windows if the older windows
	// still have the resources to fit them
	if len(worker.activeWindows) > 1 {
		for wi, window := range worker.activeWindows[1:] {
			lower := worker.activeWindows[wi]
			var moved []int

			for ti, todo := range window.todo {
				needRes := ResourceTable[todo.taskType][sw.sched.spt]
				if !lower.allocated.canHandleRequest(needRes, sw.wid, "compactWindows", worker.info.Resources) {
					continue
				}

				moved = append(moved, ti)
				lower.todo = append(lower.todo, todo)
				lower.allocated.add(worker.info.Resources, needRes)
				window.allocated.free(worker.info.Resources, needRes)
			}

			if len(moved) > 0 {
				newTodo := make([]*workerRequest, 0, len(window.todo)-len(moved))
				for i, t := range window.todo {
					if len(moved) > 0 && moved[0] == i {
						moved = moved[1:]
						continue
					}

					newTodo = append(newTodo, t)
				}
				window.todo = newTodo
			}
		}
	}

	var compacted int
	var newWindows []*schedWindow

	for _, window := range worker.activeWindows {
		if len(window.todo) == 0 {
			compacted++
			continue
		}

		newWindows = append(newWindows, window)
	}

	worker.activeWindows = newWindows
	sw.windowsRequested -= compacted
}
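
// processAssignedWindows walks the queued windows in order and starts every
// task the worker currently has 'preparing' resources for, removing started
// tasks from their window and dropping windows that have been fully drained.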
func (sw *schedWorker) processAssignedWindows() {
	worker := sw.worker

assignLoop:
	// process windows in order
	for len(worker.activeWindows) > 0 {
		firstWindow := worker.activeWindows[0]

		// process tasks within a window, preferring tasks at lower indexes
		for len(firstWindow.todo) > 0 {
			tidx := -1

			worker.lk.Lock()
			for t, todo := range firstWindow.todo {
				needRes := ResourceTable[todo.taskType][sw.sched.spt]
				if worker.preparing.canHandleRequest(needRes, sw.wid, "startPreparing", worker.info.Resources) {
					tidx = t
					break
				}
			}
			worker.lk.Unlock()

			if tidx == -1 {
				break assignLoop
			}

			todo := firstWindow.todo[tidx]

			log.Debugf("assign worker sector %d", todo.sector.Number)
			err := sw.startProcessingTask(sw.taskDone, todo)
			if err != nil {
				log.Errorf("startProcessingTask error: %+v", err)
				go todo.respond(xerrors.Errorf("startProcessingTask error: %w", err))
			}

			// Note: we're not freeing window.allocated resources here very much on purpose
			copy(firstWindow.todo[tidx:], firstWindow.todo[tidx+1:])
			firstWindow.todo[len(firstWindow.todo)-1] = nil
			firstWindow.todo = firstWindow.todo[:len(firstWindow.todo)-1]
		}

		copy(worker.activeWindows, worker.activeWindows[1:])
		worker.activeWindows[len(worker.activeWindows)-1] = nil
		worker.activeWindows = worker.activeWindows[:len(worker.activeWindows)-1]

		sw.windowsRequested--
	}
}
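
// startProcessingTask reserves 'preparing' resources for the request and runs
// it asynchronously: first the prepare step, then, once resources become
// available in the 'active' window, the work step. Completion (or a prepare
// error) is signalled on taskDone, and the result is sent on req.ret.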
func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRequest) error {
	w, sh := sw.worker, sw.sched

	needRes := ResourceTable[req.taskType][sh.spt]

	w.lk.Lock()
	w.preparing.add(w.info.Resources, needRes)
	w.lk.Unlock()

	go func() {
		// first run the prepare step (e.g. fetching sector data from other worker)
		err := req.prepare(req.ctx, sh.workTracker.worker(sw.wid, w.workerRpc))
		sh.workersLk.Lock()

		if err != nil {
			w.lk.Lock()
			w.preparing.free(w.info.Resources, needRes)
			w.lk.Unlock()
			sh.workersLk.Unlock()

			select {
			case taskDone <- struct{}{}:
			case <-sh.closing:
				log.Warnf("scheduler closed while sending response (prepare error: %+v)", err)
			}

			select {
			case req.ret <- workerResponse{err: err}:
			case <-req.ctx.Done():
				log.Warnf("request got cancelled before we could respond (prepare error: %+v)", err)
			case <-sh.closing:
				log.Warnf("scheduler closed while sending response (prepare error: %+v)", err)
			}
			return
		}

		// wait (if needed) for resources in the 'active' window
		err = w.active.withResources(sw.wid, w.info.Resources, needRes, &sh.workersLk, func() error {
			w.lk.Lock()
			w.preparing.free(w.info.Resources, needRes)
			w.lk.Unlock()
			sh.workersLk.Unlock()
			defer sh.workersLk.Lock() // we MUST return locked from this function

			select {
			case taskDone <- struct{}{}:
			case <-sh.closing:
			}

			// Do the work!
			err = req.work(req.ctx, sh.workTracker.worker(sw.wid, w.workerRpc))

			select {
			case req.ret <- workerResponse{err: err}:
			case <-req.ctx.Done():
				log.Warnf("request got cancelled before we could respond")
			case <-sh.closing:
				log.Warnf("scheduler closed while sending response")
			}

			return nil
		})

		sh.workersLk.Unlock()

		// This error should always be nil, since nothing is setting it, but just to be safe:
		if err != nil {
			log.Errorf("error executing worker (withResources): %+v", err)
		}
	}()

	return nil
}
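
// workerCleanup signals the worker's manager goroutine to close and waits
// briefly for it to exit, then drops the worker's remaining open window
// requests. It expects workersLk to be held by the caller; the lock is
// released while waiting and re-acquired before the open-window cleanup.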
func (sh *scheduler) workerCleanup(wid WorkerID, w *workerHandle) {
	select {
	case <-w.closingMgr:
	default:
		close(w.closingMgr)
	}

	sh.workersLk.Unlock()
	select {
	case <-w.closedMgr:
	case <-time.After(time.Second):
		log.Errorf("timeout closing worker manager goroutine %d", wid)
	}
	sh.workersLk.Lock()

	if !w.cleanupStarted {
		w.cleanupStarted = true

		newWindows := make([]*schedWindowRequest, 0, len(sh.openWindows))
		for _, window := range sh.openWindows {
			if window.worker != wid {
				newWindows = append(newWindows, window)
			}
		}
		sh.openWindows = newWindows

		log.Debugf("worker %d dropped", wid)
	}
}