2020-08-17 13:26:18 +00:00
|
|
|
package sectorstorage
|
2020-07-21 18:01:25 +00:00
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"io"
|
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/ipfs/go-cid"
|
2021-02-21 10:03:00 +00:00
|
|
|
"go.opencensus.io/stats"
|
|
|
|
"go.opencensus.io/tag"
|
2020-07-21 18:01:25 +00:00
|
|
|
|
2020-09-07 03:49:10 +00:00
|
|
|
"github.com/filecoin-project/go-state-types/abi"
|
2020-07-21 18:01:25 +00:00
|
|
|
"github.com/filecoin-project/specs-storage/storage"
|
|
|
|
|
2020-08-17 13:26:18 +00:00
|
|
|
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
|
|
|
|
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
|
2021-02-21 10:03:00 +00:00
|
|
|
"github.com/filecoin-project/lotus/metrics"
|
2020-07-21 18:01:25 +00:00
|
|
|
)
|
|
|
|
|
2020-09-23 12:56:37 +00:00
|
|
|
type trackedWork struct {
|
2021-02-21 10:03:00 +00:00
|
|
|
job storiface.WorkerJob
|
|
|
|
worker WorkerID
|
|
|
|
workerHostname string
|
2020-09-23 12:56:37 +00:00
|
|
|
}
|
|
|
|
|
2020-07-21 18:01:25 +00:00
|
|
|
type workTracker struct {
|
|
|
|
lk sync.Mutex
|
|
|
|
|
2020-09-07 14:12:46 +00:00
|
|
|
done map[storiface.CallID]struct{}
|
2020-09-23 12:56:37 +00:00
|
|
|
running map[storiface.CallID]trackedWork
|
2020-07-21 18:01:25 +00:00
|
|
|
|
|
|
|
// TODO: done, aggregate stats, queue stats, scheduler feedback
|
|
|
|
}
|
|
|
|
|
2021-02-21 10:03:00 +00:00
|
|
|
func (wt *workTracker) onDone(ctx context.Context, callID storiface.CallID) {
|
2020-07-21 18:01:25 +00:00
|
|
|
wt.lk.Lock()
|
|
|
|
defer wt.lk.Unlock()
|
|
|
|
|
2021-02-21 10:03:00 +00:00
|
|
|
t, ok := wt.running[callID]
|
2020-09-07 14:12:46 +00:00
|
|
|
if !ok {
|
|
|
|
wt.done[callID] = struct{}{}
|
2021-02-21 10:03:00 +00:00
|
|
|
|
|
|
|
stats.Record(ctx, metrics.WorkerUntrackedCallsReturned.M(1))
|
2020-09-07 14:12:46 +00:00
|
|
|
return
|
2020-07-21 18:01:25 +00:00
|
|
|
}
|
|
|
|
|
2021-02-21 10:03:00 +00:00
|
|
|
took := metrics.SinceInMilliseconds(t.job.Start)
|
|
|
|
|
|
|
|
ctx, _ = tag.New(
|
|
|
|
ctx,
|
|
|
|
tag.Upsert(metrics.TaskType, string(t.job.Task)),
|
|
|
|
tag.Upsert(metrics.WorkerHostname, t.workerHostname),
|
|
|
|
)
|
|
|
|
stats.Record(ctx, metrics.WorkerCallsReturnedCount.M(1), metrics.WorkerCallsReturnedDuration.M(took))
|
|
|
|
|
2020-09-07 14:12:46 +00:00
|
|
|
delete(wt.running, callID)
|
|
|
|
}
|
|
|
|
|
2021-02-21 10:03:00 +00:00
|
|
|
func (wt *workTracker) track(ctx context.Context, wid WorkerID, wi storiface.WorkerInfo, sid storage.SectorRef, task sealtasks.TaskType) func(storiface.CallID, error) (storiface.CallID, error) {
|
2020-09-07 14:12:46 +00:00
|
|
|
return func(callID storiface.CallID, err error) (storiface.CallID, error) {
|
|
|
|
if err != nil {
|
|
|
|
return callID, err
|
|
|
|
}
|
|
|
|
|
2020-07-21 18:01:25 +00:00
|
|
|
wt.lk.Lock()
|
|
|
|
defer wt.lk.Unlock()
|
|
|
|
|
2020-09-07 14:12:46 +00:00
|
|
|
_, done := wt.done[callID]
|
|
|
|
if done {
|
|
|
|
delete(wt.done, callID)
|
|
|
|
return callID, err
|
|
|
|
}
|
|
|
|
|
2020-09-23 12:56:37 +00:00
|
|
|
wt.running[callID] = trackedWork{
|
|
|
|
job: storiface.WorkerJob{
|
|
|
|
ID: callID,
|
2020-11-04 20:29:08 +00:00
|
|
|
Sector: sid.ID,
|
2020-09-23 12:56:37 +00:00
|
|
|
Task: task,
|
|
|
|
Start: time.Now(),
|
|
|
|
},
|
2021-02-21 10:03:00 +00:00
|
|
|
worker: wid,
|
|
|
|
workerHostname: wi.Hostname,
|
2020-09-07 14:12:46 +00:00
|
|
|
}
|
|
|
|
|
2021-02-21 10:03:00 +00:00
|
|
|
ctx, _ = tag.New(
|
|
|
|
ctx,
|
|
|
|
tag.Upsert(metrics.TaskType, string(task)),
|
|
|
|
tag.Upsert(metrics.WorkerHostname, wi.Hostname),
|
|
|
|
)
|
|
|
|
stats.Record(ctx, metrics.WorkerCallsStarted.M(1))
|
|
|
|
|
2020-09-07 14:12:46 +00:00
|
|
|
return callID, err
|
2020-07-21 18:01:25 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-02-21 10:03:00 +00:00
|
|
|
func (wt *workTracker) worker(wid WorkerID, wi storiface.WorkerInfo, w Worker) Worker {
|
2020-07-21 18:01:25 +00:00
|
|
|
return &trackedWorker{
|
2021-02-21 10:03:00 +00:00
|
|
|
Worker: w,
|
|
|
|
wid: wid,
|
|
|
|
workerInfo: wi,
|
2020-09-23 12:56:37 +00:00
|
|
|
|
2020-07-21 18:01:25 +00:00
|
|
|
tracker: wt,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-09-23 12:56:37 +00:00
|
|
|
func (wt *workTracker) Running() []trackedWork {
|
2020-07-21 18:01:25 +00:00
|
|
|
wt.lk.Lock()
|
|
|
|
defer wt.lk.Unlock()
|
|
|
|
|
2020-09-23 12:56:37 +00:00
|
|
|
out := make([]trackedWork, 0, len(wt.running))
|
2020-07-21 18:01:25 +00:00
|
|
|
for _, job := range wt.running {
|
|
|
|
out = append(out, job)
|
|
|
|
}
|
|
|
|
|
|
|
|
return out
|
|
|
|
}
|
|
|
|
|
|
|
|
type trackedWorker struct {
|
|
|
|
Worker
|
2021-02-21 10:03:00 +00:00
|
|
|
wid WorkerID
|
|
|
|
workerInfo storiface.WorkerInfo
|
2020-07-21 18:01:25 +00:00
|
|
|
|
|
|
|
tracker *workTracker
|
|
|
|
}
|
|
|
|
|
2020-11-04 20:29:08 +00:00
|
|
|
func (t *trackedWorker) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) {
|
2021-02-21 10:03:00 +00:00
|
|
|
return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTPreCommit1)(t.Worker.SealPreCommit1(ctx, sector, ticket, pieces))
|
2020-07-21 18:01:25 +00:00
|
|
|
}
|
|
|
|
|
2020-11-04 20:29:08 +00:00
|
|
|
func (t *trackedWorker) SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (storiface.CallID, error) {
|
2021-02-21 10:03:00 +00:00
|
|
|
return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTPreCommit2)(t.Worker.SealPreCommit2(ctx, sector, pc1o))
|
2020-07-21 18:01:25 +00:00
|
|
|
}
|
|
|
|
|
2020-11-04 20:29:08 +00:00
|
|
|
func (t *trackedWorker) SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) {
|
2021-02-21 10:03:00 +00:00
|
|
|
return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTCommit1)(t.Worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids))
|
2020-07-21 18:01:25 +00:00
|
|
|
}
|
|
|
|
|
2020-11-04 20:29:08 +00:00
|
|
|
func (t *trackedWorker) SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (storiface.CallID, error) {
|
2021-02-21 10:03:00 +00:00
|
|
|
return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTCommit2)(t.Worker.SealCommit2(ctx, sector, c1o))
|
2020-07-21 18:01:25 +00:00
|
|
|
}
|
|
|
|
|
2020-11-04 20:29:08 +00:00
|
|
|
func (t *trackedWorker) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) {
|
2021-02-21 10:03:00 +00:00
|
|
|
return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTFinalize)(t.Worker.FinalizeSector(ctx, sector, keepUnsealed))
|
2020-07-21 18:01:25 +00:00
|
|
|
}
|
|
|
|
|
2020-11-04 20:29:08 +00:00
|
|
|
func (t *trackedWorker) AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) {
|
2021-02-21 10:03:00 +00:00
|
|
|
return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTAddPiece)(t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData))
|
2020-07-21 18:01:25 +00:00
|
|
|
}
|
|
|
|
|
2020-11-04 20:29:08 +00:00
|
|
|
func (t *trackedWorker) Fetch(ctx context.Context, s storage.SectorRef, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) {
|
2021-02-21 10:03:00 +00:00
|
|
|
return t.tracker.track(ctx, t.wid, t.workerInfo, s, sealtasks.TTFetch)(t.Worker.Fetch(ctx, s, ft, ptype, am))
|
2020-07-21 18:01:25 +00:00
|
|
|
}
|
|
|
|
|
2020-11-04 20:29:08 +00:00
|
|
|
func (t *trackedWorker) UnsealPiece(ctx context.Context, id storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) {
|
2021-02-21 10:03:00 +00:00
|
|
|
return t.tracker.track(ctx, t.wid, t.workerInfo, id, sealtasks.TTUnseal)(t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid))
|
2020-07-21 18:01:25 +00:00
|
|
|
}
|
|
|
|
|
2020-11-04 20:29:08 +00:00
|
|
|
func (t *trackedWorker) ReadPiece(ctx context.Context, writer io.Writer, id storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (storiface.CallID, error) {
|
2021-02-21 10:03:00 +00:00
|
|
|
return t.tracker.track(ctx, t.wid, t.workerInfo, id, sealtasks.TTReadUnsealed)(t.Worker.ReadPiece(ctx, writer, id, index, size))
|
2020-07-21 18:01:25 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
var _ Worker = &trackedWorker{}
|