lotus/extern/sector-storage/worker_tracked.go

138 lines
4.3 KiB
Go
Raw Normal View History

package sectorstorage
2020-07-21 18:01:25 +00:00
import (
"context"
"io"
"sync"
"time"
"github.com/ipfs/go-cid"
2020-09-07 03:49:10 +00:00
"github.com/filecoin-project/go-state-types/abi"
2020-07-21 18:01:25 +00:00
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
2020-07-21 18:01:25 +00:00
)
type workTracker struct {
lk sync.Mutex
2020-09-07 14:12:46 +00:00
done map[storiface.CallID]struct{}
running map[storiface.CallID]storiface.WorkerJob
2020-07-21 18:01:25 +00:00
// TODO: done, aggregate stats, queue stats, scheduler feedback
}
2020-09-07 14:12:46 +00:00
// TODO: CALL THIS!
// TODO: CALL THIS!
// TODO: CALL THIS!
// TODO: CALL THIS!
// TODO: CALL THIS!
// TODO: CALL THIS!
// TODO: CALL THIS!
// TODO: CALL THIS!
// TODO: CALL THIS!
// TODO: CALL THIS!
func (wt *workTracker) onDone(callID storiface.CallID) {
2020-07-21 18:01:25 +00:00
wt.lk.Lock()
defer wt.lk.Unlock()
2020-09-07 14:12:46 +00:00
_, ok := wt.running[callID]
if !ok {
wt.done[callID] = struct{}{}
return
2020-07-21 18:01:25 +00:00
}
2020-09-07 14:12:46 +00:00
delete(wt.running, callID)
}
func (wt *workTracker) track(sid abi.SectorID, task sealtasks.TaskType) func(storiface.CallID, error) (storiface.CallID, error) {
return func(callID storiface.CallID, err error) (storiface.CallID, error) {
if err != nil {
return callID, err
}
2020-07-21 18:01:25 +00:00
wt.lk.Lock()
defer wt.lk.Unlock()
2020-09-07 14:12:46 +00:00
_, done := wt.done[callID]
if done {
delete(wt.done, callID)
return callID, err
}
wt.running[callID] = storiface.WorkerJob{
ID: callID,
Sector: sid,
Task: task,
Start: time.Now(),
}
return callID, err
2020-07-21 18:01:25 +00:00
}
}
func (wt *workTracker) worker(w Worker) Worker {
return &trackedWorker{
Worker: w,
tracker: wt,
}
}
func (wt *workTracker) Running() []storiface.WorkerJob {
wt.lk.Lock()
defer wt.lk.Unlock()
out := make([]storiface.WorkerJob, 0, len(wt.running))
for _, job := range wt.running {
out = append(out, job)
}
return out
}
type trackedWorker struct {
Worker
tracker *workTracker
}
2020-09-07 14:12:46 +00:00
func (t *trackedWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) {
return t.tracker.track(sector, sealtasks.TTPreCommit1)(t.Worker.SealPreCommit1(ctx, sector, ticket, pieces))
2020-07-21 18:01:25 +00:00
}
2020-09-07 14:12:46 +00:00
func (t *trackedWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, pc1o storage.PreCommit1Out) (storiface.CallID, error) {
return t.tracker.track(sector, sealtasks.TTPreCommit2)(t.Worker.SealPreCommit2(ctx, sector, pc1o))
2020-07-21 18:01:25 +00:00
}
2020-09-07 14:12:46 +00:00
func (t *trackedWorker) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) {
return t.tracker.track(sector, sealtasks.TTCommit1)(t.Worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids))
2020-07-21 18:01:25 +00:00
}
2020-09-07 14:12:46 +00:00
func (t *trackedWorker) SealCommit2(ctx context.Context, sector abi.SectorID, c1o storage.Commit1Out) (storiface.CallID, error) {
return t.tracker.track(sector, sealtasks.TTCommit2)(t.Worker.SealCommit2(ctx, sector, c1o))
2020-07-21 18:01:25 +00:00
}
2020-09-07 14:12:46 +00:00
func (t *trackedWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) (storiface.CallID, error) {
return t.tracker.track(sector, sealtasks.TTFinalize)(t.Worker.FinalizeSector(ctx, sector, keepUnsealed))
2020-07-21 18:01:25 +00:00
}
2020-09-07 14:12:46 +00:00
func (t *trackedWorker) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) {
return t.tracker.track(sector, sealtasks.TTAddPiece)(t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData))
2020-07-21 18:01:25 +00:00
}
2020-09-07 14:12:46 +00:00
func (t *trackedWorker) Fetch(ctx context.Context, s abi.SectorID, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) {
return t.tracker.track(s, sealtasks.TTFetch)(t.Worker.Fetch(ctx, s, ft, ptype, am))
2020-07-21 18:01:25 +00:00
}
2020-09-07 14:12:46 +00:00
func (t *trackedWorker) UnsealPiece(ctx context.Context, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) {
return t.tracker.track(id, sealtasks.TTUnseal)(t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid))
2020-07-21 18:01:25 +00:00
}
2020-09-07 14:12:46 +00:00
func (t *trackedWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (storiface.CallID, error) {
return t.tracker.track(id, sealtasks.TTReadUnsealed)(t.Worker.ReadPiece(ctx, writer, id, index, size))
2020-07-21 18:01:25 +00:00
}
var _ Worker = &trackedWorker{}