localworker: Fix contexts
This commit is contained in:
parent
bb5cc06677
commit
04ad1791b0
56
extern/sector-storage/worker_local.go
vendored
56
extern/sector-storage/worker_local.go
vendored
@ -8,6 +8,7 @@ import (
|
||||
"reflect"
|
||||
"runtime"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/elastic/go-sysinfo"
|
||||
"github.com/google/uuid"
|
||||
@ -194,7 +195,7 @@ var returnFunc = map[ReturnType]func(context.Context, storiface.CallID, storifac
|
||||
"Fetch": rfunc(storiface.WorkerReturn.ReturnFetch),
|
||||
}
|
||||
|
||||
func (l *LocalWorker) asyncCall(ctx context.Context, sector abi.SectorID, rt ReturnType, work func(ci storiface.CallID) (interface{}, error)) (storiface.CallID, error) {
|
||||
func (l *LocalWorker) asyncCall(ctx context.Context, sector abi.SectorID, rt ReturnType, work func(ctx context.Context, ci storiface.CallID) (interface{}, error)) (storiface.CallID, error) {
|
||||
ci := storiface.CallID{
|
||||
Sector: sector,
|
||||
ID: uuid.New(),
|
||||
@ -209,7 +210,10 @@ func (l *LocalWorker) asyncCall(ctx context.Context, sector abi.SectorID, rt Ret
|
||||
go func() {
|
||||
defer l.running.Done()
|
||||
|
||||
res, err := work(ci)
|
||||
res, err := work(&wctx{
|
||||
vals: ctx,
|
||||
closing: l.closing,
|
||||
}, ci)
|
||||
|
||||
{
|
||||
rb, err := json.Marshal(res)
|
||||
@ -258,13 +262,13 @@ func (l *LocalWorker) AddPiece(ctx context.Context, sector abi.SectorID, epcs []
|
||||
return storiface.UndefCall, err
|
||||
}
|
||||
|
||||
return l.asyncCall(ctx, sector, "AddPiece", func(ci storiface.CallID) (interface{}, error) {
|
||||
return l.asyncCall(ctx, sector, "AddPiece", func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
|
||||
return sb.AddPiece(ctx, sector, epcs, sz, r)
|
||||
})
|
||||
}
|
||||
|
||||
func (l *LocalWorker) Fetch(ctx context.Context, sector abi.SectorID, fileType storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) {
|
||||
return l.asyncCall(ctx, sector, "Fetch", func(ci storiface.CallID) (interface{}, error) {
|
||||
return l.asyncCall(ctx, sector, "Fetch", func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
|
||||
_, done, err := (&localWorkerPathProvider{w: l, op: am}).AcquireSector(ctx, sector, fileType, storiface.FTNone, ptype)
|
||||
if err == nil {
|
||||
done()
|
||||
@ -275,7 +279,7 @@ func (l *LocalWorker) Fetch(ctx context.Context, sector abi.SectorID, fileType s
|
||||
}
|
||||
|
||||
func (l *LocalWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) {
|
||||
return l.asyncCall(ctx, sector, "SealPreCommit1", func(ci storiface.CallID) (interface{}, error) {
|
||||
return l.asyncCall(ctx, sector, "SealPreCommit1", func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
|
||||
|
||||
{
|
||||
// cleanup previous failed attempts if they exist
|
||||
@ -303,7 +307,7 @@ func (l *LocalWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, p
|
||||
return storiface.UndefCall, err
|
||||
}
|
||||
|
||||
return l.asyncCall(ctx, sector, "SealPreCommit2", func(ci storiface.CallID) (interface{}, error) {
|
||||
return l.asyncCall(ctx, sector, "SealPreCommit2", func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
|
||||
return sb.SealPreCommit2(ctx, sector, phase1Out)
|
||||
})
|
||||
}
|
||||
@ -314,7 +318,7 @@ func (l *LocalWorker) SealCommit1(ctx context.Context, sector abi.SectorID, tick
|
||||
return storiface.UndefCall, err
|
||||
}
|
||||
|
||||
return l.asyncCall(ctx, sector, "SealCommit1", func(ci storiface.CallID) (interface{}, error) {
|
||||
return l.asyncCall(ctx, sector, "SealCommit1", func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
|
||||
return sb.SealCommit1(ctx, sector, ticket, seed, pieces, cids)
|
||||
})
|
||||
}
|
||||
@ -325,7 +329,7 @@ func (l *LocalWorker) SealCommit2(ctx context.Context, sector abi.SectorID, phas
|
||||
return storiface.UndefCall, err
|
||||
}
|
||||
|
||||
return l.asyncCall(ctx, sector, "SealCommit2", func(ci storiface.CallID) (interface{}, error) {
|
||||
return l.asyncCall(ctx, sector, "SealCommit2", func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
|
||||
return sb.SealCommit2(ctx, sector, phase1Out)
|
||||
})
|
||||
}
|
||||
@ -336,7 +340,7 @@ func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, k
|
||||
return storiface.UndefCall, err
|
||||
}
|
||||
|
||||
return l.asyncCall(ctx, sector, "FinalizeSector", func(ci storiface.CallID) (interface{}, error) {
|
||||
return l.asyncCall(ctx, sector, "FinalizeSector", func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
|
||||
if err := sb.FinalizeSector(ctx, sector, keepUnsealed); err != nil {
|
||||
return nil, xerrors.Errorf("finalizing sector: %w", err)
|
||||
}
|
||||
@ -372,7 +376,7 @@ func (l *LocalWorker) Remove(ctx context.Context, sector abi.SectorID) error {
|
||||
}
|
||||
|
||||
func (l *LocalWorker) MoveStorage(ctx context.Context, sector abi.SectorID, types storiface.SectorFileType) (storiface.CallID, error) {
|
||||
return l.asyncCall(ctx, sector, "MoveStorage", func(ci storiface.CallID) (interface{}, error) {
|
||||
return l.asyncCall(ctx, sector, "MoveStorage", func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
|
||||
return nil, l.storage.MoveStorage(ctx, sector, l.scfg.SealProofType, types)
|
||||
})
|
||||
}
|
||||
@ -383,7 +387,7 @@ func (l *LocalWorker) UnsealPiece(ctx context.Context, sector abi.SectorID, inde
|
||||
return storiface.UndefCall, err
|
||||
}
|
||||
|
||||
return l.asyncCall(ctx, sector, "UnsealPiece", func(ci storiface.CallID) (interface{}, error) {
|
||||
return l.asyncCall(ctx, sector, "UnsealPiece", func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
|
||||
if err = sb.UnsealPiece(ctx, sector, index, size, randomness, cid); err != nil {
|
||||
return nil, xerrors.Errorf("unsealing sector: %w", err)
|
||||
}
|
||||
@ -406,7 +410,7 @@ func (l *LocalWorker) ReadPiece(ctx context.Context, writer io.Writer, sector ab
|
||||
return storiface.UndefCall, err
|
||||
}
|
||||
|
||||
return l.asyncCall(ctx, sector, "ReadPiece", func(ci storiface.CallID) (interface{}, error) {
|
||||
return l.asyncCall(ctx, sector, "ReadPiece", func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
|
||||
return sb.ReadPiece(ctx, writer, sector, index, size)
|
||||
})
|
||||
}
|
||||
@ -466,4 +470,32 @@ func (l *LocalWorker) WaitQuiet() {
|
||||
l.running.Wait()
|
||||
}
|
||||
|
||||
type wctx struct {
|
||||
vals context.Context
|
||||
closing chan struct{}
|
||||
}
|
||||
|
||||
func (w *wctx) Deadline() (time.Time, bool) {
|
||||
return time.Time{}, false
|
||||
}
|
||||
|
||||
func (w *wctx) Done() <-chan struct{} {
|
||||
return w.closing
|
||||
}
|
||||
|
||||
func (w *wctx) Err() error {
|
||||
select {
|
||||
case <-w.closing:
|
||||
return context.Canceled
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (w *wctx) Value(key interface{}) interface{} {
|
||||
return w.vals.Value(key)
|
||||
}
|
||||
|
||||
var _ context.Context = &wctx{}
|
||||
|
||||
var _ Worker = &LocalWorker{}
|
||||
|
Loading…
Reference in New Issue
Block a user