From 86c222ab58bc875400a3071ddf794274ff0eb322 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 23 Sep 2020 14:56:37 +0200 Subject: [PATCH] sectorstorage: fix work tracking --- extern/sector-storage/manager.go | 5 +- extern/sector-storage/manager_calltracker.go | 2 + extern/sector-storage/manager_test.go | 5 ++ extern/sector-storage/sched.go | 14 +++-- extern/sector-storage/sched_test.go | 5 +- extern/sector-storage/stats.go | 6 +- extern/sector-storage/worker_tracked.go | 61 ++++++++++---------- 7 files changed, 53 insertions(+), 45 deletions(-) diff --git a/extern/sector-storage/manager.go b/extern/sector-storage/manager.go index eecfd1b55..d4db60806 100644 --- a/extern/sector-storage/manager.go +++ b/extern/sector-storage/manager.go @@ -197,10 +197,7 @@ func (m *Manager) AddWorker(ctx context.Context, w Worker) error { m.sched.newWorkers <- &workerHandle{ w: w, - wt: &workTracker{ - done: map[storiface.CallID]struct{}{}, - running: map[storiface.CallID]storiface.WorkerJob{}, - }, + info: info, preparing: &activeResources{}, active: &activeResources{}, diff --git a/extern/sector-storage/manager_calltracker.go b/extern/sector-storage/manager_calltracker.go index 13296a843..e62f964b3 100644 --- a/extern/sector-storage/manager_calltracker.go +++ b/extern/sector-storage/manager_calltracker.go @@ -289,6 +289,8 @@ func (m *Manager) returnResult(callID storiface.CallID, r interface{}, serr stri err: err, } + m.sched.wt.onDone(callID) + m.workLk.Lock() defer m.workLk.Unlock() diff --git a/extern/sector-storage/manager_test.go b/extern/sector-storage/manager_test.go index d87ec0827..0fada08bc 100644 --- a/extern/sector-storage/manager_test.go +++ b/extern/sector-storage/manager_test.go @@ -115,6 +115,11 @@ func newTestMgr(ctx context.Context, t *testing.T, ds datastore.Datastore) (*Man Prover: prover, + wt: &workTracker{ + done: map[storiface.CallID]struct{}{}, + running: map[storiface.CallID]trackedWork{}, + }, + work: statestore.New(ds), callToWork: map[storiface.CallID]WorkID{}, callRes: map[storiface.CallID]chan result{}, diff --git a/extern/sector-storage/sched.go b/extern/sector-storage/sched.go index d757140b9..760fe9cba 100644 --- a/extern/sector-storage/sched.go +++ b/extern/sector-storage/sched.go @@ -69,6 +69,8 @@ type scheduler struct { schedQueue *requestQueue openWindows []*schedWindowRequest + wt *workTracker + info chan func(interface{}) closing chan struct{} @@ -89,9 +91,6 @@ type workerHandle struct { wndLk sync.Mutex activeWindows []*schedWindow - // stats / tracking - wt *workTracker - // for sync manager goroutine closing cleanupStarted bool closedMgr chan struct{} @@ -157,6 +156,11 @@ func newScheduler(spt abi.RegisteredSealProof) *scheduler { schedQueue: &requestQueue{}, + wt: &workTracker{ + done: map[storiface.CallID]struct{}{}, + running: map[storiface.CallID]trackedWork{}, + }, + info: make(chan func(interface{})), closing: make(chan struct{}), @@ -680,7 +684,7 @@ func (sh *scheduler) assignWorker(taskDone chan struct{}, wid WorkerID, w *worke w.lk.Unlock() go func() { - err := req.prepare(req.ctx, w.wt.worker(w.w)) + err := req.prepare(req.ctx, sh.wt.worker(wid, w.w)) sh.workersLk.Lock() if err != nil { @@ -717,7 +721,7 @@ func (sh *scheduler) assignWorker(taskDone chan struct{}, wid WorkerID, w *worke case <-sh.closing: } - err = req.work(req.ctx, w.wt.worker(w.w)) + err = req.work(req.ctx, sh.wt.worker(wid, w.w)) select { case req.ret <- workerResponse{err: err}: diff --git a/extern/sector-storage/sched_test.go b/extern/sector-storage/sched_test.go index 55ef9bf02..f23be20c0 100644 --- a/extern/sector-storage/sched_test.go +++ b/extern/sector-storage/sched_test.go @@ -165,10 +165,7 @@ func addTestWorker(t *testing.T, sched *scheduler, index *stores.Index, name str sched.newWorkers <- &workerHandle{ w: w, - wt: &workTracker{ - done: map[storiface.CallID]struct{}{}, - running: map[storiface.CallID]storiface.WorkerJob{}, - }, + info: info, preparing: &activeResources{}, active: &activeResources{}, diff --git a/extern/sector-storage/stats.go b/extern/sector-storage/stats.go index f9063cbec..1ce415fd2 100644 --- a/extern/sector-storage/stats.go +++ b/extern/sector-storage/stats.go @@ -29,9 +29,11 @@ func (m *Manager) WorkerJobs() map[uint64][]storiface.WorkerJob { out := map[uint64][]storiface.WorkerJob{} - for id, handle := range m.sched.workers { - out[uint64(id)] = handle.wt.Running() + for _, t := range m.sched.wt.Running() { + out[uint64(t.worker)] = append(out[uint64(t.worker)], t.job) + } + for id, handle := range m.sched.workers { handle.wndLk.Lock() for wi, window := range handle.activeWindows { for _, request := range window.todo { diff --git a/extern/sector-storage/worker_tracked.go b/extern/sector-storage/worker_tracked.go index f5ad15360..4a22fcca7 100644 --- a/extern/sector-storage/worker_tracked.go +++ b/extern/sector-storage/worker_tracked.go @@ -15,25 +15,20 @@ import ( "github.com/filecoin-project/lotus/extern/sector-storage/storiface" ) +type trackedWork struct { + job storiface.WorkerJob + worker WorkerID +} + type workTracker struct { lk sync.Mutex done map[storiface.CallID]struct{} - running map[storiface.CallID]storiface.WorkerJob + running map[storiface.CallID]trackedWork // TODO: done, aggregate stats, queue stats, scheduler feedback } -// TODO: CALL THIS! -// TODO: CALL THIS! -// TODO: CALL THIS! -// TODO: CALL THIS! -// TODO: CALL THIS! -// TODO: CALL THIS! -// TODO: CALL THIS! -// TODO: CALL THIS! -// TODO: CALL THIS! -// TODO: CALL THIS! func (wt *workTracker) onDone(callID storiface.CallID) { wt.lk.Lock() defer wt.lk.Unlock() @@ -47,7 +42,7 @@ func (wt *workTracker) onDone(callID storiface.CallID) { delete(wt.running, callID) } -func (wt *workTracker) track(sid abi.SectorID, task sealtasks.TaskType) func(storiface.CallID, error) (storiface.CallID, error) { +func (wt *workTracker) track(wid WorkerID, sid abi.SectorID, task sealtasks.TaskType) func(storiface.CallID, error) (storiface.CallID, error) { return func(callID storiface.CallID, err error) (storiface.CallID, error) { if err != nil { return callID, err @@ -62,29 +57,34 @@ func (wt *workTracker) track(sid abi.SectorID, task sealtasks.TaskType) func(sto return callID, err } - wt.running[callID] = storiface.WorkerJob{ - ID: callID, - Sector: sid, - Task: task, - Start: time.Now(), + wt.running[callID] = trackedWork{ + job: storiface.WorkerJob{ + ID: callID, + Sector: sid, + Task: task, + Start: time.Now(), + }, + worker: wid, } return callID, err } } -func (wt *workTracker) worker(w Worker) Worker { +func (wt *workTracker) worker(wid WorkerID, w Worker) Worker { return &trackedWorker{ - Worker: w, + Worker: w, + wid: wid, + tracker: wt, } } -func (wt *workTracker) Running() []storiface.WorkerJob { +func (wt *workTracker) Running() []trackedWork { wt.lk.Lock() defer wt.lk.Unlock() - out := make([]storiface.WorkerJob, 0, len(wt.running)) + out := make([]trackedWork, 0, len(wt.running)) for _, job := range wt.running { out = append(out, job) } @@ -94,44 +94,45 @@ func (wt *workTracker) Running() []storiface.WorkerJob { type trackedWorker struct { Worker + wid WorkerID tracker *workTracker } func (t *trackedWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) { - return t.tracker.track(sector, sealtasks.TTPreCommit1)(t.Worker.SealPreCommit1(ctx, sector, ticket, pieces)) + return t.tracker.track(t.wid, sector, sealtasks.TTPreCommit1)(t.Worker.SealPreCommit1(ctx, sector, ticket, pieces)) } func (t *trackedWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, pc1o storage.PreCommit1Out) (storiface.CallID, error) { - return t.tracker.track(sector, sealtasks.TTPreCommit2)(t.Worker.SealPreCommit2(ctx, sector, pc1o)) + return t.tracker.track(t.wid, sector, sealtasks.TTPreCommit2)(t.Worker.SealPreCommit2(ctx, sector, pc1o)) } func (t *trackedWorker) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) { - return t.tracker.track(sector, sealtasks.TTCommit1)(t.Worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids)) + return t.tracker.track(t.wid, sector, sealtasks.TTCommit1)(t.Worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids)) } func (t *trackedWorker) SealCommit2(ctx context.Context, sector abi.SectorID, c1o storage.Commit1Out) (storiface.CallID, error) { - return t.tracker.track(sector, sealtasks.TTCommit2)(t.Worker.SealCommit2(ctx, sector, c1o)) + return t.tracker.track(t.wid, sector, sealtasks.TTCommit2)(t.Worker.SealCommit2(ctx, sector, c1o)) } func (t *trackedWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) (storiface.CallID, error) { - return t.tracker.track(sector, sealtasks.TTFinalize)(t.Worker.FinalizeSector(ctx, sector, keepUnsealed)) + return t.tracker.track(t.wid, sector, sealtasks.TTFinalize)(t.Worker.FinalizeSector(ctx, sector, keepUnsealed)) } func (t *trackedWorker) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) { - return t.tracker.track(sector, sealtasks.TTAddPiece)(t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData)) + return t.tracker.track(t.wid, sector, sealtasks.TTAddPiece)(t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData)) } func (t *trackedWorker) Fetch(ctx context.Context, s abi.SectorID, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) { - return t.tracker.track(s, sealtasks.TTFetch)(t.Worker.Fetch(ctx, s, ft, ptype, am)) + return t.tracker.track(t.wid, s, sealtasks.TTFetch)(t.Worker.Fetch(ctx, s, ft, ptype, am)) } func (t *trackedWorker) UnsealPiece(ctx context.Context, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) { - return t.tracker.track(id, sealtasks.TTUnseal)(t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid)) + return t.tracker.track(t.wid, id, sealtasks.TTUnseal)(t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid)) } func (t *trackedWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (storiface.CallID, error) { - return t.tracker.track(id, sealtasks.TTReadUnsealed)(t.Worker.ReadPiece(ctx, writer, id, index, size)) + return t.tracker.track(t.wid, id, sealtasks.TTReadUnsealed)(t.Worker.ReadPiece(ctx, writer, id, index, size)) } var _ Worker = &trackedWorker{}