lotus/extern/sector-storage/storiface/worker.go
Clint Armstrong 4ef8543128 Permit workers to override resource table
In an environment with heterogenious worker nodes, a universal resource
table for all workers does not allow effective scheduling of tasks. Some
workers may have different proof cache settings, changing the required
memory for different tasks. Some workers may have a different count of
CPUs per core-complex, changing the max parallelism of PC1.

This change allows workers to customize these parameters with
environment variables. A worker could set the environment variable
PC1_MIN_MEMORY for example to customize the minimum memory requirement
for PC1 tasks. If no environment variables are specified, the resource
table on the miner is used, except for PC1 parallelism.

If PC1_MAX_PARALLELISM is not specified, and
FIL_PROOFS_USE_MULTICORE_SDR is set, PC1_MAX_PARALLELSIM will
automatically be set to FIL_PROOFS_MULTICORE_SDR_PRODUCERS + 1.
2021-11-30 02:06:58 +01:00

154 lines
4.5 KiB
Go

package storiface
import (
"context"
"errors"
"fmt"
"time"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
)
type WorkerInfo struct {
Hostname string
// IgnoreResources indicates whether the worker's available resources should
// be used ignored (true) or used (false) for the purposes of scheduling and
// task assignment. Only supported on local workers. Used for testing.
// Default should be false (zero value, i.e. resources taken into account).
IgnoreResources bool
Resources WorkerResources
}
type WorkerResources struct {
MemPhysical uint64
MemUsed uint64
MemSwap uint64
MemSwapUsed uint64
CPUs uint64 // Logical cores
GPUs []string
ResourceOpts map[string]string
}
type WorkerStats struct {
Info WorkerInfo
Enabled bool
MemUsedMin uint64
MemUsedMax uint64
GpuUsed float64 // nolint
CpuUse uint64 // nolint
}
const (
RWPrepared = 1
RWRunning = 0
RWRetWait = -1
RWReturned = -2
RWRetDone = -3
)
type WorkerJob struct {
ID CallID
Sector abi.SectorID
Task sealtasks.TaskType
// 2+ - assigned
// 1 - prepared
// 0 - running
// -1 - ret-wait
// -2 - returned
// -3 - ret-done
RunWait int
Start time.Time
Hostname string `json:",omitempty"` // optional, set for ret-wait jobs
}
type CallID struct {
Sector abi.SectorID
ID uuid.UUID
}
func (c CallID) String() string {
return fmt.Sprintf("%d-%d-%s", c.Sector.Miner, c.Sector.Number, c.ID)
}
var _ fmt.Stringer = &CallID{}
var UndefCall CallID
type WorkerCalls interface {
AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (CallID, error)
SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (CallID, error)
SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (CallID, error)
SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (CallID, error)
SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (CallID, error)
FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (CallID, error)
ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) (CallID, error)
MoveStorage(ctx context.Context, sector storage.SectorRef, types SectorFileType) (CallID, error)
UnsealPiece(context.Context, storage.SectorRef, UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (CallID, error)
Fetch(context.Context, storage.SectorRef, SectorFileType, PathType, AcquireMode) (CallID, error)
}
type ErrorCode int
const (
ErrUnknown ErrorCode = iota
)
const (
// Temp Errors
ErrTempUnknown ErrorCode = iota + 100
ErrTempWorkerRestart
ErrTempAllocateSpace
)
type CallError struct {
Code ErrorCode
Message string
sub error
}
func (c *CallError) Error() string {
return fmt.Sprintf("storage call error %d: %s", c.Code, c.Message)
}
func (c *CallError) Unwrap() error {
if c.sub != nil {
return c.sub
}
return errors.New(c.Message)
}
func Err(code ErrorCode, sub error) *CallError {
return &CallError{
Code: code,
Message: sub.Error(),
sub: sub,
}
}
type WorkerReturn interface {
ReturnAddPiece(ctx context.Context, callID CallID, pi abi.PieceInfo, err *CallError) error
ReturnSealPreCommit1(ctx context.Context, callID CallID, p1o storage.PreCommit1Out, err *CallError) error
ReturnSealPreCommit2(ctx context.Context, callID CallID, sealed storage.SectorCids, err *CallError) error
ReturnSealCommit1(ctx context.Context, callID CallID, out storage.Commit1Out, err *CallError) error
ReturnSealCommit2(ctx context.Context, callID CallID, proof storage.Proof, err *CallError) error
ReturnFinalizeSector(ctx context.Context, callID CallID, err *CallError) error
ReturnReleaseUnsealed(ctx context.Context, callID CallID, err *CallError) error
ReturnMoveStorage(ctx context.Context, callID CallID, err *CallError) error
ReturnUnsealPiece(ctx context.Context, callID CallID, err *CallError) error
ReturnReadPiece(ctx context.Context, callID CallID, ok bool, err *CallError) error
ReturnFetch(ctx context.Context, callID CallID, err *CallError) error
}