130 lines
3.8 KiB
Go
130 lines
3.8 KiB
Go
package sectorstorage
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/ipfs/go-cid"
|
|
|
|
"github.com/filecoin-project/specs-actors/actors/abi"
|
|
"github.com/filecoin-project/specs-storage/storage"
|
|
|
|
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
|
|
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
|
|
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
|
|
)
|
|
|
|
type workTracker struct {
|
|
lk sync.Mutex
|
|
|
|
ctr uint64
|
|
running map[uint64]storiface.WorkerJob
|
|
|
|
// TODO: done, aggregate stats, queue stats, scheduler feedback
|
|
}
|
|
|
|
func (wt *workTracker) track(sid abi.SectorID, task sealtasks.TaskType) func() {
|
|
wt.lk.Lock()
|
|
defer wt.lk.Unlock()
|
|
|
|
id := wt.ctr
|
|
wt.ctr++
|
|
|
|
wt.running[id] = storiface.WorkerJob{
|
|
ID: id,
|
|
Sector: sid,
|
|
Task: task,
|
|
Start: time.Now(),
|
|
}
|
|
|
|
return func() {
|
|
wt.lk.Lock()
|
|
defer wt.lk.Unlock()
|
|
|
|
delete(wt.running, id)
|
|
}
|
|
}
|
|
|
|
func (wt *workTracker) worker(w Worker) Worker {
|
|
return &trackedWorker{
|
|
Worker: w,
|
|
tracker: wt,
|
|
}
|
|
}
|
|
|
|
func (wt *workTracker) Running() []storiface.WorkerJob {
|
|
wt.lk.Lock()
|
|
defer wt.lk.Unlock()
|
|
|
|
out := make([]storiface.WorkerJob, 0, len(wt.running))
|
|
for _, job := range wt.running {
|
|
out = append(out, job)
|
|
}
|
|
|
|
return out
|
|
}
|
|
|
|
type trackedWorker struct {
|
|
Worker
|
|
|
|
tracker *workTracker
|
|
}
|
|
|
|
func (t *trackedWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) {
|
|
defer t.tracker.track(sector, sealtasks.TTPreCommit1)()
|
|
|
|
return t.Worker.SealPreCommit1(ctx, sector, ticket, pieces)
|
|
}
|
|
|
|
func (t *trackedWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, pc1o storage.PreCommit1Out) (storage.SectorCids, error) {
|
|
defer t.tracker.track(sector, sealtasks.TTPreCommit2)()
|
|
|
|
return t.Worker.SealPreCommit2(ctx, sector, pc1o)
|
|
}
|
|
|
|
func (t *trackedWorker) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Commit1Out, error) {
|
|
defer t.tracker.track(sector, sealtasks.TTCommit1)()
|
|
|
|
return t.Worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids)
|
|
}
|
|
|
|
func (t *trackedWorker) SealCommit2(ctx context.Context, sector abi.SectorID, c1o storage.Commit1Out) (storage.Proof, error) {
|
|
defer t.tracker.track(sector, sealtasks.TTCommit2)()
|
|
|
|
return t.Worker.SealCommit2(ctx, sector, c1o)
|
|
}
|
|
|
|
func (t *trackedWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) error {
|
|
defer t.tracker.track(sector, sealtasks.TTFinalize)()
|
|
|
|
return t.Worker.FinalizeSector(ctx, sector, keepUnsealed)
|
|
}
|
|
|
|
func (t *trackedWorker) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) {
|
|
defer t.tracker.track(sector, sealtasks.TTAddPiece)()
|
|
|
|
return t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData)
|
|
}
|
|
|
|
func (t *trackedWorker) Fetch(ctx context.Context, s abi.SectorID, ft stores.SectorFileType, ptype stores.PathType, am stores.AcquireMode) error {
|
|
defer t.tracker.track(s, sealtasks.TTFetch)()
|
|
|
|
return t.Worker.Fetch(ctx, s, ft, ptype, am)
|
|
}
|
|
|
|
func (t *trackedWorker) UnsealPiece(ctx context.Context, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) error {
|
|
defer t.tracker.track(id, sealtasks.TTUnseal)()
|
|
|
|
return t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid)
|
|
}
|
|
|
|
func (t *trackedWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
|
|
defer t.tracker.track(id, sealtasks.TTReadUnsealed)()
|
|
|
|
return t.Worker.ReadPiece(ctx, writer, id, index, size)
|
|
}
|
|
|
|
var _ Worker = &trackedWorker{}
|