Track prepared work
This commit is contained in:
parent
98ff1c4d20
commit
11d738eee0
5
extern/sector-storage/sched.go
vendored
5
extern/sector-storage/sched.go
vendored
@ -155,8 +155,9 @@ func newScheduler() *scheduler {
|
||||
schedQueue: &requestQueue{},
|
||||
|
||||
workTracker: &workTracker{
|
||||
done: map[storiface.CallID]struct{}{},
|
||||
running: map[storiface.CallID]trackedWork{},
|
||||
done: map[storiface.CallID]struct{}{},
|
||||
running: map[storiface.CallID]trackedWork{},
|
||||
prepared: map[storiface.CallID]trackedWork{},
|
||||
},
|
||||
|
||||
info: make(chan func(interface{})),
|
||||
|
19
extern/sector-storage/sched_worker.go
vendored
19
extern/sector-storage/sched_worker.go
vendored
@ -464,7 +464,9 @@ func (sw *schedWorker) startProcessingTask(req *workerRequest) error {
|
||||
|
||||
go func() {
|
||||
// first run the prepare step (e.g. fetching sector data from another worker)
|
||||
err := req.prepare(req.ctx, sh.workTracker.worker(sw.wid, w.info, w.workerRpc))
|
||||
tw := sh.workTracker.worker(sw.wid, w.info, w.workerRpc)
|
||||
tw.start()
|
||||
err := req.prepare(req.ctx, tw)
|
||||
w.lk.Lock()
|
||||
|
||||
if err != nil {
|
||||
@ -488,6 +490,14 @@ func (sw *schedWorker) startProcessingTask(req *workerRequest) error {
|
||||
return
|
||||
}
|
||||
|
||||
tw = sh.workTracker.worker(sw.wid, w.info, w.workerRpc)
|
||||
|
||||
// start tracking work early, in case we need to wait for resources
|
||||
werr := make(chan error, 1)
|
||||
go func() {
|
||||
werr <- req.work(req.ctx, tw)
|
||||
}()
|
||||
|
||||
// wait (if needed) for resources in the 'active' window
|
||||
err = w.active.withResources(sw.wid, w.info, needRes, &w.lk, func() error {
|
||||
w.preparing.free(w.info.Resources, needRes)
|
||||
@ -501,7 +511,8 @@ func (sw *schedWorker) startProcessingTask(req *workerRequest) error {
|
||||
}
|
||||
|
||||
// Do the work!
|
||||
err = req.work(req.ctx, sh.workTracker.worker(sw.wid, w.info, w.workerRpc))
|
||||
tw.start()
|
||||
err = <-werr
|
||||
|
||||
select {
|
||||
case req.ret <- workerResponse{err: err}:
|
||||
@ -534,7 +545,9 @@ func (sw *schedWorker) startProcessingReadyTask(req *workerRequest) error {
|
||||
|
||||
go func() {
|
||||
// Do the work!
|
||||
err := req.work(req.ctx, sh.workTracker.worker(sw.wid, w.info, w.workerRpc))
|
||||
tw := sh.workTracker.worker(sw.wid, w.info, w.workerRpc)
|
||||
tw.start()
|
||||
err := req.work(req.ctx, tw)
|
||||
|
||||
select {
|
||||
case req.ret <- workerResponse{err: err}:
|
||||
|
2
extern/sector-storage/stats.go
vendored
2
extern/sector-storage/stats.go
vendored
@ -50,7 +50,7 @@ func (m *Manager) WorkerJobs() map[uuid.UUID][]storiface.WorkerJob {
|
||||
ID: storiface.UndefCall,
|
||||
Sector: request.sector.ID,
|
||||
Task: request.taskType,
|
||||
RunWait: wi + 1,
|
||||
RunWait: wi + 2,
|
||||
Start: request.start,
|
||||
})
|
||||
}
|
||||
|
81
extern/sector-storage/worker_tracked.go
vendored
81
extern/sector-storage/worker_tracked.go
vendored
@ -26,8 +26,9 @@ type trackedWork struct {
|
||||
type workTracker struct {
|
||||
lk sync.Mutex
|
||||
|
||||
done map[storiface.CallID]struct{}
|
||||
running map[storiface.CallID]trackedWork
|
||||
done map[storiface.CallID]struct{}
|
||||
running map[storiface.CallID]trackedWork
|
||||
prepared map[storiface.CallID]trackedWork
|
||||
|
||||
// TODO: done, aggregate stats, queue stats, scheduler feedback
|
||||
}
|
||||
@ -56,7 +57,7 @@ func (wt *workTracker) onDone(ctx context.Context, callID storiface.CallID) {
|
||||
delete(wt.running, callID)
|
||||
}
|
||||
|
||||
func (wt *workTracker) track(ctx context.Context, wid WorkerID, wi storiface.WorkerInfo, sid storage.SectorRef, task sealtasks.TaskType) func(storiface.CallID, error) (storiface.CallID, error) {
|
||||
func (wt *workTracker) track(ctx context.Context, ready chan struct{}, wid WorkerID, wi storiface.WorkerInfo, sid storage.SectorRef, task sealtasks.TaskType) func(storiface.CallID, error) (storiface.CallID, error) {
|
||||
return func(callID storiface.CallID, err error) (storiface.CallID, error) {
|
||||
if err != nil {
|
||||
return callID, err
|
||||
@ -71,17 +72,47 @@ func (wt *workTracker) track(ctx context.Context, wid WorkerID, wi storiface.Wor
|
||||
return callID, err
|
||||
}
|
||||
|
||||
wt.running[callID] = trackedWork{
|
||||
job: storiface.WorkerJob{
|
||||
ID: callID,
|
||||
Sector: sid.ID,
|
||||
Task: task,
|
||||
Start: time.Now(),
|
||||
},
|
||||
worker: wid,
|
||||
workerHostname: wi.Hostname,
|
||||
tracked := func() trackedWork {
|
||||
return trackedWork{
|
||||
job: storiface.WorkerJob{
|
||||
ID: callID,
|
||||
Sector: sid.ID,
|
||||
Task: task,
|
||||
Start: time.Now(),
|
||||
},
|
||||
worker: wid,
|
||||
workerHostname: wi.Hostname,
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ready:
|
||||
case <-ctx.Done():
|
||||
return callID, ctx.Err()
|
||||
default:
|
||||
wt.prepared[callID] = tracked()
|
||||
|
||||
wt.lk.Unlock()
|
||||
select {
|
||||
case <-ready:
|
||||
case <-ctx.Done():
|
||||
delete(wt.prepared, callID)
|
||||
wt.lk.Lock() // for the deferred unlock
|
||||
return callID, ctx.Err()
|
||||
}
|
||||
|
||||
wt.lk.Lock()
|
||||
_, done := wt.done[callID]
|
||||
if done {
|
||||
delete(wt.done, callID)
|
||||
return callID, err
|
||||
}
|
||||
|
||||
delete(wt.prepared, callID)
|
||||
}
|
||||
|
||||
wt.running[callID] = tracked()
|
||||
|
||||
ctx, _ = tag.New(
|
||||
ctx,
|
||||
tag.Upsert(metrics.TaskType, string(task)),
|
||||
@ -93,12 +124,14 @@ func (wt *workTracker) track(ctx context.Context, wid WorkerID, wi storiface.Wor
|
||||
}
|
||||
}
|
||||
|
||||
func (wt *workTracker) worker(wid WorkerID, wi storiface.WorkerInfo, w Worker) Worker {
|
||||
func (wt *workTracker) worker(wid WorkerID, wi storiface.WorkerInfo, w Worker) *trackedWorker {
|
||||
return &trackedWorker{
|
||||
Worker: w,
|
||||
wid: wid,
|
||||
workerInfo: wi,
|
||||
|
||||
execute: make(chan struct{}),
|
||||
|
||||
tracker: wt,
|
||||
}
|
||||
}
|
||||
@ -120,39 +153,45 @@ type trackedWorker struct {
|
||||
wid WorkerID
|
||||
workerInfo storiface.WorkerInfo
|
||||
|
||||
execute chan struct{} // channel blocking execution in case we're waiting for resources but the task is ready to execute
|
||||
|
||||
tracker *workTracker
|
||||
}
|
||||
|
||||
// start closes t.execute, the channel that tracked calls made through this
// trackedWorker wait on (it is handed to tracker.track as the ready channel).
// Closing it releases every current and future receiver at once.
//
// It must be called at most once per trackedWorker: closing an already
// closed channel panics.
func (t *trackedWorker) start() {
|
||||
// close (rather than send) so the signal is a broadcast and stays observable
close(t.execute)
|
||||
}
|
||||
|
||||
func (t *trackedWorker) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) {
|
||||
return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTPreCommit1)(t.Worker.SealPreCommit1(ctx, sector, ticket, pieces))
|
||||
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTPreCommit1)(t.Worker.SealPreCommit1(ctx, sector, ticket, pieces))
|
||||
}
|
||||
|
||||
func (t *trackedWorker) SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (storiface.CallID, error) {
|
||||
return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTPreCommit2)(t.Worker.SealPreCommit2(ctx, sector, pc1o))
|
||||
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTPreCommit2)(t.Worker.SealPreCommit2(ctx, sector, pc1o))
|
||||
}
|
||||
|
||||
func (t *trackedWorker) SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) {
|
||||
return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTCommit1)(t.Worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids))
|
||||
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTCommit1)(t.Worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids))
|
||||
}
|
||||
|
||||
func (t *trackedWorker) SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (storiface.CallID, error) {
|
||||
return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTCommit2)(t.Worker.SealCommit2(ctx, sector, c1o))
|
||||
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTCommit2)(t.Worker.SealCommit2(ctx, sector, c1o))
|
||||
}
|
||||
|
||||
func (t *trackedWorker) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) {
|
||||
return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTFinalize)(t.Worker.FinalizeSector(ctx, sector, keepUnsealed))
|
||||
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTFinalize)(t.Worker.FinalizeSector(ctx, sector, keepUnsealed))
|
||||
}
|
||||
|
||||
func (t *trackedWorker) AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) {
|
||||
return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTAddPiece)(t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData))
|
||||
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTAddPiece)(t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData))
|
||||
}
|
||||
|
||||
func (t *trackedWorker) Fetch(ctx context.Context, s storage.SectorRef, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) {
|
||||
return t.tracker.track(ctx, t.wid, t.workerInfo, s, sealtasks.TTFetch)(t.Worker.Fetch(ctx, s, ft, ptype, am))
|
||||
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, s, sealtasks.TTFetch)(t.Worker.Fetch(ctx, s, ft, ptype, am))
|
||||
}
|
||||
|
||||
func (t *trackedWorker) UnsealPiece(ctx context.Context, id storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) {
|
||||
return t.tracker.track(ctx, t.wid, t.workerInfo, id, sealtasks.TTUnseal)(t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid))
|
||||
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, id, sealtasks.TTUnseal)(t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid))
|
||||
}
|
||||
|
||||
var _ Worker = &trackedWorker{}
|
||||
|
Loading…
Reference in New Issue
Block a user