sectorstorage: fix work tracking

This commit is contained in:
Łukasz Magiera 2020-09-23 14:56:37 +02:00
parent ce6b92484f
commit 86c222ab58
7 changed files with 53 additions and 45 deletions

View File

@ -197,10 +197,7 @@ func (m *Manager) AddWorker(ctx context.Context, w Worker) error {
m.sched.newWorkers <- &workerHandle{ m.sched.newWorkers <- &workerHandle{
w: w, w: w,
wt: &workTracker{
done: map[storiface.CallID]struct{}{},
running: map[storiface.CallID]storiface.WorkerJob{},
},
info: info, info: info,
preparing: &activeResources{}, preparing: &activeResources{},
active: &activeResources{}, active: &activeResources{},

View File

@ -289,6 +289,8 @@ func (m *Manager) returnResult(callID storiface.CallID, r interface{}, serr stri
err: err, err: err,
} }
m.sched.wt.onDone(callID)
m.workLk.Lock() m.workLk.Lock()
defer m.workLk.Unlock() defer m.workLk.Unlock()

View File

@ -115,6 +115,11 @@ func newTestMgr(ctx context.Context, t *testing.T, ds datastore.Datastore) (*Man
Prover: prover, Prover: prover,
wt: &workTracker{
done: map[storiface.CallID]struct{}{},
running: map[storiface.CallID]trackedWork{},
},
work: statestore.New(ds), work: statestore.New(ds),
callToWork: map[storiface.CallID]WorkID{}, callToWork: map[storiface.CallID]WorkID{},
callRes: map[storiface.CallID]chan result{}, callRes: map[storiface.CallID]chan result{},

View File

@ -69,6 +69,8 @@ type scheduler struct {
schedQueue *requestQueue schedQueue *requestQueue
openWindows []*schedWindowRequest openWindows []*schedWindowRequest
wt *workTracker
info chan func(interface{}) info chan func(interface{})
closing chan struct{} closing chan struct{}
@ -89,9 +91,6 @@ type workerHandle struct {
wndLk sync.Mutex wndLk sync.Mutex
activeWindows []*schedWindow activeWindows []*schedWindow
// stats / tracking
wt *workTracker
// for sync manager goroutine closing // for sync manager goroutine closing
cleanupStarted bool cleanupStarted bool
closedMgr chan struct{} closedMgr chan struct{}
@ -157,6 +156,11 @@ func newScheduler(spt abi.RegisteredSealProof) *scheduler {
schedQueue: &requestQueue{}, schedQueue: &requestQueue{},
wt: &workTracker{
done: map[storiface.CallID]struct{}{},
running: map[storiface.CallID]trackedWork{},
},
info: make(chan func(interface{})), info: make(chan func(interface{})),
closing: make(chan struct{}), closing: make(chan struct{}),
@ -680,7 +684,7 @@ func (sh *scheduler) assignWorker(taskDone chan struct{}, wid WorkerID, w *worke
w.lk.Unlock() w.lk.Unlock()
go func() { go func() {
err := req.prepare(req.ctx, w.wt.worker(w.w)) err := req.prepare(req.ctx, sh.wt.worker(wid, w.w))
sh.workersLk.Lock() sh.workersLk.Lock()
if err != nil { if err != nil {
@ -717,7 +721,7 @@ func (sh *scheduler) assignWorker(taskDone chan struct{}, wid WorkerID, w *worke
case <-sh.closing: case <-sh.closing:
} }
err = req.work(req.ctx, w.wt.worker(w.w)) err = req.work(req.ctx, sh.wt.worker(wid, w.w))
select { select {
case req.ret <- workerResponse{err: err}: case req.ret <- workerResponse{err: err}:

View File

@ -165,10 +165,7 @@ func addTestWorker(t *testing.T, sched *scheduler, index *stores.Index, name str
sched.newWorkers <- &workerHandle{ sched.newWorkers <- &workerHandle{
w: w, w: w,
wt: &workTracker{
done: map[storiface.CallID]struct{}{},
running: map[storiface.CallID]storiface.WorkerJob{},
},
info: info, info: info,
preparing: &activeResources{}, preparing: &activeResources{},
active: &activeResources{}, active: &activeResources{},

View File

@ -29,9 +29,11 @@ func (m *Manager) WorkerJobs() map[uint64][]storiface.WorkerJob {
out := map[uint64][]storiface.WorkerJob{} out := map[uint64][]storiface.WorkerJob{}
for id, handle := range m.sched.workers { for _, t := range m.sched.wt.Running() {
out[uint64(id)] = handle.wt.Running() out[uint64(t.worker)] = append(out[uint64(t.worker)], t.job)
}
for id, handle := range m.sched.workers {
handle.wndLk.Lock() handle.wndLk.Lock()
for wi, window := range handle.activeWindows { for wi, window := range handle.activeWindows {
for _, request := range window.todo { for _, request := range window.todo {

View File

@ -15,25 +15,20 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/storiface" "github.com/filecoin-project/lotus/extern/sector-storage/storiface"
) )
type trackedWork struct {
job storiface.WorkerJob
worker WorkerID
}
type workTracker struct { type workTracker struct {
lk sync.Mutex lk sync.Mutex
done map[storiface.CallID]struct{} done map[storiface.CallID]struct{}
running map[storiface.CallID]storiface.WorkerJob running map[storiface.CallID]trackedWork
// TODO: done, aggregate stats, queue stats, scheduler feedback // TODO: done, aggregate stats, queue stats, scheduler feedback
} }
// TODO: CALL THIS!
// TODO: CALL THIS!
// TODO: CALL THIS!
// TODO: CALL THIS!
// TODO: CALL THIS!
// TODO: CALL THIS!
// TODO: CALL THIS!
// TODO: CALL THIS!
// TODO: CALL THIS!
// TODO: CALL THIS!
func (wt *workTracker) onDone(callID storiface.CallID) { func (wt *workTracker) onDone(callID storiface.CallID) {
wt.lk.Lock() wt.lk.Lock()
defer wt.lk.Unlock() defer wt.lk.Unlock()
@ -47,7 +42,7 @@ func (wt *workTracker) onDone(callID storiface.CallID) {
delete(wt.running, callID) delete(wt.running, callID)
} }
func (wt *workTracker) track(sid abi.SectorID, task sealtasks.TaskType) func(storiface.CallID, error) (storiface.CallID, error) { func (wt *workTracker) track(wid WorkerID, sid abi.SectorID, task sealtasks.TaskType) func(storiface.CallID, error) (storiface.CallID, error) {
return func(callID storiface.CallID, err error) (storiface.CallID, error) { return func(callID storiface.CallID, err error) (storiface.CallID, error) {
if err != nil { if err != nil {
return callID, err return callID, err
@ -62,29 +57,34 @@ func (wt *workTracker) track(sid abi.SectorID, task sealtasks.TaskType) func(sto
return callID, err return callID, err
} }
wt.running[callID] = storiface.WorkerJob{ wt.running[callID] = trackedWork{
ID: callID, job: storiface.WorkerJob{
Sector: sid, ID: callID,
Task: task, Sector: sid,
Start: time.Now(), Task: task,
Start: time.Now(),
},
worker: wid,
} }
return callID, err return callID, err
} }
} }
func (wt *workTracker) worker(w Worker) Worker { func (wt *workTracker) worker(wid WorkerID, w Worker) Worker {
return &trackedWorker{ return &trackedWorker{
Worker: w, Worker: w,
wid: wid,
tracker: wt, tracker: wt,
} }
} }
func (wt *workTracker) Running() []storiface.WorkerJob { func (wt *workTracker) Running() []trackedWork {
wt.lk.Lock() wt.lk.Lock()
defer wt.lk.Unlock() defer wt.lk.Unlock()
out := make([]storiface.WorkerJob, 0, len(wt.running)) out := make([]trackedWork, 0, len(wt.running))
for _, job := range wt.running { for _, job := range wt.running {
out = append(out, job) out = append(out, job)
} }
@ -94,44 +94,45 @@ func (wt *workTracker) Running() []storiface.WorkerJob {
type trackedWorker struct { type trackedWorker struct {
Worker Worker
wid WorkerID
tracker *workTracker tracker *workTracker
} }
func (t *trackedWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) { func (t *trackedWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) {
return t.tracker.track(sector, sealtasks.TTPreCommit1)(t.Worker.SealPreCommit1(ctx, sector, ticket, pieces)) return t.tracker.track(t.wid, sector, sealtasks.TTPreCommit1)(t.Worker.SealPreCommit1(ctx, sector, ticket, pieces))
} }
func (t *trackedWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, pc1o storage.PreCommit1Out) (storiface.CallID, error) { func (t *trackedWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, pc1o storage.PreCommit1Out) (storiface.CallID, error) {
return t.tracker.track(sector, sealtasks.TTPreCommit2)(t.Worker.SealPreCommit2(ctx, sector, pc1o)) return t.tracker.track(t.wid, sector, sealtasks.TTPreCommit2)(t.Worker.SealPreCommit2(ctx, sector, pc1o))
} }
func (t *trackedWorker) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) { func (t *trackedWorker) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) {
return t.tracker.track(sector, sealtasks.TTCommit1)(t.Worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids)) return t.tracker.track(t.wid, sector, sealtasks.TTCommit1)(t.Worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids))
} }
func (t *trackedWorker) SealCommit2(ctx context.Context, sector abi.SectorID, c1o storage.Commit1Out) (storiface.CallID, error) { func (t *trackedWorker) SealCommit2(ctx context.Context, sector abi.SectorID, c1o storage.Commit1Out) (storiface.CallID, error) {
return t.tracker.track(sector, sealtasks.TTCommit2)(t.Worker.SealCommit2(ctx, sector, c1o)) return t.tracker.track(t.wid, sector, sealtasks.TTCommit2)(t.Worker.SealCommit2(ctx, sector, c1o))
} }
func (t *trackedWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) (storiface.CallID, error) { func (t *trackedWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) (storiface.CallID, error) {
return t.tracker.track(sector, sealtasks.TTFinalize)(t.Worker.FinalizeSector(ctx, sector, keepUnsealed)) return t.tracker.track(t.wid, sector, sealtasks.TTFinalize)(t.Worker.FinalizeSector(ctx, sector, keepUnsealed))
} }
func (t *trackedWorker) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) { func (t *trackedWorker) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) {
return t.tracker.track(sector, sealtasks.TTAddPiece)(t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData)) return t.tracker.track(t.wid, sector, sealtasks.TTAddPiece)(t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData))
} }
func (t *trackedWorker) Fetch(ctx context.Context, s abi.SectorID, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) { func (t *trackedWorker) Fetch(ctx context.Context, s abi.SectorID, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) {
return t.tracker.track(s, sealtasks.TTFetch)(t.Worker.Fetch(ctx, s, ft, ptype, am)) return t.tracker.track(t.wid, s, sealtasks.TTFetch)(t.Worker.Fetch(ctx, s, ft, ptype, am))
} }
func (t *trackedWorker) UnsealPiece(ctx context.Context, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) { func (t *trackedWorker) UnsealPiece(ctx context.Context, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) {
return t.tracker.track(id, sealtasks.TTUnseal)(t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid)) return t.tracker.track(t.wid, id, sealtasks.TTUnseal)(t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid))
} }
func (t *trackedWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (storiface.CallID, error) { func (t *trackedWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (storiface.CallID, error) {
return t.tracker.track(id, sealtasks.TTReadUnsealed)(t.Worker.ReadPiece(ctx, writer, id, index, size)) return t.tracker.track(t.wid, id, sealtasks.TTReadUnsealed)(t.Worker.ReadPiece(ctx, writer, id, index, size))
} }
var _ Worker = &trackedWorker{} var _ Worker = &trackedWorker{}