lotus/storage/sealer/worker_tracked.go

234 lines
8.3 KiB
Go
Raw Normal View History

package sealer
2020-07-21 18:01:25 +00:00
import (
"context"
"sync"
"time"
"github.com/google/uuid"
2020-07-21 18:01:25 +00:00
"github.com/ipfs/go-cid"
2021-02-21 10:03:00 +00:00
"go.opencensus.io/stats"
"go.opencensus.io/tag"
2020-07-21 18:01:25 +00:00
2020-09-07 03:49:10 +00:00
"github.com/filecoin-project/go-state-types/abi"
2020-07-21 18:01:25 +00:00
"github.com/filecoin-project/specs-storage/storage"
2021-02-21 10:03:00 +00:00
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
2020-07-21 18:01:25 +00:00
)
2020-09-23 12:56:37 +00:00
type trackedWork struct {
2021-02-21 10:03:00 +00:00
job storiface.WorkerJob
2021-11-29 13:42:20 +00:00
worker storiface.WorkerID
2021-02-21 10:03:00 +00:00
workerHostname string
2020-09-23 12:56:37 +00:00
}
2020-07-21 18:01:25 +00:00
type workTracker struct {
lk sync.Mutex
2021-10-15 19:04:03 +00:00
done map[storiface.CallID]struct{}
running map[storiface.CallID]trackedWork
prepared map[uuid.UUID]trackedWork
2020-07-21 18:01:25 +00:00
// TODO: done, aggregate stats, queue stats, scheduler feedback
}
2021-02-21 10:03:00 +00:00
func (wt *workTracker) onDone(ctx context.Context, callID storiface.CallID) {
2020-07-21 18:01:25 +00:00
wt.lk.Lock()
defer wt.lk.Unlock()
2021-02-21 10:03:00 +00:00
t, ok := wt.running[callID]
2020-09-07 14:12:46 +00:00
if !ok {
wt.done[callID] = struct{}{}
2021-02-21 10:03:00 +00:00
stats.Record(ctx, metrics.WorkerUntrackedCallsReturned.M(1))
2020-09-07 14:12:46 +00:00
return
2020-07-21 18:01:25 +00:00
}
2021-02-21 10:03:00 +00:00
took := metrics.SinceInMilliseconds(t.job.Start)
ctx, _ = tag.New(
ctx,
tag.Upsert(metrics.TaskType, string(t.job.Task)),
tag.Upsert(metrics.WorkerHostname, t.workerHostname),
)
stats.Record(ctx, metrics.WorkerCallsReturnedCount.M(1), metrics.WorkerCallsReturnedDuration.M(took))
2020-09-07 14:12:46 +00:00
delete(wt.running, callID)
}
2021-11-29 13:42:20 +00:00
func (wt *workTracker) track(ctx context.Context, ready chan struct{}, wid storiface.WorkerID, wi storiface.WorkerInfo, sid storage.SectorRef, task sealtasks.TaskType, cb func() (storiface.CallID, error)) (storiface.CallID, error) {
tracked := func(rw int, callID storiface.CallID) trackedWork {
return trackedWork{
job: storiface.WorkerJob{
ID: callID,
Sector: sid.ID,
Task: task,
Start: time.Now(),
RunWait: rw,
},
worker: wid,
workerHostname: wi.Hostname,
2020-09-07 14:12:46 +00:00
}
}
2020-09-07 14:12:46 +00:00
2021-10-18 18:19:21 +00:00
wt.lk.Lock()
defer wt.lk.Unlock()
select {
case <-ready:
case <-ctx.Done():
return storiface.UndefCall, ctx.Err()
default:
prepID := uuid.New()
2020-07-21 18:01:25 +00:00
wt.prepared[prepID] = tracked(storiface.RWPrepared, storiface.UndefCall)
2020-09-07 14:12:46 +00:00
wt.lk.Unlock()
2021-10-18 18:19:21 +00:00
2021-10-15 19:04:03 +00:00
select {
case <-ready:
case <-ctx.Done():
wt.lk.Lock()
delete(wt.prepared, prepID)
return storiface.UndefCall, ctx.Err()
2021-10-15 19:04:03 +00:00
}
wt.lk.Lock()
delete(wt.prepared, prepID)
}
wt.lk.Unlock()
callID, err := cb()
wt.lk.Lock()
if err != nil {
return callID, err
}
2021-10-15 19:04:03 +00:00
_, done := wt.done[callID]
if done {
delete(wt.done, callID)
2020-09-07 14:12:46 +00:00
return callID, err
2020-07-21 18:01:25 +00:00
}
wt.running[callID] = tracked(storiface.RWRunning, callID)
ctx, _ = tag.New(
ctx,
tag.Upsert(metrics.TaskType, string(task)),
tag.Upsert(metrics.WorkerHostname, wi.Hostname),
)
stats.Record(ctx, metrics.WorkerCallsStarted.M(1))
return callID, err
2020-07-21 18:01:25 +00:00
}
2021-11-29 13:42:20 +00:00
func (wt *workTracker) worker(wid storiface.WorkerID, wi storiface.WorkerInfo, w Worker) *trackedWorker {
2020-07-21 18:01:25 +00:00
return &trackedWorker{
2021-02-21 10:03:00 +00:00
Worker: w,
wid: wid,
workerInfo: wi,
2020-09-23 12:56:37 +00:00
2021-10-15 19:04:03 +00:00
execute: make(chan struct{}),
2020-07-21 18:01:25 +00:00
tracker: wt,
}
}
2021-10-15 19:26:35 +00:00
func (wt *workTracker) Running() ([]trackedWork, []trackedWork) {
2020-07-21 18:01:25 +00:00
wt.lk.Lock()
defer wt.lk.Unlock()
2021-10-15 19:26:35 +00:00
running := make([]trackedWork, 0, len(wt.running))
2020-07-21 18:01:25 +00:00
for _, job := range wt.running {
2021-10-15 19:26:35 +00:00
running = append(running, job)
}
prepared := make([]trackedWork, 0, len(wt.prepared))
for _, job := range wt.prepared {
prepared = append(prepared, job)
2020-07-21 18:01:25 +00:00
}
2021-10-15 19:26:35 +00:00
return running, prepared
2020-07-21 18:01:25 +00:00
}
type trackedWorker struct {
Worker
2021-11-29 13:42:20 +00:00
wid storiface.WorkerID
2021-02-21 10:03:00 +00:00
workerInfo storiface.WorkerInfo
2020-07-21 18:01:25 +00:00
2021-10-15 19:04:03 +00:00
execute chan struct{} // channel blocking execution in case we're waiting for resources but the task is ready to execute
2020-07-21 18:01:25 +00:00
tracker *workTracker
}
2021-10-15 19:04:03 +00:00
func (t *trackedWorker) start() {
close(t.execute)
}
func (t *trackedWorker) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) {
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTPreCommit1, func() (storiface.CallID, error) { return t.Worker.SealPreCommit1(ctx, sector, ticket, pieces) })
2020-07-21 18:01:25 +00:00
}
func (t *trackedWorker) SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (storiface.CallID, error) {
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTPreCommit2, func() (storiface.CallID, error) { return t.Worker.SealPreCommit2(ctx, sector, pc1o) })
2020-07-21 18:01:25 +00:00
}
func (t *trackedWorker) SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) {
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTCommit1, func() (storiface.CallID, error) {
return t.Worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids)
})
2020-07-21 18:01:25 +00:00
}
func (t *trackedWorker) SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (storiface.CallID, error) {
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTCommit2, func() (storiface.CallID, error) { return t.Worker.SealCommit2(ctx, sector, c1o) })
2020-07-21 18:01:25 +00:00
}
func (t *trackedWorker) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) {
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTFinalize, func() (storiface.CallID, error) { return t.Worker.FinalizeSector(ctx, sector, keepUnsealed) })
2020-07-21 18:01:25 +00:00
}
2022-04-26 16:22:52 +00:00
func (t *trackedWorker) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) {
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, storage.NoSectorRef, sealtasks.TTDataCid, func() (storiface.CallID, error) {
return t.Worker.DataCid(ctx, pieceSize, pieceData)
})
}
func (t *trackedWorker) AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) {
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTAddPiece, func() (storiface.CallID, error) {
return t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData)
})
2020-07-21 18:01:25 +00:00
}
func (t *trackedWorker) Fetch(ctx context.Context, s storage.SectorRef, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) {
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, s, sealtasks.TTFetch, func() (storiface.CallID, error) { return t.Worker.Fetch(ctx, s, ft, ptype, am) })
2020-07-21 18:01:25 +00:00
}
func (t *trackedWorker) UnsealPiece(ctx context.Context, id storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) {
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, id, sealtasks.TTUnseal, func() (storiface.CallID, error) { return t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid) })
2020-07-21 18:01:25 +00:00
}
func (t *trackedWorker) ReplicaUpdate(ctx context.Context, sector storage.SectorRef, pieces []abi.PieceInfo) (storiface.CallID, error) {
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTReplicaUpdate, func() (storiface.CallID, error) {
return t.Worker.ReplicaUpdate(ctx, sector, pieces)
})
}
func (t *trackedWorker) ProveReplicaUpdate1(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (storiface.CallID, error) {
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTProveReplicaUpdate1, func() (storiface.CallID, error) {
return t.Worker.ProveReplicaUpdate1(ctx, sector, sectorKey, newSealed, newUnsealed)
})
}
func (t *trackedWorker) ProveReplicaUpdate2(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid, vanillaProofs storage.ReplicaVanillaProofs) (storiface.CallID, error) {
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTProveReplicaUpdate2, func() (storiface.CallID, error) {
return t.Worker.ProveReplicaUpdate2(ctx, sector, sectorKey, newSealed, newUnsealed, vanillaProofs)
})
}
func (t *trackedWorker) FinalizeReplicaUpdate(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) {
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTFinalizeReplicaUpdate, func() (storiface.CallID, error) { return t.Worker.FinalizeReplicaUpdate(ctx, sector, keepUnsealed) })
}
2020-07-21 18:01:25 +00:00
var _ Worker = &trackedWorker{}