Add api to get active tasks
parent a109ef9cbe
commit c7da20e53c
@@ -166,7 +166,10 @@ func (m *Manager) AddWorker(ctx context.Context, w Worker) error {
     }
 
     m.sched.newWorkers <- &workerHandle{
         w: w,
+        wt: &workTracker{
+            running: map[uint64]storiface.WorkerJob{},
+        },
         info:      info,
         preparing: &activeResources{},
         active:    &activeResources{},
sched.go (7 lines changed)
@@ -82,6 +82,9 @@ type workerHandle struct {
     preparing *activeResources
     active    *activeResources
 
+    // stats / tracking
+    wt *workTracker
+
     // for sync manager goroutine closing
     cleanupStarted bool
     closedMgr      chan struct{}
@@ -486,7 +489,7 @@ func (sh *scheduler) assignWorker(taskDone chan struct{}, wid WorkerID, w *worke
     w.preparing.add(w.info.Resources, needRes)
 
     go func() {
-        err := req.prepare(req.ctx, w.w)
+        err := req.prepare(req.ctx, w.wt.worker(w.w))
         sh.workersLk.Lock()
 
         if err != nil {
@@ -519,7 +522,7 @@ func (sh *scheduler) assignWorker(taskDone chan struct{}, wid WorkerID, w *worke
         case <-sh.closing:
         }
 
-        err = req.work(req.ctx, w.w)
+        err = req.work(req.ctx, w.wt.worker(w.w))
 
         select {
         case req.ret <- workerResponse{err: err}:
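The two one-line changes in assignWorker are where the tracker hooks into the scheduler: instead of handing req.prepare and req.work the raw worker (w.w), the scheduler routes it through the handle's tracker first. For illustration only, a minimal sketch of what that wrapping buys; the helper name and its parameters are hypothetical, not part of this commit:

// Hypothetical helper (package sectorstorage): every call made through the
// wrapper returned by wt.worker() is recorded as a WorkerJob while it runs.
func exampleTrackedFetch(ctx context.Context, h *workerHandle, s abi.SectorID,
    ft stores.SectorFileType, ptype stores.PathType, am stores.AcquireMode) error {
    tracked := h.wt.worker(h.w) // decorate h.w with per-call job tracking

    // While Fetch is in flight, h.wt.Running() reports a WorkerJob with
    // Task == sealtasks.TTFetch; the deferred callback registered by
    // track() removes the entry as soon as the call returns.
    return tracked.Fetch(ctx, s, ft, ptype, am)
}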
@@ -156,7 +156,10 @@ func addTestWorker(t *testing.T, sched *scheduler, index *stores.Index, name str
     require.NoError(t, err)
 
     sched.newWorkers <- &workerHandle{
         w: w,
+        wt: &workTracker{
+            running: map[uint64]storiface.WorkerJob{},
+        },
         info:      info,
         preparing: &activeResources{},
         active:    &activeResources{},
@@ -28,6 +28,30 @@ var order = map[TaskType]int{
     TTReadUnsealed: 0,
 }
 
+var shortNames = map[TaskType]string{
+    TTAddPiece: "AP ",
+
+    TTPreCommit1: "PC1",
+    TTPreCommit2: "PC2",
+    TTCommit1:    "C1 ",
+    TTCommit2:    "C2 ",
+
+    TTFinalize: "FIN",
+
+    TTFetch:        "GET",
+    TTUnseal:       "UNS",
+    TTReadUnsealed: "RD ",
+}
+
 func (a TaskType) Less(b TaskType) bool {
     return order[a] < order[b]
 }
+
+func (a TaskType) Short() string {
+    n, ok := shortNames[a]
+    if !ok {
+        return "UNK"
+    }
+
+    return n
+}
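The short names are all three characters wide so task columns line up in a plain-text listing, and Short() falls back to "UNK" for task types it does not know about. A hypothetical consumer of these names; the helper below is illustrative, not part of this commit:

// printJobs is a hypothetical helper: one aligned line per running job,
// using the fixed-width short names added above.
func printJobs(jobs []storiface.WorkerJob) {
    for _, job := range jobs {
        fmt.Printf("%s  s-%d-%d  %s\n",
            job.Task.Short(), // "AP ", "PC1", ..., or "UNK" for unknown task types
            job.Sector.Miner, job.Sector.Number,
            time.Since(job.Start).Truncate(time.Second))
    }
}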
stats.go (13 lines changed)
@@ -20,3 +20,16 @@ func (m *Manager) WorkerStats() map[uint64]storiface.WorkerStats {
 
     return out
 }
+
+func (m *Manager) WorkerJobs() map[uint64][]storiface.WorkerJob {
+    m.sched.workersLk.Lock()
+    defer m.sched.workersLk.Unlock()
+
+    out := map[uint64][]storiface.WorkerJob{}
+
+    for id, handle := range m.sched.workers {
+        out[uint64(id)] = handle.wt.Running()
+    }
+
+    return out
+}
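WorkerJobs is the API this commit adds: under the scheduler's workers lock it snapshots, per worker ID, the jobs currently tracked by that worker's workTracker. A hedged sketch of a caller; the helper and output format below are assumptions, not from the commit:

// summarizeWorkers is a hypothetical consumer of Manager.WorkerJobs(): it
// reports how many jobs each worker is running and the age of the oldest one.
func summarizeWorkers(m *Manager) {
    for wid, jobs := range m.WorkerJobs() {
        var oldest time.Duration
        for _, job := range jobs {
            if d := time.Since(job.Start); d > oldest {
                oldest = d
            }
        }
        fmt.Printf("worker %d: %d active, oldest %s\n", wid, len(jobs), oldest.Round(time.Second))
    }
}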
@@ -1,5 +1,12 @@
 package storiface
 
+import (
+    "time"
+
+    "github.com/filecoin-project/sector-storage/sealtasks"
+    "github.com/filecoin-project/specs-actors/actors/abi"
+)
+
 type WorkerInfo struct {
     Hostname string
 
@@ -24,3 +31,11 @@ type WorkerStats struct {
     GpuUsed bool
     CpuUse  uint64
 }
+
+type WorkerJob struct {
+    ID     uint64
+    Sector abi.SectorID
+    Task   sealtasks.TaskType
+
+    Start time.Time
+}
work_tracker.go (new file, 129 lines)
@@ -0,0 +1,129 @@
+package sectorstorage
+
+import (
+    "context"
+    "io"
+    "sync"
+    "time"
+
+    "github.com/ipfs/go-cid"
+
+    "github.com/filecoin-project/specs-actors/actors/abi"
+    "github.com/filecoin-project/specs-storage/storage"
+
+    "github.com/filecoin-project/sector-storage/sealtasks"
+    "github.com/filecoin-project/sector-storage/stores"
+    "github.com/filecoin-project/sector-storage/storiface"
+)
+
+type workTracker struct {
+    lk sync.Mutex
+
+    ctr     uint64
+    running map[uint64]storiface.WorkerJob
+
+    // TODO: done, aggregate stats, queue stats, scheduler feedback
+}
+
+func (wt *workTracker) track(sid abi.SectorID, task sealtasks.TaskType) func() {
+    wt.lk.Lock()
+    defer wt.lk.Unlock()
+
+    id := wt.ctr
+    wt.ctr++
+
+    wt.running[id] = storiface.WorkerJob{
+        ID:     id,
+        Sector: sid,
+        Task:   task,
+        Start:  time.Now(),
+    }
+
+    return func() {
+        wt.lk.Lock()
+        defer wt.lk.Unlock()
+
+        delete(wt.running, id)
+    }
+}
+
+func (wt *workTracker) worker(w Worker) Worker {
+    return &trackedWorker{
+        Worker:  w,
+        tracker: wt,
+    }
+}
+
+func (wt *workTracker) Running() []storiface.WorkerJob {
+    wt.lk.Lock()
+    defer wt.lk.Unlock()
+
+    out := make([]storiface.WorkerJob, 0, len(wt.running))
+    for _, job := range wt.running {
+        out = append(out, job)
+    }
+
+    return out
+}
+
+type trackedWorker struct {
+    Worker
+
+    tracker *workTracker
+}
+
+func (t *trackedWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) {
+    defer t.tracker.track(sector, sealtasks.TTPreCommit1)()
+
+    return t.Worker.SealPreCommit1(ctx, sector, ticket, pieces)
+}
+
+func (t *trackedWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, pc1o storage.PreCommit1Out) (storage.SectorCids, error) {
+    defer t.tracker.track(sector, sealtasks.TTPreCommit2)()
+
+    return t.Worker.SealPreCommit2(ctx, sector, pc1o)
+}
+
+func (t *trackedWorker) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Commit1Out, error) {
+    defer t.tracker.track(sector, sealtasks.TTCommit1)()
+
+    return t.Worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids)
+}
+
+func (t *trackedWorker) SealCommit2(ctx context.Context, sector abi.SectorID, c1o storage.Commit1Out) (storage.Proof, error) {
+    defer t.tracker.track(sector, sealtasks.TTCommit2)()
+
+    return t.Worker.SealCommit2(ctx, sector, c1o)
+}
+
+func (t *trackedWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) error {
+    defer t.tracker.track(sector, sealtasks.TTFinalize)()
+
+    return t.Worker.FinalizeSector(ctx, sector, keepUnsealed)
+}
+
+func (t *trackedWorker) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) {
+    defer t.tracker.track(sector, sealtasks.TTAddPiece)()
+
+    return t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData)
+}
+
+func (t *trackedWorker) Fetch(ctx context.Context, s abi.SectorID, ft stores.SectorFileType, ptype stores.PathType, am stores.AcquireMode) error {
+    defer t.tracker.track(s, sealtasks.TTFetch)()
+
+    return t.Worker.Fetch(ctx, s, ft, ptype, am)
+}
+
+func (t *trackedWorker) UnsealPiece(ctx context.Context, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) error {
+    defer t.tracker.track(id, sealtasks.TTUnseal)()
+
+    return t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid)
+}
+
+func (t *trackedWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error {
+    defer t.tracker.track(id, sealtasks.TTReadUnsealed)()
+
+    return t.Worker.ReadPiece(ctx, writer, id, index, size)
+}
+
+var _ Worker = &trackedWorker{}
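The tracker has no dependency on the scheduler, so its lifecycle is easy to exercise in isolation; the sketch below is illustrative only, not a test from this commit:

// exampleTrackerLifecycle shows the track/Running round trip: a job is listed
// while it is in flight and disappears once the returned callback is invoked.
func exampleTrackerLifecycle() {
    wt := &workTracker{running: map[uint64]storiface.WorkerJob{}}

    done := wt.track(abi.SectorID{Miner: 1000, Number: 1}, sealtasks.TTPreCommit1)
    fmt.Println(len(wt.Running())) // 1

    done()
    fmt.Println(len(wt.Running())) // 0
}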