sched: split worker handling into more funcs

Łukasz Magiera 2020-10-28 14:14:38 +01:00
parent 84b567c790
commit 8731fe9112
2 changed files with 269 additions and 208 deletions
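The change below replaces the single long per-worker goroutine in runWorker with a schedWorker struct whose handleWorker loop delegates to small named methods (requestWindows, waitForUpdates, processAssignedWindows, checkSession, disable). As a rough orientation before reading the diff, here is a minimal, self-contained Go sketch of that loop shape. The miniWorker type, its fields, and the printed output are invented for illustration and are not the lotus API; the real schedWorker additionally handles sessions, window compaction, and resource accounting.

package main

import (
	"fmt"
	"time"
)

// miniWorker is a hypothetical, stripped-down analogue of schedWorker:
// per-worker state plus the channels its event loop selects over.
type miniWorker struct {
	id        int
	heartbeat *time.Ticker
	windows   chan string   // stands in for scheduledWindows
	taskDone  chan struct{} // signalled when a task finishes
	closing   chan struct{} // stands in for sh.closing / closingMgr
	requested int           // windows requested but not yet processed
	pending   []string      // assigned windows waiting to be processed
}

// handle mirrors the overall shape of handleWorker: top up window
// requests, block until something happens, then process assigned windows.
func (w *miniWorker) handle() {
	defer w.heartbeat.Stop()
	for {
		if !w.requestWindows() {
			return // shutting down
		}
		for {
			update, ok := w.waitForUpdate()
			if !ok {
				return
			}
			if update {
				break
			}
		}
		w.processWindows()
	}
}

// requestWindows asks for work until the in-flight budget is reached.
// In lotus a schedWindowRequest is sent to the scheduler, which later
// delivers a window on the channel; to stay self-contained this sketch
// feeds the channel directly.
func (w *miniWorker) requestWindows() bool {
	for ; w.requested < 2; w.requested++ {
		select {
		case w.windows <- fmt.Sprintf("window-%d", w.requested):
		case <-w.closing:
			return false
		}
	}
	return true
}

// waitForUpdate blocks until a window arrives, a task finishes,
// a heartbeat fires, or the worker is shutting down.
func (w *miniWorker) waitForUpdate() (update bool, ok bool) {
	select {
	case <-w.heartbeat.C:
		return false, true
	case win := <-w.windows:
		w.pending = append(w.pending, win)
		return true, true
	case <-w.taskDone:
		return true, true
	case <-w.closing:
		return false, false
	}
}

// processWindows drains the windows collected by waitForUpdate.
func (w *miniWorker) processWindows() {
	for _, win := range w.pending {
		fmt.Println("worker", w.id, "processing", win)
		w.requested--
	}
	w.pending = w.pending[:0]
}

func main() {
	w := &miniWorker{
		id:        1,
		heartbeat: time.NewTicker(50 * time.Millisecond),
		windows:   make(chan string, 2),
		taskDone:  make(chan struct{}, 1),
		closing:   make(chan struct{}),
	}
	go w.handle()
	time.Sleep(200 * time.Millisecond)
	close(w.closing)
}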


@@ -2,13 +2,25 @@ package sectorstorage

 import (
 	"context"
-	"github.com/filecoin-project/lotus/extern/sector-storage/stores"
 	"time"

 	"golang.org/x/xerrors"
+
+	"github.com/filecoin-project/lotus/extern/sector-storage/stores"
 )

+type schedWorker struct {
+	sh     *scheduler
+	worker *workerHandle
+
+	wid WorkerID
+
+	heartbeatTimer   *time.Ticker
+	scheduledWindows chan *schedWindow
+	taskDone         chan struct{}
+
+	windowsRequested int
+}
+
 // context only used for startup
 func (sh *scheduler) runWorker(ctx context.Context, w Worker) error {
 	info, err := w.Info(ctx)
@@ -50,62 +62,45 @@ func (sh *scheduler) runWorker(ctx context.Context, w Worker) error {
 	sh.workers[wid] = worker
 	sh.workersLk.Unlock()

-	go func() {
+	sw := &schedWorker{
+		sh:     sh,
+		worker: worker,
+
+		wid: wid,
+
+		heartbeatTimer:   time.NewTicker(stores.HeartbeatInterval),
+		scheduledWindows: make(chan *schedWindow, SchedWindows),
+		taskDone:         make(chan struct{}, 1),
+
+		windowsRequested: 0,
+	}
+
+	go sw.handleWorker()
+
+	return nil
+}
+
+func (sw *schedWorker) handleWorker() {
+	worker, sh := sw.worker, sw.sh
+
 	ctx, cancel := context.WithCancel(context.TODO())
 	defer cancel()
 	defer close(worker.closedMgr)

-	scheduledWindows := make(chan *schedWindow, SchedWindows)
-	taskDone := make(chan struct{}, 1)
-	windowsRequested := 0
-
-	disable := func(ctx context.Context) error {
-		done := make(chan struct{})
-
-		// request cleanup in the main scheduler goroutine
-		select {
-		case sh.workerDisable <- workerDisableReq{
-			activeWindows: worker.activeWindows,
-			wid:           wid,
-			done: func() {
-				close(done)
-			},
-		}:
-		case <-ctx.Done():
-			return ctx.Err()
-		case <-sh.closing:
-			return nil
-		}
-
-		// wait for cleanup to complete
-		select {
-		case <-done:
-		case <-ctx.Done():
-			return ctx.Err()
-		case <-sh.closing:
-			return nil
-		}
-
-		worker.activeWindows = worker.activeWindows[:0]
-		windowsRequested = 0
-		return nil
-	}
-
 	defer func() {
-		log.Warnw("Worker closing", "workerid", sessID)
+		log.Warnw("Worker closing", "workerid", sw.wid)

-		if err := disable(ctx); err != nil {
-			log.Warnw("failed to disable worker", "worker", wid, "error", err)
+		if err := sw.disable(ctx); err != nil {
+			log.Warnw("failed to disable worker", "worker", sw.wid, "error", err)
 		}

 		sh.workersLk.Lock()
-		delete(sh.workers, wid)
+		delete(sh.workers, sw.wid)
 		sh.workersLk.Unlock()
 	}()

-	heartbeatTimer := time.NewTicker(stores.HeartbeatInterval)
-	defer heartbeatTimer.Stop()
+	defer sw.heartbeatTimer.Stop()

 	for {
 		sh.workersLk.Lock()
@@ -113,63 +108,17 @@ func (sh *scheduler) runWorker(ctx context.Context, w Worker) error {
 		sh.workersLk.Unlock()

 		// ask for more windows if we need them (non-blocking)
-		for ; enabled && windowsRequested < SchedWindows; windowsRequested++ {
-			select {
-			case sh.windowRequests <- &schedWindowRequest{
-				worker: wid,
-				done:   scheduledWindows,
-			}:
-			case <-sh.closing:
-				return
-			case <-worker.closingMgr:
-				return
+		if enabled {
+			if !sw.requestWindows() {
+				return // graceful shutdown
 			}
 		}

 		// wait for more windows to come in, or for tasks to get finished (blocking)
 		for {
-			// first ping the worker and check session
-			{
-				sctx, scancel := context.WithTimeout(ctx, stores.HeartbeatInterval/2)
-				curSes, err := worker.w.Session(sctx)
-				scancel()
-				if err != nil {
-					// Likely temporary error
-					log.Warnw("failed to check worker session", "error", err)
-
-					if err := disable(ctx); err != nil {
-						log.Warnw("failed to disable worker with session error", "worker", wid, "error", err)
-					}
-
-					select {
-					case <-heartbeatTimer.C:
-						continue
-					case w := <-scheduledWindows:
-						// was in flight when initially disabled, return
-						worker.wndLk.Lock()
-						worker.activeWindows = append(worker.activeWindows, w)
-						worker.wndLk.Unlock()
-
-						if err := disable(ctx); err != nil {
-							log.Warnw("failed to disable worker with session error", "worker", wid, "error", err)
-						}
-					case <-sh.closing:
-						return
-					case <-worker.closingMgr:
-						return
-					}
-
-					continue
-				}
-
-				if curSes != sessID {
-					if curSes != ClosedWorkerID {
-						// worker restarted
-						log.Warnw("worker session changed (worker restarted?)", "initial", sessID, "current", curSes)
-					}
-
-					return
-				}
+			// ping the worker and check session
+			if !sw.checkSession(ctx) {
+				return // invalid session / exiting
+			}

 			// session looks good
@@ -180,85 +129,145 @@ func (sh *scheduler) runWorker(ctx context.Context, w Worker) error {
 				// we'll send window requests on the next loop
 			}
-			}

-			select {
-			case <-heartbeatTimer.C:
-				continue
-			case w := <-scheduledWindows:
-				worker.wndLk.Lock()
-				worker.activeWindows = append(worker.activeWindows, w)
-				worker.wndLk.Unlock()
-			case <-taskDone:
-				log.Debugw("task done", "workerid", wid)
-			case <-sh.closing:
-				return
-			case <-worker.closingMgr:
-				return
-			}
+			// wait for more tasks to be assigned by the main scheduler or for the worker
+			// to finish precessing a task
+			update, ok := sw.waitForUpdates()
+			if !ok {
+				return
+			}

-			break
+			if update {
+				break
+			}
 		}

 		// process assigned windows (non-blocking)
 		sh.workersLk.RLock()
 		worker.wndLk.Lock()
-		windowsRequested -= sh.workerCompactWindows(worker, wid)
+		sw.windowsRequested -= sh.workerCompactWindows(worker, sw.wid)

-	assignLoop:
-		// process windows in order
-		for len(worker.activeWindows) > 0 {
-			firstWindow := worker.activeWindows[0]
-
-			// process tasks within a window, preferring tasks at lower indexes
-			for len(firstWindow.todo) > 0 {
-				tidx := -1
-
-				worker.lk.Lock()
-				for t, todo := range firstWindow.todo {
-					needRes := ResourceTable[todo.taskType][sh.spt]
-					if worker.preparing.canHandleRequest(needRes, wid, "startPreparing", worker.info.Resources) {
-						tidx = t
-						break
-					}
-				}
-				worker.lk.Unlock()
-
-				if tidx == -1 {
-					break assignLoop
-				}
-
-				todo := firstWindow.todo[tidx]
-
-				log.Debugf("assign worker sector %d", todo.sector.Number)
-				err := sh.assignWorker(taskDone, wid, worker, todo)
-
-				if err != nil {
-					log.Error("assignWorker error: %+v", err)
-					go todo.respond(xerrors.Errorf("assignWorker error: %w", err))
-				}
-
-				// Note: we're not freeing window.allocated resources here very much on purpose
-				copy(firstWindow.todo[tidx:], firstWindow.todo[tidx+1:])
-				firstWindow.todo[len(firstWindow.todo)-1] = nil
-				firstWindow.todo = firstWindow.todo[:len(firstWindow.todo)-1]
-			}
-
-			copy(worker.activeWindows, worker.activeWindows[1:])
-			worker.activeWindows[len(worker.activeWindows)-1] = nil
-			worker.activeWindows = worker.activeWindows[:len(worker.activeWindows)-1]
-
-			windowsRequested--
-		}
+		// send tasks to the worker
+		sw.processAssignedWindows()

 		worker.wndLk.Unlock()
 		sh.workersLk.RUnlock()
 	}
-	}()
-
-	return nil
 }
+
+func (sw *schedWorker) disable(ctx context.Context) error {
+	done := make(chan struct{})
+
+	// request cleanup in the main scheduler goroutine
+	select {
+	case sw.sh.workerDisable <- workerDisableReq{
+		activeWindows: sw.worker.activeWindows,
+		wid:           sw.wid,
+		done: func() {
+			close(done)
+		},
+	}:
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-sw.sh.closing:
+		return nil
+	}
+
+	// wait for cleanup to complete
+	select {
+	case <-done:
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-sw.sh.closing:
+		return nil
+	}
+
+	sw.worker.activeWindows = sw.worker.activeWindows[:0]
+	sw.windowsRequested = 0
+	return nil
+}
+
+func (sw *schedWorker) checkSession(ctx context.Context) bool {
+	for {
+		sctx, scancel := context.WithTimeout(ctx, stores.HeartbeatInterval/2)
+		curSes, err := sw.worker.w.Session(sctx)
+		scancel()
+		if err != nil {
+			// Likely temporary error
+			log.Warnw("failed to check worker session", "error", err)
+
+			if err := sw.disable(ctx); err != nil {
+				log.Warnw("failed to disable worker with session error", "worker", sw.wid, "error", err)
+			}
+
+			select {
+			case <-sw.heartbeatTimer.C:
+				continue
+			case w := <-sw.scheduledWindows:
+				// was in flight when initially disabled, return
+				sw.worker.wndLk.Lock()
+				sw.worker.activeWindows = append(sw.worker.activeWindows, w)
+				sw.worker.wndLk.Unlock()
+
+				if err := sw.disable(ctx); err != nil {
+					log.Warnw("failed to disable worker with session error", "worker", sw.wid, "error", err)
+				}
+			case <-sw.sh.closing:
+				return false
+			case <-sw.worker.closingMgr:
+				return false
+			}
+
+			continue
+		}
+
+		if WorkerID(curSes) != sw.wid {
+			if curSes != ClosedWorkerID {
+				// worker restarted
+				log.Warnw("worker session changed (worker restarted?)", "initial", sw.wid, "current", curSes)
+			}
+
+			return false
+		}
+
+		return true
+	}
+}
+
+func (sw *schedWorker) requestWindows() bool {
+	for ; sw.windowsRequested < SchedWindows; sw.windowsRequested++ {
+		select {
+		case sw.sh.windowRequests <- &schedWindowRequest{
+			worker: sw.wid,
+			done:   sw.scheduledWindows,
+		}:
+		case <-sw.sh.closing:
+			return false
+		case <-sw.worker.closingMgr:
+			return false
+		}
+	}
+	return true
+}
+
+func (sw *schedWorker) waitForUpdates() (update bool, ok bool) {
+	select {
+	case <-sw.heartbeatTimer.C:
+		return false, true
+	case w := <-sw.scheduledWindows:
+		sw.worker.wndLk.Lock()
+		sw.worker.activeWindows = append(sw.worker.activeWindows, w)
+		sw.worker.wndLk.Unlock()
+		return true, true
+	case <-sw.taskDone:
+		log.Debugw("task done", "workerid", sw.wid)
+		return true, true
+	case <-sw.sh.closing:
+	case <-sw.worker.closingMgr:
+	}
+
+	return false, false
+}

 func (sh *scheduler) workerCompactWindows(worker *workerHandle, wid WorkerID) int {
 	// move tasks from older windows to newer windows if older windows
 	// still can fit them
@@ -311,7 +320,59 @@ func (sh *scheduler) workerCompactWindows(worker *workerHandle, wid WorkerID) int {
 	return compacted
 }

-func (sh *scheduler) assignWorker(taskDone chan struct{}, wid WorkerID, w *workerHandle, req *workerRequest) error {
+func (sw *schedWorker) processAssignedWindows() {
+	worker := sw.worker
+
+assignLoop:
+	// process windows in order
+	for len(worker.activeWindows) > 0 {
+		firstWindow := worker.activeWindows[0]
+
+		// process tasks within a window, preferring tasks at lower indexes
+		for len(firstWindow.todo) > 0 {
+			tidx := -1
+
+			worker.lk.Lock()
+			for t, todo := range firstWindow.todo {
+				needRes := ResourceTable[todo.taskType][sw.sh.spt]
+				if worker.preparing.canHandleRequest(needRes, sw.wid, "startPreparing", worker.info.Resources) {
+					tidx = t
+					break
+				}
+			}
+			worker.lk.Unlock()
+
+			if tidx == -1 {
+				break assignLoop
+			}
+
+			todo := firstWindow.todo[tidx]
+
+			log.Debugf("assign worker sector %d", todo.sector.Number)
+			err := sw.startProcessingTask(sw.taskDone, todo)
+
+			if err != nil {
+				log.Error("startProcessingTask error: %+v", err)
+				go todo.respond(xerrors.Errorf("startProcessingTask error: %w", err))
+			}
+
+			// Note: we're not freeing window.allocated resources here very much on purpose
+			copy(firstWindow.todo[tidx:], firstWindow.todo[tidx+1:])
+			firstWindow.todo[len(firstWindow.todo)-1] = nil
+			firstWindow.todo = firstWindow.todo[:len(firstWindow.todo)-1]
+		}
+
+		copy(worker.activeWindows, worker.activeWindows[1:])
+		worker.activeWindows[len(worker.activeWindows)-1] = nil
+		worker.activeWindows = worker.activeWindows[:len(worker.activeWindows)-1]
+
+		sw.windowsRequested--
+	}
+}
+
+func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRequest) error {
+	w, sh := sw.worker, sw.sh
+
 	needRes := ResourceTable[req.taskType][sh.spt]

 	w.lk.Lock()
@@ -319,7 +380,7 @@ func (sh *scheduler) assignWorker(taskDone chan struct{}, wid WorkerID, w *workerHandle, req *workerRequest) error {
 	w.lk.Unlock()

 	go func() {
-		err := req.prepare(req.ctx, sh.wt.worker(wid, w.w))
+		err := req.prepare(req.ctx, sh.wt.worker(sw.wid, w.w))

 		sh.workersLk.Lock()
 		if err != nil {
@@ -344,7 +405,7 @@ func (sh *scheduler) assignWorker(taskDone chan struct{}, wid WorkerID, w *workerHandle, req *workerRequest) error {
 			return
 		}

-		err = w.active.withResources(wid, w.info.Resources, needRes, &sh.workersLk, func() error {
+		err = w.active.withResources(sw.wid, w.info.Resources, needRes, &sh.workersLk, func() error {
 			w.lk.Lock()
 			w.preparing.free(w.info.Resources, needRes)
 			w.lk.Unlock()
@@ -356,7 +417,7 @@ func (sh *scheduler) assignWorker(taskDone chan struct{}, wid WorkerID, w *workerHandle, req *workerRequest) error {
 		case <-sh.closing:
 		}

-		err = req.work(req.ctx, sh.wt.worker(wid, w.w))
+		err = req.work(req.ctx, sh.wt.worker(sw.wid, w.w))

 		select {
 		case req.ret <- workerResponse{err: err}:


@@ -225,7 +225,7 @@ func (l *LocalWorker) asyncCall(ctx context.Context, sector abi.SectorID, rt Ret
 		res, err := work(ctx, ci)

-		{
+		if err != nil {
 			rb, err := json.Marshal(res)
 			if err != nil {
 				log.Errorf("tracking call (marshaling results): %+v", err)