Block work in tracked worker before it is started

This commit is contained in:
Łukasz Magiera 2021-10-18 16:27:25 +02:00
parent 261238e157
commit 70589e4406
2 changed files with 61 additions and 63 deletions

View File

@ -157,7 +157,7 @@ func newScheduler() *scheduler {
workTracker: &workTracker{ workTracker: &workTracker{
done: map[storiface.CallID]struct{}{}, done: map[storiface.CallID]struct{}{},
running: map[storiface.CallID]trackedWork{}, running: map[storiface.CallID]trackedWork{},
prepared: map[storiface.CallID]trackedWork{}, prepared: map[uuid.UUID]trackedWork{},
}, },
info: make(chan func(interface{})), info: make(chan func(interface{})),

View File

@ -5,6 +5,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/google/uuid"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"go.opencensus.io/stats" "go.opencensus.io/stats"
"go.opencensus.io/tag" "go.opencensus.io/tag"
@ -28,7 +29,7 @@ type workTracker struct {
done map[storiface.CallID]struct{} done map[storiface.CallID]struct{}
running map[storiface.CallID]trackedWork running map[storiface.CallID]trackedWork
prepared map[storiface.CallID]trackedWork prepared map[uuid.UUID]trackedWork
// TODO: done, aggregate stats, queue stats, scheduler feedback // TODO: done, aggregate stats, queue stats, scheduler feedback
} }
@ -57,22 +58,8 @@ func (wt *workTracker) onDone(ctx context.Context, callID storiface.CallID) {
delete(wt.running, callID) delete(wt.running, callID)
} }
func (wt *workTracker) track(ctx context.Context, ready chan struct{}, wid WorkerID, wi storiface.WorkerInfo, sid storage.SectorRef, task sealtasks.TaskType) func(storiface.CallID, error) (storiface.CallID, error) { func (wt *workTracker) track(ctx context.Context, ready chan struct{}, wid WorkerID, wi storiface.WorkerInfo, sid storage.SectorRef, task sealtasks.TaskType, cb func() (storiface.CallID, error)) (storiface.CallID, error) {
return func(callID storiface.CallID, err error) (storiface.CallID, error) { tracked := func(rw int, callID storiface.CallID) trackedWork {
if err != nil {
return callID, err
}
wt.lk.Lock()
defer wt.lk.Unlock()
_, done := wt.done[callID]
if done {
delete(wt.done, callID)
return callID, err
}
tracked := func(rw int) trackedWork {
return trackedWork{ return trackedWork{
job: storiface.WorkerJob{ job: storiface.WorkerJob{
ID: callID, ID: callID,
@ -89,30 +76,40 @@ func (wt *workTracker) track(ctx context.Context, ready chan struct{}, wid Worke
select { select {
case <-ready: case <-ready:
case <-ctx.Done(): case <-ctx.Done():
return callID, ctx.Err() return storiface.UndefCall, ctx.Err()
default: default:
wt.prepared[callID] = tracked(storiface.RWPrepared) prepID := uuid.New()
wt.prepared[prepID] = tracked(storiface.RWPrepared, storiface.UndefCall)
wt.lk.Unlock() wt.lk.Unlock()
select { select {
case <-ready: case <-ready:
case <-ctx.Done(): case <-ctx.Done():
delete(wt.prepared, callID) wt.lk.Lock()
wt.lk.Lock() // for the deferred unlock delete(wt.prepared, prepID)
return callID, ctx.Err() return storiface.UndefCall, ctx.Err()
} }
wt.lk.Lock() wt.lk.Lock()
delete(wt.prepared, prepID)
}
callID, err := cb()
if err != nil {
return callID, err
}
wt.lk.Lock()
defer wt.lk.Unlock()
_, done := wt.done[callID] _, done := wt.done[callID]
if done { if done {
delete(wt.done, callID) delete(wt.done, callID)
return callID, err return callID, err
} }
delete(wt.prepared, callID) wt.running[callID] = tracked(storiface.RWRunning, callID)
}
wt.running[callID] = tracked(storiface.RWRunning)
ctx, _ = tag.New( ctx, _ = tag.New(
ctx, ctx,
@ -123,7 +120,6 @@ func (wt *workTracker) track(ctx context.Context, ready chan struct{}, wid Worke
return callID, err return callID, err
} }
}
func (wt *workTracker) worker(wid WorkerID, wi storiface.WorkerInfo, w Worker) *trackedWorker { func (wt *workTracker) worker(wid WorkerID, wi storiface.WorkerInfo, w Worker) *trackedWorker {
return &trackedWorker{ return &trackedWorker{
@ -168,35 +164,37 @@ func (t *trackedWorker) start() {
} }
func (t *trackedWorker) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) { func (t *trackedWorker) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) {
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTPreCommit1)(t.Worker.SealPreCommit1(ctx, sector, ticket, pieces)) return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTPreCommit1, func() (storiface.CallID, error) { return t.Worker.SealPreCommit1(ctx, sector, ticket, pieces) })
} }
func (t *trackedWorker) SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (storiface.CallID, error) { func (t *trackedWorker) SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (storiface.CallID, error) {
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTPreCommit2)(t.Worker.SealPreCommit2(ctx, sector, pc1o)) return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTPreCommit2, func() (storiface.CallID, error) { return t.Worker.SealPreCommit2(ctx, sector, pc1o) })
} }
func (t *trackedWorker) SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) { func (t *trackedWorker) SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) {
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTCommit1)(t.Worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids)) return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTCommit1, func() (storiface.CallID, error) { return t.Worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids) })
} }
func (t *trackedWorker) SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (storiface.CallID, error) { func (t *trackedWorker) SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (storiface.CallID, error) {
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTCommit2)(t.Worker.SealCommit2(ctx, sector, c1o)) return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTCommit2, func() (storiface.CallID, error) { return t.Worker.SealCommit2(ctx, sector, c1o) })
} }
func (t *trackedWorker) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) { func (t *trackedWorker) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) {
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTFinalize)(t.Worker.FinalizeSector(ctx, sector, keepUnsealed)) return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTFinalize, func() (storiface.CallID, error) { return t.Worker.FinalizeSector(ctx, sector, keepUnsealed) })
} }
func (t *trackedWorker) AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) { func (t *trackedWorker) AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) {
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTAddPiece)(t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData)) return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTAddPiece, func() (storiface.CallID, error) {
return t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData)
})
} }
func (t *trackedWorker) Fetch(ctx context.Context, s storage.SectorRef, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) { func (t *trackedWorker) Fetch(ctx context.Context, s storage.SectorRef, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) {
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, s, sealtasks.TTFetch)(t.Worker.Fetch(ctx, s, ft, ptype, am)) return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, s, sealtasks.TTFetch, func() (storiface.CallID, error) { return t.Worker.Fetch(ctx, s, ft, ptype, am) })
} }
func (t *trackedWorker) UnsealPiece(ctx context.Context, id storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) { func (t *trackedWorker) UnsealPiece(ctx context.Context, id storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) {
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, id, sealtasks.TTUnseal)(t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid)) return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, id, sealtasks.TTUnseal, func() (storiface.CallID, error) { return t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid) })
} }
var _ Worker = &trackedWorker{} var _ Worker = &trackedWorker{}