diff --git a/extern/sector-storage/worker_local.go b/extern/sector-storage/worker_local.go index 46f0d65e2..495c9630d 100644 --- a/extern/sector-storage/worker_local.go +++ b/extern/sector-storage/worker_local.go @@ -8,6 +8,7 @@ import ( "reflect" "runtime" "sync" + "time" "github.com/elastic/go-sysinfo" "github.com/google/uuid" @@ -194,7 +195,7 @@ var returnFunc = map[ReturnType]func(context.Context, storiface.CallID, storifac "Fetch": rfunc(storiface.WorkerReturn.ReturnFetch), } -func (l *LocalWorker) asyncCall(ctx context.Context, sector abi.SectorID, rt ReturnType, work func(ci storiface.CallID) (interface{}, error)) (storiface.CallID, error) { +func (l *LocalWorker) asyncCall(ctx context.Context, sector abi.SectorID, rt ReturnType, work func(ctx context.Context, ci storiface.CallID) (interface{}, error)) (storiface.CallID, error) { ci := storiface.CallID{ Sector: sector, ID: uuid.New(), @@ -209,7 +210,10 @@ func (l *LocalWorker) asyncCall(ctx context.Context, sector abi.SectorID, rt Ret go func() { defer l.running.Done() - res, err := work(ci) + res, err := work(&wctx{ + vals: ctx, + closing: l.closing, + }, ci) { rb, err := json.Marshal(res) @@ -258,13 +262,13 @@ func (l *LocalWorker) AddPiece(ctx context.Context, sector abi.SectorID, epcs [] return storiface.UndefCall, err } - return l.asyncCall(ctx, sector, "AddPiece", func(ci storiface.CallID) (interface{}, error) { + return l.asyncCall(ctx, sector, "AddPiece", func(ctx context.Context, ci storiface.CallID) (interface{}, error) { return sb.AddPiece(ctx, sector, epcs, sz, r) }) } func (l *LocalWorker) Fetch(ctx context.Context, sector abi.SectorID, fileType storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) { - return l.asyncCall(ctx, sector, "Fetch", func(ci storiface.CallID) (interface{}, error) { + return l.asyncCall(ctx, sector, "Fetch", func(ctx context.Context, ci storiface.CallID) (interface{}, error) { _, done, err := (&localWorkerPathProvider{w: l, op: am}).AcquireSector(ctx, sector, fileType, storiface.FTNone, ptype) if err == nil { done() @@ -275,7 +279,7 @@ func (l *LocalWorker) Fetch(ctx context.Context, sector abi.SectorID, fileType s } func (l *LocalWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) { - return l.asyncCall(ctx, sector, "SealPreCommit1", func(ci storiface.CallID) (interface{}, error) { + return l.asyncCall(ctx, sector, "SealPreCommit1", func(ctx context.Context, ci storiface.CallID) (interface{}, error) { { // cleanup previous failed attempts if they exist @@ -303,7 +307,7 @@ func (l *LocalWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, p return storiface.UndefCall, err } - return l.asyncCall(ctx, sector, "SealPreCommit2", func(ci storiface.CallID) (interface{}, error) { + return l.asyncCall(ctx, sector, "SealPreCommit2", func(ctx context.Context, ci storiface.CallID) (interface{}, error) { return sb.SealPreCommit2(ctx, sector, phase1Out) }) } @@ -314,7 +318,7 @@ func (l *LocalWorker) SealCommit1(ctx context.Context, sector abi.SectorID, tick return storiface.UndefCall, err } - return l.asyncCall(ctx, sector, "SealCommit1", func(ci storiface.CallID) (interface{}, error) { + return l.asyncCall(ctx, sector, "SealCommit1", func(ctx context.Context, ci storiface.CallID) (interface{}, error) { return sb.SealCommit1(ctx, sector, ticket, seed, pieces, cids) }) } @@ -325,7 +329,7 @@ func (l *LocalWorker) SealCommit2(ctx context.Context, sector abi.SectorID, phas return storiface.UndefCall, err } - return l.asyncCall(ctx, sector, "SealCommit2", func(ci storiface.CallID) (interface{}, error) { + return l.asyncCall(ctx, sector, "SealCommit2", func(ctx context.Context, ci storiface.CallID) (interface{}, error) { return sb.SealCommit2(ctx, sector, phase1Out) }) } @@ -336,7 +340,7 @@ func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, k return storiface.UndefCall, err } - return l.asyncCall(ctx, sector, "FinalizeSector", func(ci storiface.CallID) (interface{}, error) { + return l.asyncCall(ctx, sector, "FinalizeSector", func(ctx context.Context, ci storiface.CallID) (interface{}, error) { if err := sb.FinalizeSector(ctx, sector, keepUnsealed); err != nil { return nil, xerrors.Errorf("finalizing sector: %w", err) } @@ -372,7 +376,7 @@ func (l *LocalWorker) Remove(ctx context.Context, sector abi.SectorID) error { } func (l *LocalWorker) MoveStorage(ctx context.Context, sector abi.SectorID, types storiface.SectorFileType) (storiface.CallID, error) { - return l.asyncCall(ctx, sector, "MoveStorage", func(ci storiface.CallID) (interface{}, error) { + return l.asyncCall(ctx, sector, "MoveStorage", func(ctx context.Context, ci storiface.CallID) (interface{}, error) { return nil, l.storage.MoveStorage(ctx, sector, l.scfg.SealProofType, types) }) } @@ -383,7 +387,7 @@ func (l *LocalWorker) UnsealPiece(ctx context.Context, sector abi.SectorID, inde return storiface.UndefCall, err } - return l.asyncCall(ctx, sector, "UnsealPiece", func(ci storiface.CallID) (interface{}, error) { + return l.asyncCall(ctx, sector, "UnsealPiece", func(ctx context.Context, ci storiface.CallID) (interface{}, error) { if err = sb.UnsealPiece(ctx, sector, index, size, randomness, cid); err != nil { return nil, xerrors.Errorf("unsealing sector: %w", err) } @@ -406,7 +410,7 @@ func (l *LocalWorker) ReadPiece(ctx context.Context, writer io.Writer, sector ab return storiface.UndefCall, err } - return l.asyncCall(ctx, sector, "ReadPiece", func(ci storiface.CallID) (interface{}, error) { + return l.asyncCall(ctx, sector, "ReadPiece", func(ctx context.Context, ci storiface.CallID) (interface{}, error) { return sb.ReadPiece(ctx, writer, sector, index, size) }) } @@ -466,4 +470,32 @@ func (l *LocalWorker) WaitQuiet() { l.running.Wait() } +type wctx struct { + vals context.Context + closing chan struct{} +} + +func (w *wctx) Deadline() (time.Time, bool) { + return time.Time{}, false +} + +func (w *wctx) Done() <-chan struct{} { + return w.closing +} + +func (w *wctx) Err() error { + select { + case <-w.closing: + return context.Canceled + default: + return nil + } +} + +func (w *wctx) Value(key interface{}) interface{} { + return w.vals.Value(key) +} + +var _ context.Context = &wctx{} + var _ Worker = &LocalWorker{}