# Patch summary (commentary only: this text precedes the first "diff --git"
# header and is not part of any hunk).
#
# Adds per-worker tracking of in-flight jobs.
#
#   * work_tracker.go (new file): workTracker holds a mutex-guarded map of
#     storiface.WorkerJob entries keyed by an incrementing counter.
#       - track() inserts an entry stamped with time.Now() and returns a
#         closure that deletes that entry again.
#       - Running() copies the map's values into a fresh slice under the lock.
#       - worker() wraps a Worker in a trackedWorker.
#     trackedWorker embeds Worker and overrides SealPreCommit1, SealPreCommit2,
#     SealCommit1, SealCommit2, FinalizeSector, AddPiece, Fetch, UnsealPiece
#     and ReadPiece. Each override calls track() immediately and defers the
#     returned closure, so the job is listed for the duration of the delegated
#     call. Worker methods that are not overridden go straight to the embedded
#     Worker and are not tracked.
#   * sched.go: workerHandle gains a `wt *workTracker` field; assignWorker now
#     passes w.wt.worker(w.w) instead of w.w to req.prepare and req.work.
#   * manager.go, sched_test.go: AddWorker and addTestWorker build the
#     workerHandle with a workTracker whose `running` map is initialised
#     (track() writes into that map, so it must not be nil).
#   * stats.go: Manager.WorkerJobs returns wt.Running() for every entry in
#     m.sched.workers, keyed by the worker id converted to uint64, while
#     holding m.sched.workersLk.
#   * storiface/worker.go: new WorkerJob struct (ID, Sector, Task, Start).
#   * sealtasks/task.go: TaskType.Short() looks the type up in the new
#     shortNames table and returns "UNK" when it is absent.
#
# NOTE(review): WorkerJobs takes workersLk.Lock(); the declaration of
# workersLk is not part of this patch. If it is a sync.RWMutex, a read lock
# may be enough here - confirm against WorkerStats.
# NOTE(review): line breaks and indentation below were reconstructed from a
# whitespace-collapsed copy of this patch. Spacing inside context and removed
# lines may not match the target files byte for byte; apply with whitespace
# tolerance or regenerate the patch from the repository.
diff --git a/manager.go b/manager.go
index fc3be18c1..063456fa9 100644
--- a/manager.go
+++ b/manager.go
@@ -166,7 +166,10 @@ func (m *Manager) AddWorker(ctx context.Context, w Worker) error {
 	}
 
 	m.sched.newWorkers <- &workerHandle{
-		w: w,
+		w: w,
+		wt: &workTracker{
+			running: map[uint64]storiface.WorkerJob{},
+		},
 		info: info,
 		preparing: &activeResources{},
 		active: &activeResources{},
diff --git a/sched.go b/sched.go
index bec5ee0c5..ed48d097b 100644
--- a/sched.go
+++ b/sched.go
@@ -82,6 +82,9 @@ type workerHandle struct {
 	preparing *activeResources
 	active *activeResources
 
+	// stats / tracking
+	wt *workTracker
+
 	// for sync manager goroutine closing
 	cleanupStarted bool
 	closedMgr chan struct{}
@@ -486,7 +489,7 @@ func (sh *scheduler) assignWorker(taskDone chan struct{}, wid WorkerID, w *worke
 	w.preparing.add(w.info.Resources, needRes)
 
 	go func() {
-		err := req.prepare(req.ctx, w.w)
+		err := req.prepare(req.ctx, w.wt.worker(w.w))
 		sh.workersLk.Lock()
 
 		if err != nil {
@@ -519,7 +522,7 @@ func (sh *scheduler) assignWorker(taskDone chan struct{}, wid WorkerID, w *worke
 			case <-sh.closing:
 			}
 
-			err = req.work(req.ctx, w.w)
+			err = req.work(req.ctx, w.wt.worker(w.w))
 
 			select {
 			case req.ret <- workerResponse{err: err}:
diff --git a/sched_test.go b/sched_test.go
index 67a5eeed3..caf7f0b4b 100644
--- a/sched_test.go
+++ b/sched_test.go
@@ -156,7 +156,10 @@ func addTestWorker(t *testing.T, sched *scheduler, index *stores.Index, name str
 	require.NoError(t, err)
 
 	sched.newWorkers <- &workerHandle{
-		w: w,
+		w: w,
+		wt: &workTracker{
+			running: map[uint64]storiface.WorkerJob{},
+		},
 		info: info,
 		preparing: &activeResources{},
 		active: &activeResources{},
diff --git a/sealtasks/task.go b/sealtasks/task.go
index 978107c85..ad5ce01bb 100644
--- a/sealtasks/task.go
+++ b/sealtasks/task.go
@@ -28,6 +28,30 @@ var order = map[TaskType]int{
 	TTReadUnsealed: 0,
 }
 
+var shortNames = map[TaskType]string{
+	TTAddPiece: "AP ",
+
+	TTPreCommit1: "PC1",
+	TTPreCommit2: "PC2",
+	TTCommit1:    "C1 ",
+	TTCommit2:    "C2 ",
+
+	TTFinalize: "FIN",
+
+	TTFetch:        "GET",
+	TTUnseal:       "UNS",
+	TTReadUnsealed: "RD ",
+}
+
 func (a TaskType) Less(b TaskType) bool {
 	return order[a] < order[b]
 }
+
+func (a TaskType) Short() string {
+	n, ok := shortNames[a]
+	if !ok {
+		return "UNK"
+	}
+
+	return n
+}
diff --git a/stats.go b/stats.go
index dbbee07f3..ee88898a4 100644
--- a/stats.go
+++ b/stats.go
@@ -20,3 +20,16 @@ func (m *Manager) WorkerStats() map[uint64]storiface.WorkerStats {
 
 	return out
 }
+
+func (m *Manager) WorkerJobs() map[uint64][]storiface.WorkerJob {
+	m.sched.workersLk.Lock()
+	defer m.sched.workersLk.Unlock()
+
+	out := map[uint64][]storiface.WorkerJob{}
+
+	for id, handle := range m.sched.workers {
+		out[uint64(id)] = handle.wt.Running()
+	}
+
+	return out
+}
diff --git a/storiface/worker.go b/storiface/worker.go
index 0f49e8971..01ef59d36 100644
--- a/storiface/worker.go
+++ b/storiface/worker.go
@@ -1,5 +1,12 @@
 package storiface
 
+import (
+	"time"
+
+	"github.com/filecoin-project/sector-storage/sealtasks"
+	"github.com/filecoin-project/specs-actors/actors/abi"
+)
+
 type WorkerInfo struct {
 	Hostname string
 
@@ -24,3 +31,11 @@ type WorkerStats struct {
 	GpuUsed bool
 	CpuUse uint64
 }
+
+type WorkerJob struct {
+	ID     uint64
+	Sector abi.SectorID
+	Task   sealtasks.TaskType
+
+	Start time.Time
+}
diff --git a/work_tracker.go b/work_tracker.go
new file mode 100644
index 000000000..f1e243ed2
--- /dev/null
+++ b/work_tracker.go
@@ -0,0 +1,129 @@
+package sectorstorage
+
+import (
+	"context"
+	"io"
+	"sync"
+	"time"
+
+	"github.com/ipfs/go-cid"
+
+	"github.com/filecoin-project/specs-actors/actors/abi"
+	"github.com/filecoin-project/specs-storage/storage"
+
+	"github.com/filecoin-project/sector-storage/sealtasks"
+	"github.com/filecoin-project/sector-storage/stores"
+	"github.com/filecoin-project/sector-storage/storiface"
+)
+
+type workTracker struct {
+	lk sync.Mutex
+
+	ctr     uint64
+	running map[uint64]storiface.WorkerJob
+
+	// TODO: done, aggregate stats, queue stats, scheduler feedback
+}
+
+func (wt *workTracker) track(sid abi.SectorID, task sealtasks.TaskType) func() {
+	wt.lk.Lock()
+	defer wt.lk.Unlock()
+
+	id := wt.ctr
+	wt.ctr++
+
+	wt.running[id] = storiface.WorkerJob{
+		ID:     id,
+		Sector: sid,
+		Task:   task,
+		Start:  time.Now(),
+	}
+
+	return func() {
+		wt.lk.Lock()
+		defer wt.lk.Unlock()
+
+		delete(wt.running, id)
+	}
+}
+
+func (wt *workTracker) worker(w Worker) Worker {
+	return &trackedWorker{
+		Worker:  w,
+		tracker: wt,
+	}
+}
+
+func (wt *workTracker) Running() []storiface.WorkerJob {
+	wt.lk.Lock()
+	defer wt.lk.Unlock()
+
+	out := make([]storiface.WorkerJob, 0, len(wt.running))
+	for _, job := range wt.running {
+		out = append(out, job)
+	}
+
+	return out
+}
+
+type trackedWorker struct {
+	Worker
+
+	tracker *workTracker
+}
+
+func (t *trackedWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) {
+	defer t.tracker.track(sector, sealtasks.TTPreCommit1)()
+
+	return t.Worker.SealPreCommit1(ctx, sector, ticket, pieces)
+}
+
+func (t *trackedWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, pc1o storage.PreCommit1Out) (storage.SectorCids, error) {
+	defer t.tracker.track(sector, sealtasks.TTPreCommit2)()
+
+	return t.Worker.SealPreCommit2(ctx, sector, pc1o)
+}
+
+func (t *trackedWorker) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Commit1Out, error) {
+	defer t.tracker.track(sector, sealtasks.TTCommit1)()
+
+	return t.Worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids)
+}
+
+func (t *trackedWorker) SealCommit2(ctx context.Context, sector abi.SectorID, c1o storage.Commit1Out) (storage.Proof, error) {
+	defer t.tracker.track(sector, sealtasks.TTCommit2)()
+
+	return t.Worker.SealCommit2(ctx, sector, c1o)
+}
+
+func (t *trackedWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) error {
+	defer t.tracker.track(sector, sealtasks.TTFinalize)()
+
+	return t.Worker.FinalizeSector(ctx, sector, keepUnsealed)
+}
+
+func (t *trackedWorker) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) {
+	defer t.tracker.track(sector, sealtasks.TTAddPiece)()
+
+	return t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData)
+}
+
+func (t *trackedWorker) Fetch(ctx context.Context, s abi.SectorID, ft stores.SectorFileType, ptype stores.PathType, am stores.AcquireMode) error {
+	defer t.tracker.track(s, sealtasks.TTFetch)()
+
+	return t.Worker.Fetch(ctx, s, ft, ptype, am)
+}
+
+func (t *trackedWorker) UnsealPiece(ctx context.Context, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) error {
+	defer t.tracker.track(id, sealtasks.TTUnseal)()
+
+	return t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid)
+}
+
+func (t *trackedWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error {
+	defer t.tracker.track(id, sealtasks.TTReadUnsealed)()
+
+	return t.Worker.ReadPiece(ctx, writer, id, index, size)
+}
+
+var _ Worker = &trackedWorker{}