Merge pull request #7527 from filecoin-project/feat/show-prepared-waitres

Show prepared tasks in sealing jobs
This commit is contained in:
Łukasz Magiera 2021-10-20 15:22:35 +01:00 committed by GitHub
commit f7884c4f45
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 118 additions and 50 deletions

View File

@ -224,8 +224,10 @@ var sealingJobsCmd = &cli.Command{
for _, l := range lines { for _, l := range lines {
state := "running" state := "running"
switch { switch {
case l.RunWait > 0: case l.RunWait > 1:
state = fmt.Sprintf("assigned(%d)", l.RunWait-1) state = fmt.Sprintf("assigned(%d)", l.RunWait-1)
case l.RunWait == storiface.RWPrepared:
state = "prepared"
case l.RunWait == storiface.RWRetDone: case l.RunWait == storiface.RWRetDone:
if !cctx.Bool("show-ret-done") { if !cctx.Bool("show-ret-done") {
continue continue

View File

@ -155,8 +155,9 @@ func newScheduler() *scheduler {
schedQueue: &requestQueue{}, schedQueue: &requestQueue{},
workTracker: &workTracker{ workTracker: &workTracker{
done: map[storiface.CallID]struct{}{}, done: map[storiface.CallID]struct{}{},
running: map[storiface.CallID]trackedWork{}, running: map[storiface.CallID]trackedWork{},
prepared: map[uuid.UUID]trackedWork{},
}, },
info: make(chan func(interface{})), info: make(chan func(interface{})),

View File

@ -464,7 +464,9 @@ func (sw *schedWorker) startProcessingTask(req *workerRequest) error {
go func() { go func() {
// first run the prepare step (e.g. fetching sector data from other worker) // first run the prepare step (e.g. fetching sector data from other worker)
err := req.prepare(req.ctx, sh.workTracker.worker(sw.wid, w.info, w.workerRpc)) tw := sh.workTracker.worker(sw.wid, w.info, w.workerRpc)
tw.start()
err := req.prepare(req.ctx, tw)
w.lk.Lock() w.lk.Lock()
if err != nil { if err != nil {
@ -488,6 +490,14 @@ func (sw *schedWorker) startProcessingTask(req *workerRequest) error {
return return
} }
tw = sh.workTracker.worker(sw.wid, w.info, w.workerRpc)
// start tracking work first early in case we need to wait for resources
werr := make(chan error, 1)
go func() {
werr <- req.work(req.ctx, tw)
}()
// wait (if needed) for resources in the 'active' window // wait (if needed) for resources in the 'active' window
err = w.active.withResources(sw.wid, w.info, needRes, &w.lk, func() error { err = w.active.withResources(sw.wid, w.info, needRes, &w.lk, func() error {
w.preparing.free(w.info.Resources, needRes) w.preparing.free(w.info.Resources, needRes)
@ -501,7 +511,8 @@ func (sw *schedWorker) startProcessingTask(req *workerRequest) error {
} }
// Do the work! // Do the work!
err = req.work(req.ctx, sh.workTracker.worker(sw.wid, w.info, w.workerRpc)) tw.start()
err = <-werr
select { select {
case req.ret <- workerResponse{err: err}: case req.ret <- workerResponse{err: err}:
@ -534,7 +545,9 @@ func (sw *schedWorker) startProcessingReadyTask(req *workerRequest) error {
go func() { go func() {
// Do the work! // Do the work!
err := req.work(req.ctx, sh.workTracker.worker(sw.wid, w.info, w.workerRpc)) tw := sh.workTracker.worker(sw.wid, w.info, w.workerRpc)
tw.start()
err := req.work(req.ctx, tw)
select { select {
case req.ret <- workerResponse{err: err}: case req.ret <- workerResponse{err: err}:

View File

@ -35,7 +35,13 @@ func (m *Manager) WorkerJobs() map[uuid.UUID][]storiface.WorkerJob {
out := map[uuid.UUID][]storiface.WorkerJob{} out := map[uuid.UUID][]storiface.WorkerJob{}
calls := map[storiface.CallID]struct{}{} calls := map[storiface.CallID]struct{}{}
for _, t := range m.sched.workTracker.Running() { running, preparing := m.sched.workTracker.Running()
for _, t := range running {
out[uuid.UUID(t.worker)] = append(out[uuid.UUID(t.worker)], t.job)
calls[t.job.ID] = struct{}{}
}
for _, t := range preparing {
out[uuid.UUID(t.worker)] = append(out[uuid.UUID(t.worker)], t.job) out[uuid.UUID(t.worker)] = append(out[uuid.UUID(t.worker)], t.job)
calls[t.job.ID] = struct{}{} calls[t.job.ID] = struct{}{}
} }
@ -50,7 +56,7 @@ func (m *Manager) WorkerJobs() map[uuid.UUID][]storiface.WorkerJob {
ID: storiface.UndefCall, ID: storiface.UndefCall,
Sector: request.sector.ID, Sector: request.sector.ID,
Task: request.taskType, Task: request.taskType,
RunWait: wi + 1, RunWait: wi + 2,
Start: request.start, Start: request.start,
}) })
} }

View File

@ -47,6 +47,8 @@ type WorkerStats struct {
} }
const ( const (
RWPrepared = 1
RWRunning = 0
RWRetWait = -1 RWRetWait = -1
RWReturned = -2 RWReturned = -2
RWRetDone = -3 RWRetDone = -3
@ -57,7 +59,8 @@ type WorkerJob struct {
Sector abi.SectorID Sector abi.SectorID
Task sealtasks.TaskType Task sealtasks.TaskType
// 1+ - assigned // 2+ - assigned
// 1 - prepared
// 0 - running // 0 - running
// -1 - ret-wait // -1 - ret-wait
// -2 - returned // -2 - returned

View File

@ -5,6 +5,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/google/uuid"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"go.opencensus.io/stats" "go.opencensus.io/stats"
"go.opencensus.io/tag" "go.opencensus.io/tag"
@ -26,8 +27,9 @@ type trackedWork struct {
type workTracker struct { type workTracker struct {
lk sync.Mutex lk sync.Mutex
done map[storiface.CallID]struct{} done map[storiface.CallID]struct{}
running map[storiface.CallID]trackedWork running map[storiface.CallID]trackedWork
prepared map[uuid.UUID]trackedWork
// TODO: done, aggregate stats, queue stats, scheduler feedback // TODO: done, aggregate stats, queue stats, scheduler feedback
} }
@ -56,63 +58,96 @@ func (wt *workTracker) onDone(ctx context.Context, callID storiface.CallID) {
delete(wt.running, callID) delete(wt.running, callID)
} }
func (wt *workTracker) track(ctx context.Context, wid WorkerID, wi storiface.WorkerInfo, sid storage.SectorRef, task sealtasks.TaskType) func(storiface.CallID, error) (storiface.CallID, error) { func (wt *workTracker) track(ctx context.Context, ready chan struct{}, wid WorkerID, wi storiface.WorkerInfo, sid storage.SectorRef, task sealtasks.TaskType, cb func() (storiface.CallID, error)) (storiface.CallID, error) {
return func(callID storiface.CallID, err error) (storiface.CallID, error) { tracked := func(rw int, callID storiface.CallID) trackedWork {
if err != nil { return trackedWork{
return callID, err
}
wt.lk.Lock()
defer wt.lk.Unlock()
_, done := wt.done[callID]
if done {
delete(wt.done, callID)
return callID, err
}
wt.running[callID] = trackedWork{
job: storiface.WorkerJob{ job: storiface.WorkerJob{
ID: callID, ID: callID,
Sector: sid.ID, Sector: sid.ID,
Task: task, Task: task,
Start: time.Now(), Start: time.Now(),
RunWait: rw,
}, },
worker: wid, worker: wid,
workerHostname: wi.Hostname, workerHostname: wi.Hostname,
} }
}
ctx, _ = tag.New( wt.lk.Lock()
ctx, defer wt.lk.Unlock()
tag.Upsert(metrics.TaskType, string(task)),
tag.Upsert(metrics.WorkerHostname, wi.Hostname),
)
stats.Record(ctx, metrics.WorkerCallsStarted.M(1))
select {
case <-ready:
case <-ctx.Done():
return storiface.UndefCall, ctx.Err()
default:
prepID := uuid.New()
wt.prepared[prepID] = tracked(storiface.RWPrepared, storiface.UndefCall)
wt.lk.Unlock()
select {
case <-ready:
case <-ctx.Done():
wt.lk.Lock()
delete(wt.prepared, prepID)
return storiface.UndefCall, ctx.Err()
}
wt.lk.Lock()
delete(wt.prepared, prepID)
}
callID, err := cb()
if err != nil {
return callID, err return callID, err
} }
_, done := wt.done[callID]
if done {
delete(wt.done, callID)
return callID, err
}
wt.running[callID] = tracked(storiface.RWRunning, callID)
ctx, _ = tag.New(
ctx,
tag.Upsert(metrics.TaskType, string(task)),
tag.Upsert(metrics.WorkerHostname, wi.Hostname),
)
stats.Record(ctx, metrics.WorkerCallsStarted.M(1))
return callID, err
} }
func (wt *workTracker) worker(wid WorkerID, wi storiface.WorkerInfo, w Worker) Worker { func (wt *workTracker) worker(wid WorkerID, wi storiface.WorkerInfo, w Worker) *trackedWorker {
return &trackedWorker{ return &trackedWorker{
Worker: w, Worker: w,
wid: wid, wid: wid,
workerInfo: wi, workerInfo: wi,
execute: make(chan struct{}),
tracker: wt, tracker: wt,
} }
} }
func (wt *workTracker) Running() []trackedWork { func (wt *workTracker) Running() ([]trackedWork, []trackedWork) {
wt.lk.Lock() wt.lk.Lock()
defer wt.lk.Unlock() defer wt.lk.Unlock()
out := make([]trackedWork, 0, len(wt.running)) running := make([]trackedWork, 0, len(wt.running))
for _, job := range wt.running { for _, job := range wt.running {
out = append(out, job) running = append(running, job)
}
prepared := make([]trackedWork, 0, len(wt.prepared))
for _, job := range wt.prepared {
prepared = append(prepared, job)
} }
return out return running, prepared
} }
type trackedWorker struct { type trackedWorker struct {
@ -120,39 +155,47 @@ type trackedWorker struct {
wid WorkerID wid WorkerID
workerInfo storiface.WorkerInfo workerInfo storiface.WorkerInfo
execute chan struct{} // channel blocking execution in case we're waiting for resources but the task is ready to execute
tracker *workTracker tracker *workTracker
} }
func (t *trackedWorker) start() {
close(t.execute)
}
func (t *trackedWorker) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) { func (t *trackedWorker) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) {
return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTPreCommit1)(t.Worker.SealPreCommit1(ctx, sector, ticket, pieces)) return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTPreCommit1, func() (storiface.CallID, error) { return t.Worker.SealPreCommit1(ctx, sector, ticket, pieces) })
} }
func (t *trackedWorker) SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (storiface.CallID, error) { func (t *trackedWorker) SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (storiface.CallID, error) {
return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTPreCommit2)(t.Worker.SealPreCommit2(ctx, sector, pc1o)) return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTPreCommit2, func() (storiface.CallID, error) { return t.Worker.SealPreCommit2(ctx, sector, pc1o) })
} }
func (t *trackedWorker) SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) { func (t *trackedWorker) SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) {
return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTCommit1)(t.Worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids)) return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTCommit1, func() (storiface.CallID, error) { return t.Worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids) })
} }
func (t *trackedWorker) SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (storiface.CallID, error) { func (t *trackedWorker) SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (storiface.CallID, error) {
return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTCommit2)(t.Worker.SealCommit2(ctx, sector, c1o)) return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTCommit2, func() (storiface.CallID, error) { return t.Worker.SealCommit2(ctx, sector, c1o) })
} }
func (t *trackedWorker) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) { func (t *trackedWorker) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) {
return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTFinalize)(t.Worker.FinalizeSector(ctx, sector, keepUnsealed)) return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTFinalize, func() (storiface.CallID, error) { return t.Worker.FinalizeSector(ctx, sector, keepUnsealed) })
} }
func (t *trackedWorker) AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) { func (t *trackedWorker) AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) {
return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTAddPiece)(t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData)) return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTAddPiece, func() (storiface.CallID, error) {
return t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData)
})
} }
func (t *trackedWorker) Fetch(ctx context.Context, s storage.SectorRef, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) { func (t *trackedWorker) Fetch(ctx context.Context, s storage.SectorRef, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) {
return t.tracker.track(ctx, t.wid, t.workerInfo, s, sealtasks.TTFetch)(t.Worker.Fetch(ctx, s, ft, ptype, am)) return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, s, sealtasks.TTFetch, func() (storiface.CallID, error) { return t.Worker.Fetch(ctx, s, ft, ptype, am) })
} }
func (t *trackedWorker) UnsealPiece(ctx context.Context, id storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) { func (t *trackedWorker) UnsealPiece(ctx context.Context, id storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) {
return t.tracker.track(ctx, t.wid, t.workerInfo, id, sealtasks.TTUnseal)(t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid)) return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, id, sealtasks.TTUnseal, func() (storiface.CallID, error) { return t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid) })
} }
var _ Worker = &trackedWorker{} var _ Worker = &trackedWorker{}