lotus/extern/sector-storage/worker_local.go
Clint Armstrong e2a1ca7caa Use cgroup limits in worker memory calculations
Worker processes may have memory limitations imposed by Systemd. But
/proc/meminfo shows the entire system memory regardless of these limits.
This results in the scheduler believing the worker has the entire system
memory available and the worker being allocated too many tasks.

This change attempts to read cgroup memory limits for the worker
process. It supports cgroups v1 and v2, and compares cgroup limits
against the system memory and returns the most conservative values to
prevent the worker from being allocated too many tasks and potentially
triggering an OOM event.
2021-11-30 02:06:58 +01:00

613 lines
18 KiB
Go

package sectorstorage
import (
"context"
"encoding/json"
"io"
"os"
"reflect"
"runtime"
"sync"
"sync/atomic"
"time"
"github.com/elastic/go-sysinfo"
"github.com/google/uuid"
"github.com/hashicorp/go-multierror"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-statestore"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
// pathTypes lists the sector file types that the release callback returned by
// localWorkerPathProvider.AcquireSector walks when declaring allocated files
// to the sector index.
var pathTypes = []storiface.SectorFileType{storiface.FTUnsealed, storiface.FTSealed, storiface.FTCache}
// WorkerConfig carries the options that newLocalWorker copies into a
// LocalWorker.
type WorkerConfig struct {
// TaskTypes is the initial set of task types the worker accepts.
TaskTypes []sealtasks.TaskType
// NoSwap makes the worker report zero swap in its memory statistics.
NoSwap bool
// IgnoreResourceFiltering enables task distribution to happen on this
// worker regardless of its currently available resources. Used in testing
// with the local worker.
IgnoreResourceFiltering bool
}
// ExecutorFunc returns the ffiwrapper.Storage a worker runs its operations on.
// It is used to provide a custom proofs implementation (mostly in testing).
type ExecutorFunc func() (ffiwrapper.Storage, error)
// LocalWorker executes sealing tasks in the current process and reports the
// results through a storiface.WorkerReturn.
type LocalWorker struct {
// storage is used to acquire, move and remove sector files.
storage stores.Store
// localStore is used to reserve space and to list local storage paths.
localStore *stores.Local
// sindex is where newly allocated sector files are declared.
sindex stores.SectorIndex
// ret receives the result of every asynchronous call.
ret storiface.WorkerReturn
// executor builds the ffiwrapper.Storage used for each operation; it
// defaults to ffiExec when none is supplied to newLocalWorker.
executor ExecutorFunc
// noSwap makes memInfo report zero swap.
noSwap bool
// see equivalent field on WorkerConfig.
ignoreResources bool
// ct records the start / done / returned state of asynchronous calls.
ct *workerCallTracker
// acceptTasks is the set of enabled task types; guarded by taskLk.
acceptTasks map[sealtasks.TaskType]struct{}
// running counts the goroutines started by asyncCall (see WaitQuiet).
running sync.WaitGroup
// taskLk guards acceptTasks.
taskLk sync.Mutex
// session is generated once in newLocalWorker and reported by Session.
session uuid.UUID
// testDisable is read atomically by Session; while it equals 1 Session
// returns an error.
testDisable int64
// closing is closed by Close; it cancels the contexts handed to work
// started by asyncCall.
closing chan struct{}
}
// newLocalWorker builds a LocalWorker from wcfg and the supplied stores.
//
// executor may be nil, in which case the worker's own ffiExec is used.
//
// Calls that the call tracker (backed by cst) still lists as unfinished are
// reported to ret in a background goroutine as failed with
// storiface.ErrTempWorkerRestart. If the tracker cannot be read the error is
// logged and the worker is returned without replaying anything.
func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, store stores.Store, local *stores.Local, sindex stores.SectorIndex, ret storiface.WorkerReturn, cst *statestore.StateStore) *LocalWorker {
	acceptTasks := make(map[sealtasks.TaskType]struct{}, len(wcfg.TaskTypes))
	for _, taskType := range wcfg.TaskTypes {
		acceptTasks[taskType] = struct{}{}
	}

	w := &LocalWorker{
		storage:    store,
		localStore: local,
		sindex:     sindex,
		ret:        ret,

		ct: &workerCallTracker{
			st: cst,
		},
		acceptTasks:     acceptTasks,
		executor:        executor,
		noSwap:          wcfg.NoSwap,
		ignoreResources: wcfg.IgnoreResourceFiltering,
		session:         uuid.New(),
		closing:         make(chan struct{}),
	}

	if w.executor == nil {
		w.executor = w.ffiExec
	}

	unfinished, err := w.ct.unfinished()
	if err != nil {
		log.Errorf("reading unfinished tasks: %+v", err)
		return w
	}

	// Tie the replay to the worker's lifetime, exactly like asyncCall does:
	// with a bare context.TODO() doReturn would keep retrying every 5s forever
	// and the goroutine could never be stopped by Close.
	ctx := &wctx{
		vals:    context.TODO(),
		closing: w.closing,
	}

	go func() {
		for _, call := range unfinished {
			err := storiface.Err(storiface.ErrTempWorkerRestart, xerrors.New("worker restarted"))

			// TODO: Handle restarting PC1 once support is merged

			if !doReturn(ctx, call.RetType, call.ID, ret, nil, err) {
				// doReturn only gives up once ctx is done, i.e. the worker
				// is closing. The remaining calls stay marked unfinished.
				return
			}
			if err := w.ct.onReturned(call.ID); err != nil {
				log.Errorf("marking call as returned failed: %s: %+v", call.RetType, err)
			}
		}
	}()

	return w
}
// NewLocalWorker creates a LocalWorker without a custom executor; it forwards
// all arguments to newLocalWorker together with a nil ExecutorFunc.
func NewLocalWorker(wcfg WorkerConfig, store stores.Store, local *stores.Local, sindex stores.SectorIndex, ret storiface.WorkerReturn, cst *statestore.StateStore) *LocalWorker {
	var noCustomExecutor ExecutorFunc // nil

	return newLocalWorker(noCustomExecutor, wcfg, store, local, sindex, ret, cst)
}
// localWorkerPathProvider resolves sector paths through a LocalWorker's
// stores.
type localWorkerPathProvider struct {
// w is the worker whose storage, localStore and sindex are used.
w *LocalWorker
// op is the acquire mode passed to the store; storiface.AcquireMove also
// changes the flag sent along with sector declarations.
op storiface.AcquireMode
}
// AcquireSector acquires the requested sector files from the worker's store
// and reserves local space for the files that are being allocated.
//
// On success it returns the sector paths and a callback. The callback releases
// the reservation and then declares every allocated file type listed in
// pathTypes to the sector index; declaration failures are only logged.
func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType) (storiface.SectorPaths, func(), error) {
	var noPaths storiface.SectorPaths

	paths, storageIDs, err := l.w.storage.AcquireSector(ctx, sector, existing, allocate, sealing, l.op)
	if err != nil {
		return noPaths, nil, err
	}

	releaseStorage, err := l.w.localStore.Reserve(ctx, sector, allocate, storageIDs, storiface.FSOverheadSeal)
	if err != nil {
		return noPaths, nil, xerrors.Errorf("reserving storage space: %w", err)
	}

	log.Debugf("acquired sector %d (e:%d; a:%d): %v", sector, existing, allocate, paths)

	done := func() {
		releaseStorage()

		for _, ft := range pathTypes {
			if ft&allocate != 0 {
				storageID := stores.ID(storiface.PathByType(storageIDs, ft))
				isMove := l.op == storiface.AcquireMove

				if err := l.w.sindex.StorageDeclareSector(ctx, storageID, sector.ID, ft, isMove); err != nil {
					log.Errorf("declare sector error: %+v", err)
				}
			}
		}
	}

	return paths, done, nil
}
// ffiExec is the default executor: it builds an ffiwrapper storage whose
// sector paths are resolved through this worker.
func (l *LocalWorker) ffiExec() (ffiwrapper.Storage, error) {
	provider := &localWorkerPathProvider{w: l}
	return ffiwrapper.New(provider)
}
// ReturnType names the kind of result an asynchronous call produces; it is the
// key used to look up the matching function in returnFunc.
type ReturnType string
// One ReturnType per asynchronous worker operation.
const (
AddPiece ReturnType = "AddPiece"
SealPreCommit1 ReturnType = "SealPreCommit1"
SealPreCommit2 ReturnType = "SealPreCommit2"
SealCommit1 ReturnType = "SealCommit1"
SealCommit2 ReturnType = "SealCommit2"
FinalizeSector ReturnType = "FinalizeSector"
ReleaseUnsealed ReturnType = "ReleaseUnsealed"
MoveStorage ReturnType = "MoveStorage"
UnsealPiece ReturnType = "UnsealPiece"
Fetch ReturnType = "Fetch"
)
// rfunc adapts a WorkerReturn method expression to one uniform signature.
//
// in must be a function of one of two shapes:
//
//	func(WorkerReturn, context.Context, CallID, err)
//	func(WorkerReturn, context.Context, CallID, ret T, err)
//
// The shape is detected from the parameter count (five means "with result").
// The adapter calls in through reflection, passing the *storiface.CallError as
// the final argument, and returns the function's first result as an error.
// When the result argument is nil, the zero value of T is passed instead.
func rfunc(in interface{}) func(context.Context, storiface.CallID, storiface.WorkerReturn, interface{}, *storiface.CallError) error {
	fn := reflect.ValueOf(in)
	fnType := fn.Type()
	hasResult := fnType.NumIn() == 5

	return func(ctx context.Context, ci storiface.CallID, wr storiface.WorkerReturn, i interface{}, err *storiface.CallError) error {
		args := make([]reflect.Value, 0, 5)
		args = append(args, reflect.ValueOf(wr), reflect.ValueOf(ctx), reflect.ValueOf(ci))

		if hasResult {
			var result reflect.Value
			if i == nil {
				// reflect.ValueOf(nil) is the invalid Value, which Call
				// rejects; substitute the zero value of the result type.
				result = reflect.Zero(fnType.In(3))
			} else {
				result = reflect.ValueOf(i)
			}
			args = append(args, result)
		}
		args = append(args, reflect.ValueOf(err))

		out := fn.Call(args)
		if out[0].IsNil() {
			return nil
		}
		return out[0].Interface().(error)
	}
}
// returnFunc maps every ReturnType to the storiface.WorkerReturn method that
// delivers results of that type, wrapped by rfunc into a uniform signature.
var returnFunc = map[ReturnType]func(context.Context, storiface.CallID, storiface.WorkerReturn, interface{}, *storiface.CallError) error{
AddPiece: rfunc(storiface.WorkerReturn.ReturnAddPiece),
SealPreCommit1: rfunc(storiface.WorkerReturn.ReturnSealPreCommit1),
SealPreCommit2: rfunc(storiface.WorkerReturn.ReturnSealPreCommit2),
SealCommit1: rfunc(storiface.WorkerReturn.ReturnSealCommit1),
SealCommit2: rfunc(storiface.WorkerReturn.ReturnSealCommit2),
FinalizeSector: rfunc(storiface.WorkerReturn.ReturnFinalizeSector),
ReleaseUnsealed: rfunc(storiface.WorkerReturn.ReturnReleaseUnsealed),
MoveStorage: rfunc(storiface.WorkerReturn.ReturnMoveStorage),
UnsealPiece: rfunc(storiface.WorkerReturn.ReturnUnsealPiece),
Fetch: rfunc(storiface.WorkerReturn.ReturnFetch),
}
// asyncCall runs work in a new goroutine and returns a freshly generated
// CallID for it straight away; the returned error is always nil.
//
// The goroutine is counted in l.running. work receives a context that takes
// its values from ctx but is only cancelled when the worker is closed. When
// work finishes, its result and error are handed to doReturn, and the call is
// marked as returned in the call tracker if delivery succeeded. Call-tracker
// failures are logged and otherwise ignored.
func (l *LocalWorker) asyncCall(ctx context.Context, sector storage.SectorRef, rt ReturnType, work func(ctx context.Context, ci storiface.CallID) (interface{}, error)) (storiface.CallID, error) {
	ci := storiface.CallID{
		Sector: sector.ID,
		ID:     uuid.New(),
	}

	if err := l.ct.onStart(ci, rt); err != nil {
		log.Errorf("tracking call (start): %+v", err)
	}

	l.running.Add(1)

	go func() {
		defer l.running.Done()

		workCtx := &wctx{
			vals:    ctx,
			closing: l.closing,
		}

		res, workErr := work(workCtx, ci)

		// NOTE(review): the result is only recorded via onDone when work
		// FAILED, which looks inverted (a successful result is the one worth
		// persisting) — confirm the intent before changing it.
		if workErr != nil {
			if rb, err := json.Marshal(res); err != nil {
				log.Errorf("tracking call (marshaling results): %+v", err)
			} else if err := l.ct.onDone(ci, rb); err != nil {
				log.Errorf("tracking call (done): %+v", err)
			}
		}

		if !doReturn(workCtx, rt, ci, l.ret, res, toCallError(workErr)) {
			return
		}

		// NOTE(review): this message says "(done)" although it reports an
		// onReturned failure.
		if err := l.ct.onReturned(ci); err != nil {
			log.Errorf("tracking call (done): %+v", err)
		}
	}()

	return ci, nil
}
// toCallError converts err into a *storiface.CallError. A nil err yields nil,
// an error that already is (or wraps) a *storiface.CallError yields that
// value, and anything else is wrapped with storiface.ErrUnknown.
func toCallError(err error) *storiface.CallError {
	if err == nil {
		return nil
	}

	var callErr *storiface.CallError
	if xerrors.As(err, &callErr) {
		return callErr
	}

	return storiface.Err(storiface.ErrUnknown, err)
}
// doReturn tries to send the result to manager, returns true if successful.
//
// Delivery goes through the returnFunc entry for rt and is retried every five
// seconds until it succeeds or ctx is done, in which case false is returned.
//
// NOTE(review): an rt without an entry in returnFunc yields a nil function
// here, and calling it panics.
func doReturn(ctx context.Context, rt ReturnType, ci storiface.CallID, ret storiface.WorkerReturn, res interface{}, rerr *storiface.CallError) bool {
	for {
		sendErr := returnFunc[rt](ctx, ci, ret, res, rerr)
		if sendErr == nil {
			return true
		}

		log.Errorf("return error, will retry in 5s: %s: %+v", rt, sendErr)

		retry := time.After(5 * time.Second)
		select {
		case <-retry:
			// try again
		case <-ctx.Done():
			log.Errorf("failed to return results: %s", ctx.Err())

			// fine to just return, worker is most likely shutting down, and
			// we didn't mark the result as returned yet, so we'll try to
			// re-submit it on restart
			return false
		}
	}
}
// NewSector obtains a storage implementation from the executor and calls its
// NewSector for the given sector, synchronously.
func (l *LocalWorker) NewSector(ctx context.Context, sector storage.SectorRef) error {
	sb, err := l.executor()
	if err == nil {
		err = sb.NewSector(ctx, sector)
	}
	return err
}
// AddPiece starts an asynchronous AddPiece on the executor's storage and
// returns the CallID under which the result will be delivered. If the
// executor cannot be created, storiface.UndefCall and the error are returned.
func (l *LocalWorker) AddPiece(ctx context.Context, sector storage.SectorRef, epcs []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (storiface.CallID, error) {
	sb, err := l.executor()
	if err != nil {
		return storiface.UndefCall, err
	}

	work := func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
		return sb.AddPiece(ctx, sector, epcs, sz, r)
	}

	return l.asyncCall(ctx, sector, AddPiece, work)
}
// Fetch asynchronously acquires the given file types of a sector with acquire
// mode am, allocating nothing, and immediately runs the release callback once
// the acquisition succeeded.
func (l *LocalWorker) Fetch(ctx context.Context, sector storage.SectorRef, fileType storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) {
	work := func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
		provider := &localWorkerPathProvider{w: l, op: am}

		_, done, err := provider.AcquireSector(ctx, sector, fileType, storiface.FTNone, ptype)
		if err != nil {
			return nil, err
		}

		done()
		return nil, nil
	}

	return l.asyncCall(ctx, sector, Fetch, work)
}
// SealPreCommit1 asynchronously removes the sector's sealed and cache files
// and then runs SealPreCommit1 on a storage obtained from the executor. The
// executor is invoked inside the asynchronous call, after the cleanup.
func (l *LocalWorker) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) {
	work := func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
		// cleanup previous failed attempts if they exist
		stale := []struct {
			fileType storiface.SectorFileType
			errFmt   string
		}{
			{storiface.FTSealed, "cleaning up sealed data: %w"},
			{storiface.FTCache, "cleaning up cache data: %w"},
		}
		for _, s := range stale {
			if err := l.storage.Remove(ctx, sector.ID, s.fileType, true, nil); err != nil {
				return nil, xerrors.Errorf(s.errFmt, err)
			}
		}

		sb, err := l.executor()
		if err != nil {
			return nil, err
		}

		return sb.SealPreCommit1(ctx, sector, ticket, pieces)
	}

	return l.asyncCall(ctx, sector, SealPreCommit1, work)
}
// SealPreCommit2 starts an asynchronous SealPreCommit2 on the executor's
// storage. If the executor cannot be created, storiface.UndefCall and the
// error are returned.
func (l *LocalWorker) SealPreCommit2(ctx context.Context, sector storage.SectorRef, phase1Out storage.PreCommit1Out) (storiface.CallID, error) {
	sb, err := l.executor()
	if err != nil {
		return storiface.UndefCall, err
	}

	work := func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
		return sb.SealPreCommit2(ctx, sector, phase1Out)
	}

	return l.asyncCall(ctx, sector, SealPreCommit2, work)
}
// SealCommit1 starts an asynchronous SealCommit1 on the executor's storage.
// If the executor cannot be created, storiface.UndefCall and the error are
// returned.
func (l *LocalWorker) SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) {
	sb, err := l.executor()
	if err != nil {
		return storiface.UndefCall, err
	}

	work := func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
		return sb.SealCommit1(ctx, sector, ticket, seed, pieces, cids)
	}

	return l.asyncCall(ctx, sector, SealCommit1, work)
}
// SealCommit2 starts an asynchronous SealCommit2 on the executor's storage.
// If the executor cannot be created, storiface.UndefCall and the error are
// returned.
func (l *LocalWorker) SealCommit2(ctx context.Context, sector storage.SectorRef, phase1Out storage.Commit1Out) (storiface.CallID, error) {
	sb, err := l.executor()
	if err != nil {
		return storiface.UndefCall, err
	}

	work := func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
		return sb.SealCommit2(ctx, sector, phase1Out)
	}

	return l.asyncCall(ctx, sector, SealCommit2, work)
}
// FinalizeSector asynchronously finalizes the sector on the executor's
// storage. When keepUnsealed is empty, the sector's unsealed file is removed
// afterwards. If the executor cannot be created, storiface.UndefCall and the
// error are returned.
func (l *LocalWorker) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) {
	sb, err := l.executor()
	if err != nil {
		return storiface.UndefCall, err
	}

	return l.asyncCall(ctx, sector, FinalizeSector, func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
		if err := sb.FinalizeSector(ctx, sector, keepUnsealed); err != nil {
			return nil, xerrors.Errorf("finalizing sector: %w", err)
		}

		if len(keepUnsealed) == 0 {
			if err := l.storage.Remove(ctx, sector.ID, storiface.FTUnsealed, true, nil); err != nil {
				return nil, xerrors.Errorf("removing unsealed data: %w", err)
			}
		}

		// Success. Return an explicit nil rather than the captured executor
		// error, which is always nil at this point and only obscured that.
		return nil, nil
	})
}
// ReleaseUnsealed is not implemented; it always returns storiface.UndefCall
// and an error.
func (l *LocalWorker) ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) (storiface.CallID, error) {
	notImplemented := xerrors.Errorf("implement me")
	return storiface.UndefCall, notImplemented
}
// Remove deletes the sealed, cache and unsealed files of a sector, in that
// order. All three removals are attempted even if an earlier one fails; the
// failures are collected into a single multierror. It returns nil when every
// removal succeeded.
func (l *LocalWorker) Remove(ctx context.Context, sector abi.SectorID) error {
	targets := []struct {
		fileType storiface.SectorFileType
		errFmt   string
	}{
		{storiface.FTSealed, "removing sector (sealed): %w"},
		{storiface.FTCache, "removing sector (cache): %w"},
		{storiface.FTUnsealed, "removing sector (unsealed): %w"},
	}

	var err error
	for _, t := range targets {
		rerr := l.storage.Remove(ctx, sector, t.fileType, true, nil)
		if rerr == nil {
			continue
		}
		err = multierror.Append(err, xerrors.Errorf(t.errFmt, rerr))
	}

	return err
}
// MoveStorage asynchronously asks the worker's store to move the given file
// types of a sector. The call carries no result value, only an error.
func (l *LocalWorker) MoveStorage(ctx context.Context, sector storage.SectorRef, types storiface.SectorFileType) (storiface.CallID, error) {
	work := func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
		moveErr := l.storage.MoveStorage(ctx, sector, types)
		return nil, moveErr
	}

	return l.asyncCall(ctx, sector, MoveStorage, work)
}
// UnsealPiece asynchronously unseals the requested range of a sector on the
// executor's storage and afterwards calls RemoveCopies on the store for the
// sector's sealed and cache files. If the executor cannot be created,
// storiface.UndefCall and the error are returned.
func (l *LocalWorker) UnsealPiece(ctx context.Context, sector storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) {
	sb, err := l.executor()
	if err != nil {
		return storiface.UndefCall, err
	}

	return l.asyncCall(ctx, sector, UnsealPiece, func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
		log.Debugf("worker will unseal piece now, sector=%+v", sector.ID)

		// Errors are kept local to the closure: it runs on another goroutine
		// and must not write to the enclosing function's err variable.
		if err := sb.UnsealPiece(ctx, sector, index, size, randomness, cid); err != nil {
			return nil, xerrors.Errorf("unsealing sector: %w", err)
		}

		if err := l.storage.RemoveCopies(ctx, sector.ID, storiface.FTSealed); err != nil {
			return nil, xerrors.Errorf("removing source data: %w", err)
		}

		if err := l.storage.RemoveCopies(ctx, sector.ID, storiface.FTCache); err != nil {
			return nil, xerrors.Errorf("removing source data: %w", err)
		}

		log.Debugf("worker has unsealed piece, sector=%+v", sector.ID)

		return nil, nil
	})
}
// TaskTypes returns the set of task types the worker currently accepts.
//
// The result is a snapshot copied under taskLk. Handing out the internal map
// would let callers read it without the lock while TaskEnable / TaskDisable
// modify it, which is a data race on the map.
func (l *LocalWorker) TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error) {
	l.taskLk.Lock()
	defer l.taskLk.Unlock()

	snapshot := make(map[sealtasks.TaskType]struct{}, len(l.acceptTasks))
	for tt := range l.acceptTasks {
		snapshot[tt] = struct{}{}
	}

	return snapshot, nil
}
// TaskDisable removes tt from the set of accepted task types. It always
// returns nil.
func (l *LocalWorker) TaskDisable(ctx context.Context, tt sealtasks.TaskType) error {
	l.taskLk.Lock()
	defer l.taskLk.Unlock()

	accepted := l.acceptTasks
	delete(accepted, tt)

	return nil
}
// TaskEnable adds tt to the set of accepted task types. It always returns
// nil.
func (l *LocalWorker) TaskEnable(ctx context.Context, tt sealtasks.TaskType) error {
	var enabled struct{}

	l.taskLk.Lock()
	defer l.taskLk.Unlock()

	l.acceptTasks[tt] = enabled

	return nil
}
// Paths returns the storage paths reported by the worker's local store.
func (l *LocalWorker) Paths(ctx context.Context) ([]stores.StoragePath, error) {
	local := l.localStore
	return local.Local(ctx)
}
// memInfo reports the memory figures advertised for this worker:
//
//   - memPhysical: total RAM, lowered to the cgroup memory limit when one is
//     set below the host total;
//   - memVirtual: total swap, lowered to the cgroup swap limit in the same
//     way, or zero when the worker was configured with NoSwap;
//   - memReserved: RAM plus swap currently in use.
//
// Both the cgroup v1 and the cgroup v2 reader are consulted; a reader that
// returns an error is skipped, and a limit only applies when it is non-zero
// and lower than the value established so far.
func (l *LocalWorker) memInfo() (memPhysical uint64, memVirtual uint64, memReserved uint64, err error) {
	h, err := sysinfo.Host()
	if err != nil {
		return 0, 0, 0, err
	}

	mem, err := h.Memory()
	if err != nil {
		return 0, 0, 0, err
	}

	memPhysical = mem.Total
	memSwap := mem.VirtualTotal

	// Use Available rather than Free: Free excludes memory that the kernel
	// can reclaim on demand (e.g. page cache), so a worker doing heavy disk
	// I/O would otherwise look almost fully reserved and be starved of tasks.
	memUsed := mem.Total - mem.Available
	swapUsed := mem.VirtualTotal - mem.VirtualFree

	// applyLimits narrows the figures with one cgroup reader's output.
	applyLimits := func(cgMemMax, cgMemUsed, cgSwapMax, cgSwapUsed uint64, err error) {
		if err != nil {
			return
		}
		if cgMemMax > 0 && cgMemMax < memPhysical {
			memPhysical = cgMemMax
			memUsed = cgMemUsed
		}
		if cgSwapMax > 0 && cgSwapMax < memSwap {
			memSwap = cgSwapMax
			swapUsed = cgSwapUsed
		}
	}
	applyLimits(cgroupV1Mem())
	applyLimits(cgroupV2Mem())

	if l.noSwap {
		memSwap = 0
		swapUsed = 0
	}

	memReserved = memUsed + swapUsed

	return memPhysical, memSwap, memReserved, nil
}
// Info describes this worker: hostname, the ignore-resources flag, and the
// memory, CPU and GPU resources.
//
// It returns an error when the hostname or the memory statistics cannot be
// read. A failure to enumerate GPUs is only logged; the returned value of
// ffi.GetGPUDevices is reported as is in that case.
func (l *LocalWorker) Info(context.Context) (storiface.WorkerInfo, error) {
	hostname, err := os.Hostname() // TODO: allow overriding from config
	if err != nil {
		// Report the failure to the caller instead of panicking; this method
		// already has an error return.
		return storiface.WorkerInfo{}, xerrors.Errorf("getting hostname: %w", err)
	}

	gpus, err := ffi.GetGPUDevices()
	if err != nil {
		log.Errorf("getting gpu devices failed: %+v", err)
	}

	memPhysical, memSwap, memReserved, err := l.memInfo()
	if err != nil {
		return storiface.WorkerInfo{}, xerrors.Errorf("getting memory info: %w", err)
	}

	return storiface.WorkerInfo{
		Hostname:        hostname,
		IgnoreResources: l.ignoreResources,
		Resources: storiface.WorkerResources{
			MemPhysical: memPhysical,
			MemSwap:     memSwap,
			MemReserved: memReserved,
			CPUs:        uint64(runtime.NumCPU()),
			GPUs:        gpus,
		},
	}, nil
}
// Session returns the worker's session ID, or ClosedWorkerID once the worker
// has been closed. While testDisable is set to 1 it returns an error instead.
func (l *LocalWorker) Session(ctx context.Context) (uuid.UUID, error) {
	if atomic.LoadInt64(&l.testDisable) == 1 {
		return uuid.UUID{}, xerrors.Errorf("disabled")
	}

	id := l.session

	// Non-blocking check of the closing channel.
	select {
	case <-l.closing:
		id = ClosedWorkerID
	default:
	}

	return id, nil
}
// Close marks the worker as closing by closing l.closing, which cancels the
// contexts handed to asynchronous work and makes Session report
// ClosedWorkerID. It always returns nil.
//
// Close must be called at most once: closing an already closed channel
// panics.
func (l *LocalWorker) Close() error {
close(l.closing)
return nil
}
// WaitQuiet blocks as long as there are tasks running, i.e. until every
// goroutine started by asyncCall has finished.
func (l *LocalWorker) WaitQuiet() {
l.running.Wait()
}
// wctx is a context.Context that takes its values from one context and its
// cancellation from a channel: it is done once closing is closed and it never
// has a deadline. Cancellation of vals itself is not observed.
type wctx struct {
	vals    context.Context // source of Value lookups
	closing chan struct{}   // closed to cancel the context
}

// Deadline reports that no deadline is set.
func (c *wctx) Deadline() (deadline time.Time, ok bool) {
	return deadline, ok
}

// Done returns the closing channel.
func (c *wctx) Done() <-chan struct{} {
	return c.closing
}

// Err returns context.Canceled once the closing channel has been closed and
// nil before that.
func (c *wctx) Err() error {
	select {
	case <-c.closing:
		return context.Canceled
	default:
	}
	return nil
}

// Value looks key up in the wrapped context.
func (c *wctx) Value(key interface{}) interface{} {
	return c.vals.Value(key)
}

// Compile-time check that *wctx implements context.Context.
var _ context.Context = &wctx{}
// Compile-time check that *LocalWorker implements Worker.
var _ Worker = &LocalWorker{}