lotus/extern/sector-storage/worker_local.go

502 lines
15 KiB
Go
Raw Normal View History

package sectorstorage
2020-03-23 11:40:02 +00:00
import (
"context"
"encoding/json"
2020-03-23 11:40:02 +00:00
"io"
"os"
2020-09-14 07:44:55 +00:00
"reflect"
"runtime"
2020-09-22 16:36:44 +00:00
"sync"
2020-09-22 22:10:36 +00:00
"time"
2020-03-23 11:40:02 +00:00
"github.com/elastic/go-sysinfo"
2020-09-06 16:47:16 +00:00
"github.com/google/uuid"
"github.com/hashicorp/go-multierror"
2020-05-14 15:35:38 +00:00
"github.com/ipfs/go-cid"
2020-03-23 11:40:02 +00:00
"golang.org/x/xerrors"
ffi "github.com/filecoin-project/filecoin-ffi"
2020-09-07 03:49:10 +00:00
"github.com/filecoin-project/go-state-types/abi"
2020-09-14 07:44:55 +00:00
"github.com/filecoin-project/go-statestore"
2020-03-23 11:40:02 +00:00
storage2 "github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
2020-03-23 11:40:02 +00:00
)
2020-09-06 16:54:00 +00:00
// pathTypes lists the sector file types that are checked, in this order, when
// sector files are declared to the index after an acquire is released.
var pathTypes = []storiface.SectorFileType{
	storiface.FTUnsealed,
	storiface.FTSealed,
	storiface.FTCache,
}
2020-03-23 11:40:02 +00:00
type WorkerConfig struct {
2020-06-15 12:32:17 +00:00
SealProof abi.RegisteredSealProof
2020-03-23 11:40:02 +00:00
TaskTypes []sealtasks.TaskType
}
type LocalWorker struct {
2020-03-26 02:50:56 +00:00
scfg *ffiwrapper.Config
2020-03-23 11:40:02 +00:00
storage stores.Store
localStore *stores.Local
sindex stores.SectorIndex
2020-09-06 16:47:16 +00:00
ret storiface.WorkerReturn
executor func() (ffiwrapper.Storage, error)
2020-03-23 11:40:02 +00:00
ct *workerCallTracker
2020-03-23 11:40:02 +00:00
acceptTasks map[sealtasks.TaskType]struct{}
2020-09-22 16:36:44 +00:00
running sync.WaitGroup
closing chan struct{}
2020-03-23 11:40:02 +00:00
}
func newLocalWorker(executor func() (ffiwrapper.Storage, error), wcfg WorkerConfig, store stores.Store, local *stores.Local, sindex stores.SectorIndex, ret storiface.WorkerReturn, cst *statestore.StateStore) *LocalWorker {
2020-03-23 11:40:02 +00:00
acceptTasks := map[sealtasks.TaskType]struct{}{}
for _, taskType := range wcfg.TaskTypes {
acceptTasks[taskType] = struct{}{}
}
w := &LocalWorker{
2020-03-26 02:50:56 +00:00
scfg: &ffiwrapper.Config{
2020-03-23 11:40:02 +00:00
SealProofType: wcfg.SealProof,
},
storage: store,
localStore: local,
sindex: sindex,
2020-09-07 14:35:54 +00:00
ret: ret,
2020-03-23 11:40:02 +00:00
ct: &workerCallTracker{
2020-09-14 07:44:55 +00:00
st: cst,
},
2020-03-23 11:40:02 +00:00
acceptTasks: acceptTasks,
executor: executor,
closing: make(chan struct{}),
2020-03-23 11:40:02 +00:00
}
if w.executor == nil {
w.executor = w.ffiExec
}
unfinished, err := w.ct.unfinished()
if err != nil {
log.Errorf("reading unfinished tasks: %+v", err)
return w
}
go func() {
for _, call := range unfinished {
err := xerrors.Errorf("worker restarted")
if err := returnFunc[call.RetType](context.TODO(), call.ID, ret, nil, err); err != nil {
log.Errorf("return error: %s: %+v", call.RetType, err)
continue
}
if err := w.ct.onReturned(call.ID); err != nil {
log.Errorf("marking call as returned failed: %s: %+v", call.RetType, err)
}
}
}()
return w
}
func NewLocalWorker(wcfg WorkerConfig, store stores.Store, local *stores.Local, sindex stores.SectorIndex, ret storiface.WorkerReturn, cst *statestore.StateStore) *LocalWorker {
return newLocalWorker(nil, wcfg, store, local, sindex, ret, cst)
2020-03-23 11:40:02 +00:00
}
type localWorkerPathProvider struct {
2020-05-26 08:25:29 +00:00
w *LocalWorker
2020-09-06 16:54:00 +00:00
op storiface.AcquireMode
2020-03-23 11:40:02 +00:00
}
2020-09-06 16:54:00 +00:00
func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.SectorID, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType) (storiface.SectorPaths, func(), error) {
2020-07-06 16:36:44 +00:00
2020-06-04 21:30:20 +00:00
paths, storageIDs, err := l.w.storage.AcquireSector(ctx, sector, l.w.scfg.SealProofType, existing, allocate, sealing, l.op)
2020-03-23 11:40:02 +00:00
if err != nil {
2020-09-06 16:54:00 +00:00
return storiface.SectorPaths{}, nil, err
2020-03-23 11:40:02 +00:00
}
2020-09-06 16:54:00 +00:00
releaseStorage, err := l.w.localStore.Reserve(ctx, sector, l.w.scfg.SealProofType, allocate, storageIDs, storiface.FSOverheadSeal)
2020-07-06 16:36:44 +00:00
if err != nil {
2020-09-06 16:54:00 +00:00
return storiface.SectorPaths{}, nil, xerrors.Errorf("reserving storage space: %w", err)
2020-07-06 16:36:44 +00:00
}
2020-03-23 11:40:02 +00:00
log.Debugf("acquired sector %d (e:%d; a:%d): %v", sector, existing, allocate, paths)
return paths, func() {
2020-07-06 16:36:44 +00:00
releaseStorage()
2020-03-23 11:40:02 +00:00
for _, fileType := range pathTypes {
if fileType&allocate == 0 {
continue
}
2020-09-06 16:54:00 +00:00
sid := storiface.PathByType(storageIDs, fileType)
2020-03-23 11:40:02 +00:00
2020-09-06 16:54:00 +00:00
if err := l.w.sindex.StorageDeclareSector(ctx, stores.ID(sid), sector, fileType, l.op == storiface.AcquireMove); err != nil {
2020-03-23 11:40:02 +00:00
log.Errorf("declare sector error: %+v", err)
}
}
}, nil
}
func (l *LocalWorker) ffiExec() (ffiwrapper.Storage, error) {
2020-03-26 02:50:56 +00:00
return ffiwrapper.New(&localWorkerPathProvider{w: l}, l.scfg)
2020-03-23 11:40:02 +00:00
}
// ReturnType names the kind of an asynchronous worker call; it is the key used
// to look up the matching return function in returnFunc.
type ReturnType string
2020-09-14 07:44:55 +00:00
// in: func(WorkerReturn, context.Context, CallID, err string)
// in: func(WorkerReturn, context.Context, CallID, ret T, err string)
2020-09-14 18:28:47 +00:00
func rfunc(in interface{}) func(context.Context, storiface.CallID, storiface.WorkerReturn, interface{}, error) error {
2020-09-14 07:44:55 +00:00
rf := reflect.ValueOf(in)
ft := rf.Type()
2020-09-14 18:28:47 +00:00
withRet := ft.NumIn() == 5
2020-09-14 07:44:55 +00:00
2020-09-14 18:28:47 +00:00
return func(ctx context.Context, ci storiface.CallID, wr storiface.WorkerReturn, i interface{}, err error) error {
2020-09-14 07:44:55 +00:00
rctx := reflect.ValueOf(ctx)
rwr := reflect.ValueOf(wr)
rerr := reflect.ValueOf(errstr(err))
2020-09-14 18:28:47 +00:00
rci := reflect.ValueOf(ci)
2020-09-14 07:44:55 +00:00
var ro []reflect.Value
if withRet {
ret := reflect.ValueOf(i)
if i == nil {
ret = reflect.Zero(rf.Type().In(3))
}
ro = rf.Call([]reflect.Value{rwr, rctx, rci, ret, rerr})
2020-09-14 07:44:55 +00:00
} else {
2020-09-14 18:28:47 +00:00
ro = rf.Call([]reflect.Value{rwr, rctx, rci, rerr})
2020-09-14 07:44:55 +00:00
}
2020-09-14 18:28:47 +00:00
if !ro[0].IsNil() {
return ro[0].Interface().(error)
}
return nil
2020-09-14 07:44:55 +00:00
}
}
var returnFunc = map[ReturnType]func(context.Context, storiface.CallID, storiface.WorkerReturn, interface{}, error) error{
2020-09-14 07:44:55 +00:00
"AddPiece": rfunc(storiface.WorkerReturn.ReturnAddPiece),
"SealPreCommit1": rfunc(storiface.WorkerReturn.ReturnSealPreCommit1),
"SealPreCommit2": rfunc(storiface.WorkerReturn.ReturnSealPreCommit2),
"SealCommit1": rfunc(storiface.WorkerReturn.ReturnSealCommit1),
"SealCommit2": rfunc(storiface.WorkerReturn.ReturnSealCommit2),
"FinalizeSector": rfunc(storiface.WorkerReturn.ReturnFinalizeSector),
"ReleaseUnsealed": rfunc(storiface.WorkerReturn.ReturnReleaseUnsealed),
"MoveStorage": rfunc(storiface.WorkerReturn.ReturnMoveStorage),
"UnsealPiece": rfunc(storiface.WorkerReturn.ReturnUnsealPiece),
"ReadPiece": rfunc(storiface.WorkerReturn.ReturnReadPiece),
"Fetch": rfunc(storiface.WorkerReturn.ReturnFetch),
}
2020-09-22 22:10:36 +00:00
func (l *LocalWorker) asyncCall(ctx context.Context, sector abi.SectorID, rt ReturnType, work func(ctx context.Context, ci storiface.CallID) (interface{}, error)) (storiface.CallID, error) {
2020-09-06 16:47:16 +00:00
ci := storiface.CallID{
Sector: sector,
ID: uuid.New(),
}
if err := l.ct.onStart(ci, rt); err != nil {
2020-09-14 07:44:55 +00:00
log.Errorf("tracking call (start): %+v", err)
}
2020-09-22 16:36:44 +00:00
l.running.Add(1)
2020-09-14 07:44:55 +00:00
go func() {
2020-09-22 16:36:44 +00:00
defer l.running.Done()
2020-09-22 22:10:36 +00:00
res, err := work(&wctx{
vals: ctx,
closing: l.closing,
}, ci)
{
rb, err := json.Marshal(res)
if err != nil {
log.Errorf("tracking call (marshaling results): %+v", err)
} else {
if err := l.ct.onDone(ci, rb); err != nil {
log.Errorf("tracking call (done): %+v", err)
}
}
}
2020-09-14 18:28:47 +00:00
if err := returnFunc[rt](ctx, ci, l.ret, res, err); err != nil {
2020-09-14 07:44:55 +00:00
log.Errorf("return error: %s: %+v", rt, err)
}
if err := l.ct.onReturned(ci); err != nil {
log.Errorf("tracking call (done): %+v", err)
}
2020-09-14 07:44:55 +00:00
}()
2020-09-06 16:47:16 +00:00
return ci, nil
}
// errstr returns the message of err, or the empty string when err is nil.
func errstr(err error) string {
	if err == nil {
		return ""
	}

	return err.Error()
}
2020-03-23 11:40:02 +00:00
func (l *LocalWorker) NewSector(ctx context.Context, sector abi.SectorID) error {
sb, err := l.executor()
2020-03-23 11:40:02 +00:00
if err != nil {
return err
}
return sb.NewSector(ctx, sector)
}
2020-09-06 16:47:16 +00:00
func (l *LocalWorker) AddPiece(ctx context.Context, sector abi.SectorID, epcs []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (storiface.CallID, error) {
sb, err := l.executor()
2020-03-23 11:40:02 +00:00
if err != nil {
2020-09-06 16:47:16 +00:00
return storiface.UndefCall, err
2020-03-23 11:40:02 +00:00
}
2020-09-22 22:10:36 +00:00
return l.asyncCall(ctx, sector, "AddPiece", func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
2020-09-14 07:44:55 +00:00
return sb.AddPiece(ctx, sector, epcs, sz, r)
2020-09-06 16:47:16 +00:00
})
2020-04-27 12:55:37 +00:00
}
2020-09-06 16:54:00 +00:00
func (l *LocalWorker) Fetch(ctx context.Context, sector abi.SectorID, fileType storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) {
2020-09-22 22:10:36 +00:00
return l.asyncCall(ctx, sector, "Fetch", func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
2020-09-06 16:54:00 +00:00
_, done, err := (&localWorkerPathProvider{w: l, op: am}).AcquireSector(ctx, sector, fileType, storiface.FTNone, ptype)
2020-09-06 16:47:16 +00:00
if err == nil {
done()
2020-05-13 18:45:14 +00:00
}
2020-09-14 07:44:55 +00:00
return nil, err
2020-09-06 16:47:16 +00:00
})
}
2020-05-13 18:45:14 +00:00
2020-09-06 16:47:16 +00:00
func (l *LocalWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) {
2020-09-22 22:10:36 +00:00
return l.asyncCall(ctx, sector, "SealPreCommit1", func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
2020-03-23 11:40:02 +00:00
2020-09-06 16:47:16 +00:00
{
// cleanup previous failed attempts if they exist
2020-09-14 07:44:55 +00:00
if err := l.storage.Remove(ctx, sector, storiface.FTSealed, true); err != nil {
return nil, xerrors.Errorf("cleaning up sealed data: %w", err)
2020-09-06 16:47:16 +00:00
}
2020-09-14 07:44:55 +00:00
if err := l.storage.Remove(ctx, sector, storiface.FTCache, true); err != nil {
return nil, xerrors.Errorf("cleaning up cache data: %w", err)
2020-09-06 16:47:16 +00:00
}
}
sb, err := l.executor()
2020-09-06 16:47:16 +00:00
if err != nil {
2020-09-14 07:44:55 +00:00
return nil, err
2020-09-06 16:47:16 +00:00
}
2020-09-14 07:44:55 +00:00
return sb.SealPreCommit1(ctx, sector, ticket, pieces)
2020-09-06 16:47:16 +00:00
})
2020-03-23 11:40:02 +00:00
}
2020-09-06 16:47:16 +00:00
func (l *LocalWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage2.PreCommit1Out) (storiface.CallID, error) {
sb, err := l.executor()
2020-03-23 11:40:02 +00:00
if err != nil {
2020-09-06 16:47:16 +00:00
return storiface.UndefCall, err
2020-03-23 11:40:02 +00:00
}
2020-09-22 22:10:36 +00:00
return l.asyncCall(ctx, sector, "SealPreCommit2", func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
2020-09-14 07:44:55 +00:00
return sb.SealPreCommit2(ctx, sector, phase1Out)
2020-09-06 16:47:16 +00:00
})
2020-03-23 11:40:02 +00:00
}
2020-09-06 16:47:16 +00:00
func (l *LocalWorker) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage2.SectorCids) (storiface.CallID, error) {
sb, err := l.executor()
2020-03-23 11:40:02 +00:00
if err != nil {
2020-09-06 16:47:16 +00:00
return storiface.UndefCall, err
2020-03-23 11:40:02 +00:00
}
2020-09-22 22:10:36 +00:00
return l.asyncCall(ctx, sector, "SealCommit1", func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
2020-09-14 07:44:55 +00:00
return sb.SealCommit1(ctx, sector, ticket, seed, pieces, cids)
2020-09-06 16:47:16 +00:00
})
2020-03-23 11:40:02 +00:00
}
2020-09-06 16:47:16 +00:00
func (l *LocalWorker) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage2.Commit1Out) (storiface.CallID, error) {
sb, err := l.executor()
2020-03-23 11:40:02 +00:00
if err != nil {
2020-09-06 16:47:16 +00:00
return storiface.UndefCall, err
2020-03-23 11:40:02 +00:00
}
2020-09-22 22:10:36 +00:00
return l.asyncCall(ctx, sector, "SealCommit2", func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
2020-09-14 07:44:55 +00:00
return sb.SealCommit2(ctx, sector, phase1Out)
2020-09-06 16:47:16 +00:00
})
2020-03-23 11:40:02 +00:00
}
2020-09-06 16:47:16 +00:00
func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage2.Range) (storiface.CallID, error) {
sb, err := l.executor()
2020-03-23 11:40:02 +00:00
if err != nil {
2020-09-06 16:47:16 +00:00
return storiface.UndefCall, err
2020-03-23 11:40:02 +00:00
}
2020-09-22 22:10:36 +00:00
return l.asyncCall(ctx, sector, "FinalizeSector", func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
2020-09-06 16:47:16 +00:00
if err := sb.FinalizeSector(ctx, sector, keepUnsealed); err != nil {
2020-09-14 07:44:55 +00:00
return nil, xerrors.Errorf("finalizing sector: %w", err)
2020-09-06 16:47:16 +00:00
}
2020-09-06 16:47:16 +00:00
if len(keepUnsealed) == 0 {
2020-09-14 07:44:55 +00:00
if err := l.storage.Remove(ctx, sector, storiface.FTUnsealed, true); err != nil {
return nil, xerrors.Errorf("removing unsealed data: %w", err)
2020-09-06 16:47:16 +00:00
}
2020-07-03 19:52:31 +00:00
}
2020-09-14 07:44:55 +00:00
return nil, err
2020-09-06 16:47:16 +00:00
})
2020-06-03 21:44:59 +00:00
}
2020-09-06 16:47:16 +00:00
// ReleaseUnsealed is not implemented: it starts no call and always returns
// storiface.UndefCall together with an error.
func (l *LocalWorker) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage2.Range) (storiface.CallID, error) {
	notImplemented := xerrors.Errorf("implement me")

	return storiface.UndefCall, notImplemented
}
func (l *LocalWorker) Remove(ctx context.Context, sector abi.SectorID) error {
var err error
2020-09-06 16:54:00 +00:00
if rerr := l.storage.Remove(ctx, sector, storiface.FTSealed, true); rerr != nil {
err = multierror.Append(err, xerrors.Errorf("removing sector (sealed): %w", rerr))
}
2020-09-06 16:54:00 +00:00
if rerr := l.storage.Remove(ctx, sector, storiface.FTCache, true); rerr != nil {
err = multierror.Append(err, xerrors.Errorf("removing sector (cache): %w", rerr))
}
2020-09-06 16:54:00 +00:00
if rerr := l.storage.Remove(ctx, sector, storiface.FTUnsealed, true); rerr != nil {
err = multierror.Append(err, xerrors.Errorf("removing sector (unsealed): %w", rerr))
}
return err
}
2020-09-06 16:54:00 +00:00
func (l *LocalWorker) MoveStorage(ctx context.Context, sector abi.SectorID, types storiface.SectorFileType) (storiface.CallID, error) {
2020-09-22 22:10:36 +00:00
return l.asyncCall(ctx, sector, "MoveStorage", func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
2020-09-14 07:44:55 +00:00
return nil, l.storage.MoveStorage(ctx, sector, l.scfg.SealProofType, types)
2020-09-06 16:47:16 +00:00
})
2020-03-23 11:40:02 +00:00
}
2020-09-06 16:47:16 +00:00
func (l *LocalWorker) UnsealPiece(ctx context.Context, sector abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) {
sb, err := l.executor()
if err != nil {
2020-09-06 16:47:16 +00:00
return storiface.UndefCall, err
}
2020-09-22 22:10:36 +00:00
return l.asyncCall(ctx, sector, "UnsealPiece", func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
2020-09-06 16:47:16 +00:00
if err = sb.UnsealPiece(ctx, sector, index, size, randomness, cid); err != nil {
2020-09-14 07:44:55 +00:00
return nil, xerrors.Errorf("unsealing sector: %w", err)
2020-09-06 16:47:16 +00:00
}
2020-05-29 16:57:44 +00:00
2020-09-06 16:54:00 +00:00
if err = l.storage.RemoveCopies(ctx, sector, storiface.FTSealed); err != nil {
2020-09-14 07:44:55 +00:00
return nil, xerrors.Errorf("removing source data: %w", err)
2020-09-06 16:47:16 +00:00
}
2020-09-06 16:54:00 +00:00
if err = l.storage.RemoveCopies(ctx, sector, storiface.FTCache); err != nil {
2020-09-14 07:44:55 +00:00
return nil, xerrors.Errorf("removing source data: %w", err)
2020-09-06 16:47:16 +00:00
}
2020-09-14 07:44:55 +00:00
return nil, nil
2020-09-06 16:47:16 +00:00
})
2020-05-14 15:35:38 +00:00
}
2020-09-06 16:47:16 +00:00
func (l *LocalWorker) ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (storiface.CallID, error) {
sb, err := l.executor()
if err != nil {
2020-09-06 16:47:16 +00:00
return storiface.UndefCall, err
}
2020-09-22 22:10:36 +00:00
return l.asyncCall(ctx, sector, "ReadPiece", func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
2020-09-14 07:44:55 +00:00
return sb.ReadPiece(ctx, writer, sector, index, size)
2020-09-06 16:47:16 +00:00
})
2020-05-14 15:35:38 +00:00
}
2020-03-23 11:40:02 +00:00
// TaskTypes returns the set of task types the worker was configured with.
// The worker's own map is returned, not a copy. The error is always nil.
func (l *LocalWorker) TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error) {
	accepted := l.acceptTasks

	return accepted, nil
}
// Paths returns the storage paths reported by the worker's local store.
func (l *LocalWorker) Paths(ctx context.Context) ([]stores.StoragePath, error) {
	paths, err := l.localStore.Local(ctx)

	return paths, err
}
func (l *LocalWorker) Info(context.Context) (storiface.WorkerInfo, error) {
2020-03-23 11:40:02 +00:00
hostname, err := os.Hostname() // TODO: allow overriding from config
if err != nil {
panic(err)
}
gpus, err := ffi.GetGPUDevices()
if err != nil {
log.Errorf("getting gpu devices failed: %+v", err)
}
h, err := sysinfo.Host()
if err != nil {
return storiface.WorkerInfo{}, xerrors.Errorf("getting host info: %w", err)
2020-03-23 11:40:02 +00:00
}
mem, err := h.Memory()
if err != nil {
return storiface.WorkerInfo{}, xerrors.Errorf("getting memory info: %w", err)
2020-03-23 11:40:02 +00:00
}
return storiface.WorkerInfo{
2020-03-23 11:40:02 +00:00
Hostname: hostname,
Resources: storiface.WorkerResources{
2020-03-23 11:40:02 +00:00
MemPhysical: mem.Total,
MemSwap: mem.VirtualTotal,
MemReserved: mem.VirtualUsed + mem.Total - mem.Available, // TODO: sub this process
CPUs: uint64(runtime.NumCPU()),
2020-03-23 11:40:02 +00:00
GPUs: gpus,
},
}, nil
}
2020-05-01 18:00:17 +00:00
func (l *LocalWorker) Closing(ctx context.Context) (<-chan struct{}, error) {
return l.closing, nil
2020-05-01 18:00:17 +00:00
}
2020-03-24 23:49:45 +00:00
func (l *LocalWorker) Close() error {
close(l.closing)
2020-03-24 23:49:45 +00:00
return nil
}
2020-09-22 16:36:44 +00:00
// WaitQuiet blocks until every asynchronous call started by this worker has
// finished; it returns immediately when none is running.
func (l *LocalWorker) WaitQuiet() {
	l.running.Wait()
}
2020-09-22 22:10:36 +00:00
// wctx is a context.Context that takes its values from one context and its
// cancellation from a channel. Cancellation and deadlines of the wrapped
// context are ignored.
type wctx struct {
	// vals is consulted only for Value lookups.
	vals context.Context

	// closing cancels the context once it is closed.
	closing chan struct{}
}

// Deadline reports that the context has no deadline.
func (c *wctx) Deadline() (time.Time, bool) {
	var none time.Time

	return none, false
}

// Done returns the closing channel.
func (c *wctx) Done() <-chan struct{} {
	return c.closing
}

// Err returns context.Canceled once the closing channel is closed, and nil
// before that.
func (c *wctx) Err() error {
	select {
	case <-c.closing:
		return context.Canceled
	default:
	}

	return nil
}

// Value looks the key up in the wrapped context.
func (c *wctx) Value(key interface{}) interface{} {
	return c.vals.Value(key)
}

// Compile-time check that *wctx implements context.Context.
var _ context.Context = (*wctx)(nil)
2020-03-23 11:40:02 +00:00
// Compile-time check that *LocalWorker implements Worker.
var _ Worker = &LocalWorker{}