sched: use more letters for variables

Łukasz Magiera 2020-10-28 14:23:38 +01:00
parent 8731fe9112
commit 96c5ff7e7f
8 changed files with 52 additions and 46 deletions
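In short, this is a rename-only refactor of the sector-storage scheduler: the sched.wt field becomes sched.workTracker, workerHandle.w becomes workerHandle.workerRpc, and schedWorker.sh becomes schedWorker.sched. The workerCompactWindows helper also moves from *scheduler onto *schedWorker and now adjusts windowsRequested itself instead of returning the compacted count. A minimal sketch of the new shape follows; placeholder stub types are used so the renamed members can be shown in isolation, and everything elided with "..." is unchanged by this commit (the real definitions are in the diff below).

package sectorstorage

// Sketch only: placeholder stubs stand in for the real lotus types.
type (
	workTracker struct{}
	Worker      interface{}
	WorkerID    struct{}
)

type workerHandle struct {
	workerRpc Worker // was: w
	// info, preparing, active, ... unchanged
}

type scheduler struct {
	workTracker *workTracker // was: wt
	// schedQueue, openWindows, info, ... unchanged
}

type schedWorker struct {
	sched  *scheduler // was: sh
	worker *workerHandle
	wid    WorkerID

	windowsRequested int
	// heartbeatTimer, scheduledWindows, taskDone, ... unchanged
}

// workerCompactWindows is now a method on *schedWorker and decrements
// sw.windowsRequested itself instead of returning the compacted count.
func (sw *schedWorker) workerCompactWindows() { /* compaction elided */ }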

View File

@@ -334,7 +334,7 @@ func (m *Manager) returnResult(callID storiface.CallID, r interface{}, serr stri
         err: err,
     }
-    m.sched.wt.onDone(callID)
+    m.sched.workTracker.onDone(callID)
     m.workLk.Lock()
     defer m.workLk.Unlock()

View File

@@ -64,7 +64,7 @@ type scheduler struct {
     schedQueue  *requestQueue
     openWindows []*schedWindowRequest
-    wt *workTracker
+    workTracker *workTracker
     info chan func(interface{})
@@ -74,7 +74,7 @@ type scheduler struct {
 }
 type workerHandle struct {
-    w Worker
+    workerRpc Worker
     info storiface.WorkerInfo
@@ -155,7 +155,7 @@ func newScheduler(spt abi.RegisteredSealProof) *scheduler {
         schedQueue: &requestQueue{},
-        wt: &workTracker{
+        workTracker: &workTracker{
             done:    map[storiface.CallID]struct{}{},
             running: map[storiface.CallID]trackedWork{},
         },

View File

@@ -530,7 +530,7 @@ func BenchmarkTrySched(b *testing.B) {
             sched := newScheduler(spt)
             sched.workers[WorkerID{}] = &workerHandle{
-                w: nil,
+                workerRpc: nil,
                 info: storiface.WorkerInfo{
                     Hostname:  "t",
                     Resources: decentWorkerResources,

View File

@@ -2,14 +2,15 @@ package sectorstorage
 import (
     "context"
-    "github.com/filecoin-project/lotus/extern/sector-storage/stores"
     "time"
     "golang.org/x/xerrors"
+    "github.com/filecoin-project/lotus/extern/sector-storage/stores"
 )
 type schedWorker struct {
-    sh     *scheduler
+    sched  *scheduler
     worker *workerHandle
     wid    WorkerID
@@ -37,8 +38,8 @@ func (sh *scheduler) runWorker(ctx context.Context, w Worker) error {
     }
     worker := &workerHandle{
-        w:    w,
+        workerRpc: w,
         info: info,
         preparing: &activeResources{},
         active:    &activeResources{},
@@ -63,7 +64,7 @@ func (sh *scheduler) runWorker(ctx context.Context, w Worker) error {
     sh.workersLk.Unlock()
     sw := &schedWorker{
-        sh:     sh,
+        sched:  sh,
         worker: worker,
         wid:    wid,
@@ -81,7 +82,7 @@ func (sh *scheduler) runWorker(ctx context.Context, w Worker) error {
 }
 func (sw *schedWorker) handleWorker() {
-    worker, sh := sw.worker, sw.sh
+    worker, sched := sw.worker, sw.sched
     ctx, cancel := context.WithCancel(context.TODO())
     defer cancel()
@@ -95,17 +96,17 @@ func (sw *schedWorker) handleWorker() {
             log.Warnw("failed to disable worker", "worker", sw.wid, "error", err)
         }
-        sh.workersLk.Lock()
-        delete(sh.workers, sw.wid)
-        sh.workersLk.Unlock()
+        sched.workersLk.Lock()
+        delete(sched.workers, sw.wid)
+        sched.workersLk.Unlock()
     }()
     defer sw.heartbeatTimer.Stop()
     for {
-        sh.workersLk.Lock()
+        sched.workersLk.Lock()
         enabled := worker.enabled
-        sh.workersLk.Unlock()
+        sched.workersLk.Unlock()
         // ask for more windows if we need them (non-blocking)
         if enabled {
@@ -123,9 +124,9 @@ func (sw *schedWorker) handleWorker() {
             // session looks good
             if !enabled {
-                sh.workersLk.Lock()
+                sched.workersLk.Lock()
                 worker.enabled = true
-                sh.workersLk.Unlock()
+                sched.workersLk.Unlock()
                 // we'll send window requests on the next loop
             }
@@ -142,16 +143,16 @@ func (sw *schedWorker) handleWorker() {
         }
         // process assigned windows (non-blocking)
-        sh.workersLk.RLock()
+        sched.workersLk.RLock()
         worker.wndLk.Lock()
-        sw.windowsRequested -= sh.workerCompactWindows(worker, sw.wid)
+        sw.workerCompactWindows()
         // send tasks to the worker
         sw.processAssignedWindows()
         worker.wndLk.Unlock()
-        sh.workersLk.RUnlock()
+        sched.workersLk.RUnlock()
     }
 }
@@ -160,7 +161,7 @@ func (sw *schedWorker) disable(ctx context.Context) error {
     // request cleanup in the main scheduler goroutine
     select {
-    case sw.sh.workerDisable <- workerDisableReq{
+    case sw.sched.workerDisable <- workerDisableReq{
         activeWindows: sw.worker.activeWindows,
         wid:           sw.wid,
         done: func() {
@@ -169,7 +170,7 @@ func (sw *schedWorker) disable(ctx context.Context) error {
     }:
     case <-ctx.Done():
         return ctx.Err()
-    case <-sw.sh.closing:
+    case <-sw.sched.closing:
         return nil
     }
@@ -178,7 +179,7 @@ func (sw *schedWorker) disable(ctx context.Context) error {
     case <-done:
     case <-ctx.Done():
         return ctx.Err()
-    case <-sw.sh.closing:
+    case <-sw.sched.closing:
         return nil
     }
@@ -190,7 +191,7 @@ func (sw *schedWorker) disable(ctx context.Context) error {
 func (sw *schedWorker) checkSession(ctx context.Context) bool {
     for {
         sctx, scancel := context.WithTimeout(ctx, stores.HeartbeatInterval/2)
-        curSes, err := sw.worker.w.Session(sctx)
+        curSes, err := sw.worker.workerRpc.Session(sctx)
         scancel()
         if err != nil {
             // Likely temporary error
@@ -213,7 +214,7 @@ func (sw *schedWorker) checkSession(ctx context.Context) bool {
                 if err := sw.disable(ctx); err != nil {
                     log.Warnw("failed to disable worker with session error", "worker", sw.wid, "error", err)
                 }
-            case <-sw.sh.closing:
+            case <-sw.sched.closing:
                 return false
             case <-sw.worker.closingMgr:
                 return false
@@ -237,11 +238,11 @@ func (sw *schedWorker) checkSession(ctx context.Context) bool {
 func (sw *schedWorker) requestWindows() bool {
     for ; sw.windowsRequested < SchedWindows; sw.windowsRequested++ {
         select {
-        case sw.sh.windowRequests <- &schedWindowRequest{
+        case sw.sched.windowRequests <- &schedWindowRequest{
             worker: sw.wid,
             done:   sw.scheduledWindows,
         }:
-        case <-sw.sh.closing:
+        case <-sw.sched.closing:
             return false
         case <-sw.worker.closingMgr:
             return false
@@ -262,13 +263,16 @@ func (sw *schedWorker) waitForUpdates() (update bool, ok bool) {
     case <-sw.taskDone:
         log.Debugw("task done", "workerid", sw.wid)
         return true, true
-    case <-sw.sh.closing:
+    case <-sw.sched.closing:
     case <-sw.worker.closingMgr:
     }
     return false, false
 }
-func (sh *scheduler) workerCompactWindows(worker *workerHandle, wid WorkerID) int {
+func (sw *schedWorker) workerCompactWindows() {
+    worker := sw.worker
     // move tasks from older windows to newer windows if older windows
     // still can fit them
     if len(worker.activeWindows) > 1 {
@@ -277,8 +281,8 @@ func (sh *scheduler) workerCompactWindows(worker *workerHandle, wid WorkerID) in
             var moved []int
             for ti, todo := range window.todo {
-                needRes := ResourceTable[todo.taskType][sh.spt]
-                if !lower.allocated.canHandleRequest(needRes, wid, "compactWindows", worker.info.Resources) {
+                needRes := ResourceTable[todo.taskType][sw.sched.spt]
+                if !lower.allocated.canHandleRequest(needRes, sw.wid, "compactWindows", worker.info.Resources) {
                     continue
                 }
@@ -316,8 +320,7 @@ func (sh *scheduler) workerCompactWindows(worker *workerHandle, wid WorkerID) in
     }
     worker.activeWindows = newWindows
-    return compacted
+    sw.windowsRequested -= compacted
 }
 func (sw *schedWorker) processAssignedWindows() {
@@ -334,7 +337,7 @@ assignLoop:
         worker.lk.Lock()
         for t, todo := range firstWindow.todo {
-            needRes := ResourceTable[todo.taskType][sw.sh.spt]
+            needRes := ResourceTable[todo.taskType][sw.sched.spt]
             if worker.preparing.canHandleRequest(needRes, sw.wid, "startPreparing", worker.info.Resources) {
                 tidx = t
                 break
@@ -371,7 +374,7 @@ assignLoop:
 }
 func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRequest) error {
-    w, sh := sw.worker, sw.sh
+    w, sh := sw.worker, sw.sched
     needRes := ResourceTable[req.taskType][sh.spt]
@@ -380,7 +383,8 @@ func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRe
     w.lk.Unlock()
     go func() {
-        err := req.prepare(req.ctx, sh.wt.worker(sw.wid, w.w))
+        // first run the prepare step (e.g. fetching sector data from other worker)
+        err := req.prepare(req.ctx, sh.workTracker.worker(sw.wid, w.workerRpc))
         sh.workersLk.Lock()
         if err != nil {
@@ -405,6 +409,7 @@ func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRe
             return
         }
+        // wait (if needed) for resources in the 'active' window
         err = w.active.withResources(sw.wid, w.info.Resources, needRes, &sh.workersLk, func() error {
             w.lk.Lock()
             w.preparing.free(w.info.Resources, needRes)
@@ -417,7 +422,8 @@ func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRe
             case <-sh.closing:
             }
-            err = req.work(req.ctx, sh.wt.worker(sw.wid, w.w))
+            // Do the work!
+            err = req.work(req.ctx, sh.workTracker.worker(sw.wid, w.workerRpc))
             select {
             case req.ret <- workerResponse{err: err}:
View File

@@ -27,7 +27,7 @@ func newAllocSelector(index stores.SectorIndex, alloc storiface.SectorFileType,
 }
 func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *workerHandle) (bool, error) {
-    tasks, err := whnd.w.TaskTypes(ctx)
+    tasks, err := whnd.workerRpc.TaskTypes(ctx)
     if err != nil {
         return false, xerrors.Errorf("getting supported worker task types: %w", err)
     }
@@ -35,7 +35,7 @@ func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi
         return false, nil
     }
-    paths, err := whnd.w.Paths(ctx)
+    paths, err := whnd.workerRpc.Paths(ctx)
     if err != nil {
         return false, xerrors.Errorf("getting worker paths: %w", err)
     }

View File

@@ -29,7 +29,7 @@ func newExistingSelector(index stores.SectorIndex, sector abi.SectorID, alloc st
 }
 func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *workerHandle) (bool, error) {
-    tasks, err := whnd.w.TaskTypes(ctx)
+    tasks, err := whnd.workerRpc.TaskTypes(ctx)
     if err != nil {
         return false, xerrors.Errorf("getting supported worker task types: %w", err)
     }
@@ -37,7 +37,7 @@ func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt
         return false, nil
     }
-    paths, err := whnd.w.Paths(ctx)
+    paths, err := whnd.workerRpc.Paths(ctx)
     if err != nil {
         return false, xerrors.Errorf("getting worker paths: %w", err)
     }

View File

@@ -20,7 +20,7 @@ func newTaskSelector() *taskSelector {
 }
 func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *workerHandle) (bool, error) {
-    tasks, err := whnd.w.TaskTypes(ctx)
+    tasks, err := whnd.workerRpc.TaskTypes(ctx)
     if err != nil {
         return false, xerrors.Errorf("getting supported worker task types: %w", err)
     }
@@ -30,11 +30,11 @@ func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.
 }
 func (s *taskSelector) Cmp(ctx context.Context, _ sealtasks.TaskType, a, b *workerHandle) (bool, error) {
-    atasks, err := a.w.TaskTypes(ctx)
+    atasks, err := a.workerRpc.TaskTypes(ctx)
     if err != nil {
         return false, xerrors.Errorf("getting supported worker task types: %w", err)
     }
-    btasks, err := b.w.TaskTypes(ctx)
+    btasks, err := b.workerRpc.TaskTypes(ctx)
     if err != nil {
         return false, xerrors.Errorf("getting supported worker task types: %w", err)
     }

View File

@@ -33,7 +33,7 @@ func (m *Manager) WorkerJobs() map[uuid.UUID][]storiface.WorkerJob {
     out := map[uuid.UUID][]storiface.WorkerJob{}
     calls := map[storiface.CallID]struct{}{}
-    for _, t := range m.sched.wt.Running() {
+    for _, t := range m.sched.workTracker.Running() {
         out[uuid.UUID(t.worker)] = append(out[uuid.UUID(t.worker)], t.job)
         calls[t.job.ID] = struct{}{}
     }