diff --git a/cmd/lotus-miner/sealing.go b/cmd/lotus-miner/sealing.go index a7e0a8de8..472af8da6 100644 --- a/cmd/lotus-miner/sealing.go +++ b/cmd/lotus-miner/sealing.go @@ -224,8 +224,10 @@ var sealingJobsCmd = &cli.Command{ for _, l := range lines { state := "running" switch { - case l.RunWait > 0: + case l.RunWait > 1: state = fmt.Sprintf("assigned(%d)", l.RunWait-1) + case l.RunWait == storiface.RWPrepared: + state = "prepared" case l.RunWait == storiface.RWRetDone: if !cctx.Bool("show-ret-done") { continue diff --git a/extern/sector-storage/sched.go b/extern/sector-storage/sched.go index 38e901bf2..1ffb15e5b 100644 --- a/extern/sector-storage/sched.go +++ b/extern/sector-storage/sched.go @@ -155,8 +155,9 @@ func newScheduler() *scheduler { schedQueue: &requestQueue{}, workTracker: &workTracker{ - done: map[storiface.CallID]struct{}{}, - running: map[storiface.CallID]trackedWork{}, + done: map[storiface.CallID]struct{}{}, + running: map[storiface.CallID]trackedWork{}, + prepared: map[uuid.UUID]trackedWork{}, }, info: make(chan func(interface{})), diff --git a/extern/sector-storage/sched_worker.go b/extern/sector-storage/sched_worker.go index 42bba2ee5..e717e58e2 100644 --- a/extern/sector-storage/sched_worker.go +++ b/extern/sector-storage/sched_worker.go @@ -464,7 +464,9 @@ func (sw *schedWorker) startProcessingTask(req *workerRequest) error { go func() { // first run the prepare step (e.g. fetching sector data from other worker) - err := req.prepare(req.ctx, sh.workTracker.worker(sw.wid, w.info, w.workerRpc)) + tw := sh.workTracker.worker(sw.wid, w.info, w.workerRpc) + tw.start() + err := req.prepare(req.ctx, tw) w.lk.Lock() if err != nil { @@ -488,6 +490,14 @@ func (sw *schedWorker) startProcessingTask(req *workerRequest) error { return } + tw = sh.workTracker.worker(sw.wid, w.info, w.workerRpc) + + // start tracking work first early in case we need to wait for resources + werr := make(chan error, 1) + go func() { + werr <- req.work(req.ctx, tw) + }() + // wait (if needed) for resources in the 'active' window err = w.active.withResources(sw.wid, w.info, needRes, &w.lk, func() error { w.preparing.free(w.info.Resources, needRes) @@ -501,7 +511,8 @@ func (sw *schedWorker) startProcessingTask(req *workerRequest) error { } // Do the work! - err = req.work(req.ctx, sh.workTracker.worker(sw.wid, w.info, w.workerRpc)) + tw.start() + err = <-werr select { case req.ret <- workerResponse{err: err}: @@ -534,7 +545,9 @@ func (sw *schedWorker) startProcessingReadyTask(req *workerRequest) error { go func() { // Do the work! - err := req.work(req.ctx, sh.workTracker.worker(sw.wid, w.info, w.workerRpc)) + tw := sh.workTracker.worker(sw.wid, w.info, w.workerRpc) + tw.start() + err := req.work(req.ctx, tw) select { case req.ret <- workerResponse{err: err}: diff --git a/extern/sector-storage/stats.go b/extern/sector-storage/stats.go index c5bc2fba1..43828742a 100644 --- a/extern/sector-storage/stats.go +++ b/extern/sector-storage/stats.go @@ -35,7 +35,13 @@ func (m *Manager) WorkerJobs() map[uuid.UUID][]storiface.WorkerJob { out := map[uuid.UUID][]storiface.WorkerJob{} calls := map[storiface.CallID]struct{}{} - for _, t := range m.sched.workTracker.Running() { + running, preparing := m.sched.workTracker.Running() + + for _, t := range running { + out[uuid.UUID(t.worker)] = append(out[uuid.UUID(t.worker)], t.job) + calls[t.job.ID] = struct{}{} + } + for _, t := range preparing { out[uuid.UUID(t.worker)] = append(out[uuid.UUID(t.worker)], t.job) calls[t.job.ID] = struct{}{} } @@ -50,7 +56,7 @@ func (m *Manager) WorkerJobs() map[uuid.UUID][]storiface.WorkerJob { ID: storiface.UndefCall, Sector: request.sector.ID, Task: request.taskType, - RunWait: wi + 1, + RunWait: wi + 2, Start: request.start, }) } diff --git a/extern/sector-storage/storiface/worker.go b/extern/sector-storage/storiface/worker.go index d1373f4c5..e3374d6cf 100644 --- a/extern/sector-storage/storiface/worker.go +++ b/extern/sector-storage/storiface/worker.go @@ -47,6 +47,8 @@ type WorkerStats struct { } const ( + RWPrepared = 1 + RWRunning = 0 RWRetWait = -1 RWReturned = -2 RWRetDone = -3 @@ -57,7 +59,8 @@ type WorkerJob struct { Sector abi.SectorID Task sealtasks.TaskType - // 1+ - assigned + // 2+ - assigned + // 1 - prepared // 0 - running // -1 - ret-wait // -2 - returned diff --git a/extern/sector-storage/worker_tracked.go b/extern/sector-storage/worker_tracked.go index 2160dd8e6..5702426c3 100644 --- a/extern/sector-storage/worker_tracked.go +++ b/extern/sector-storage/worker_tracked.go @@ -5,6 +5,7 @@ import ( "sync" "time" + "github.com/google/uuid" "github.com/ipfs/go-cid" "go.opencensus.io/stats" "go.opencensus.io/tag" @@ -26,8 +27,9 @@ type trackedWork struct { type workTracker struct { lk sync.Mutex - done map[storiface.CallID]struct{} - running map[storiface.CallID]trackedWork + done map[storiface.CallID]struct{} + running map[storiface.CallID]trackedWork + prepared map[uuid.UUID]trackedWork // TODO: done, aggregate stats, queue stats, scheduler feedback } @@ -56,63 +58,96 @@ func (wt *workTracker) onDone(ctx context.Context, callID storiface.CallID) { delete(wt.running, callID) } -func (wt *workTracker) track(ctx context.Context, wid WorkerID, wi storiface.WorkerInfo, sid storage.SectorRef, task sealtasks.TaskType) func(storiface.CallID, error) (storiface.CallID, error) { - return func(callID storiface.CallID, err error) (storiface.CallID, error) { - if err != nil { - return callID, err - } - - wt.lk.Lock() - defer wt.lk.Unlock() - - _, done := wt.done[callID] - if done { - delete(wt.done, callID) - return callID, err - } - - wt.running[callID] = trackedWork{ +func (wt *workTracker) track(ctx context.Context, ready chan struct{}, wid WorkerID, wi storiface.WorkerInfo, sid storage.SectorRef, task sealtasks.TaskType, cb func() (storiface.CallID, error)) (storiface.CallID, error) { + tracked := func(rw int, callID storiface.CallID) trackedWork { + return trackedWork{ job: storiface.WorkerJob{ - ID: callID, - Sector: sid.ID, - Task: task, - Start: time.Now(), + ID: callID, + Sector: sid.ID, + Task: task, + Start: time.Now(), + RunWait: rw, }, worker: wid, workerHostname: wi.Hostname, } + } - ctx, _ = tag.New( - ctx, - tag.Upsert(metrics.TaskType, string(task)), - tag.Upsert(metrics.WorkerHostname, wi.Hostname), - ) - stats.Record(ctx, metrics.WorkerCallsStarted.M(1)) + wt.lk.Lock() + defer wt.lk.Unlock() + select { + case <-ready: + case <-ctx.Done(): + return storiface.UndefCall, ctx.Err() + default: + prepID := uuid.New() + + wt.prepared[prepID] = tracked(storiface.RWPrepared, storiface.UndefCall) + + wt.lk.Unlock() + + select { + case <-ready: + case <-ctx.Done(): + wt.lk.Lock() + delete(wt.prepared, prepID) + return storiface.UndefCall, ctx.Err() + } + + wt.lk.Lock() + delete(wt.prepared, prepID) + } + + callID, err := cb() + if err != nil { return callID, err } + + _, done := wt.done[callID] + if done { + delete(wt.done, callID) + return callID, err + } + + wt.running[callID] = tracked(storiface.RWRunning, callID) + + ctx, _ = tag.New( + ctx, + tag.Upsert(metrics.TaskType, string(task)), + tag.Upsert(metrics.WorkerHostname, wi.Hostname), + ) + stats.Record(ctx, metrics.WorkerCallsStarted.M(1)) + + return callID, err } -func (wt *workTracker) worker(wid WorkerID, wi storiface.WorkerInfo, w Worker) Worker { +func (wt *workTracker) worker(wid WorkerID, wi storiface.WorkerInfo, w Worker) *trackedWorker { return &trackedWorker{ Worker: w, wid: wid, workerInfo: wi, + execute: make(chan struct{}), + tracker: wt, } } -func (wt *workTracker) Running() []trackedWork { +func (wt *workTracker) Running() ([]trackedWork, []trackedWork) { wt.lk.Lock() defer wt.lk.Unlock() - out := make([]trackedWork, 0, len(wt.running)) + running := make([]trackedWork, 0, len(wt.running)) for _, job := range wt.running { - out = append(out, job) + running = append(running, job) + } + prepared := make([]trackedWork, 0, len(wt.prepared)) + for _, job := range wt.prepared { + prepared = append(prepared, job) } - return out + return running, prepared } type trackedWorker struct { @@ -120,39 +155,47 @@ type trackedWorker struct { wid WorkerID workerInfo storiface.WorkerInfo + execute chan struct{} // channel blocking execution in case we're waiting for resources but the task is ready to execute + tracker *workTracker } +func (t *trackedWorker) start() { + close(t.execute) +} + func (t *trackedWorker) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) { - return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTPreCommit1)(t.Worker.SealPreCommit1(ctx, sector, ticket, pieces)) + return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTPreCommit1, func() (storiface.CallID, error) { return t.Worker.SealPreCommit1(ctx, sector, ticket, pieces) }) } func (t *trackedWorker) SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (storiface.CallID, error) { - return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTPreCommit2)(t.Worker.SealPreCommit2(ctx, sector, pc1o)) + return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTPreCommit2, func() (storiface.CallID, error) { return t.Worker.SealPreCommit2(ctx, sector, pc1o) }) } func (t *trackedWorker) SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) { - return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTCommit1)(t.Worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids)) + return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTCommit1, func() (storiface.CallID, error) { return t.Worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids) }) } func (t *trackedWorker) SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (storiface.CallID, error) { - return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTCommit2)(t.Worker.SealCommit2(ctx, sector, c1o)) + return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTCommit2, func() (storiface.CallID, error) { return t.Worker.SealCommit2(ctx, sector, c1o) }) } func (t *trackedWorker) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) { - return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTFinalize)(t.Worker.FinalizeSector(ctx, sector, keepUnsealed)) + return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTFinalize, func() (storiface.CallID, error) { return t.Worker.FinalizeSector(ctx, sector, keepUnsealed) }) } func (t *trackedWorker) AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) { - return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTAddPiece)(t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData)) + return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTAddPiece, func() (storiface.CallID, error) { + return t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData) + }) } func (t *trackedWorker) Fetch(ctx context.Context, s storage.SectorRef, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) { - return t.tracker.track(ctx, t.wid, t.workerInfo, s, sealtasks.TTFetch)(t.Worker.Fetch(ctx, s, ft, ptype, am)) + return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, s, sealtasks.TTFetch, func() (storiface.CallID, error) { return t.Worker.Fetch(ctx, s, ft, ptype, am) }) } func (t *trackedWorker) UnsealPiece(ctx context.Context, id storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) { - return t.tracker.track(ctx, t.wid, t.workerInfo, id, sealtasks.TTUnseal)(t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid)) + return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, id, sealtasks.TTUnseal, func() (storiface.CallID, error) { return t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid) }) } var _ Worker = &trackedWorker{}