lotus/extern/sector-storage/storiface/worker.go

159 lines
5.2 KiB
Go
Raw Normal View History

package storiface
2020-07-21 18:01:25 +00:00
import (
2020-09-06 16:47:16 +00:00
"context"
2020-11-17 15:28:41 +00:00
"errors"
2020-09-14 07:44:55 +00:00
"fmt"
2020-07-21 18:01:25 +00:00
"time"
2020-09-06 16:47:16 +00:00
"github.com/google/uuid"
"github.com/ipfs/go-cid"
2020-09-07 03:49:10 +00:00
"github.com/filecoin-project/go-state-types/abi"
2020-09-06 16:47:16 +00:00
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
2020-07-21 18:01:25 +00:00
)
// WorkerInfo bundles a worker's hostname, its resources, and a flag telling
// the scheduler whether those resources should be taken into account.
type WorkerInfo struct {
	Hostname string

	// IgnoreResources indicates whether the worker's available resources
	// should be ignored (true) or used (false) for the purposes of scheduling
	// and task assignment. Only supported on local workers. Used for testing.
	// Default should be false (zero value, i.e. resources taken into account).
	IgnoreResources bool

	Resources WorkerResources
}
// WorkerResources lists the memory, CPU and GPU resources of a worker.
//
// NOTE(review): the memory fields are presumably byte counts and GPUs
// presumably holds device names; neither is stated in this file — confirm
// against the code that fills this struct in.
type WorkerResources struct {
	MemPhysical uint64
	MemSwap     uint64
	MemReserved uint64 // Used by system / other processes
	CPUs        uint64 // Logical cores
	GPUs        []string
}
type WorkerStats struct {
Info WorkerInfo
Enabled bool
MemUsedMin uint64
MemUsedMax uint64
2020-08-16 10:40:35 +00:00
GpuUsed bool // nolint
CpuUse uint64 // nolint
}
2020-07-21 18:01:25 +00:00
// Named values for WorkerJob.RunWait. They match the state table written on
// that field: 1 prepared, 0 running, -1 ret-wait, -2 returned, -3 ret-done.
// Values of 2 and above ("assigned") have no named constant.
const (
	RWPrepared = 1
	RWRunning  = 0
	RWRetWait  = -1
	RWReturned = -2
	RWRetDone  = -3
)
2020-07-21 18:01:25 +00:00
type WorkerJob struct {
2020-09-07 14:12:46 +00:00
ID CallID
2020-07-21 18:01:25 +00:00
Sector abi.SectorID
Task sealtasks.TaskType
2021-10-15 19:26:35 +00:00
// 2+ - assigned
// 1 - prepared
// 0 - running
// -1 - ret-wait
// -2 - returned
// -3 - ret-done
RunWait int
Start time.Time
Hostname string `json:",omitempty"` // optional, set for ret-wait jobs
2020-07-21 18:01:25 +00:00
}
2020-09-06 16:47:16 +00:00
// CallID identifies a call by the sector it concerns together with a UUID.
type CallID struct {
	Sector abi.SectorID
	ID     uuid.UUID
}
2020-09-14 07:44:55 +00:00
// String renders the call ID as "<miner>-<sector number>-<uuid>".
func (c CallID) String() string {
	miner, number := c.Sector.Miner, c.Sector.Number
	return fmt.Sprintf("%d-%d-%s", miner, number, c.ID)
}

// Compile-time check that *CallID satisfies fmt.Stringer.
var _ fmt.Stringer = (*CallID)(nil)
2020-09-06 16:47:16 +00:00
// UndefCall is the zero-value CallID.
// NOTE(review): presumably used as the "no call" sentinel — confirm with callers.
var UndefCall CallID
type WorkerCalls interface {
AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (CallID, error)
SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (CallID, error)
SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (CallID, error)
SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (CallID, error)
SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (CallID, error)
FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (CallID, error)
ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) (CallID, error)
ReplicaUpdate(ctx context.Context, sector storage.SectorRef, pieces []abi.PieceInfo) (CallID, error)
ProveReplicaUpdate1(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (CallID, error)
ProveReplicaUpdate2(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid, vanillaProofs storage.ReplicaVanillaProofs) (CallID, error)
MoveStorage(ctx context.Context, sector storage.SectorRef, types SectorFileType) (CallID, error)
UnsealPiece(context.Context, storage.SectorRef, UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (CallID, error)
Fetch(context.Context, storage.SectorRef, SectorFileType, PathType, AcquireMode) (CallID, error)
2020-09-06 16:47:16 +00:00
}
2020-11-17 15:17:45 +00:00
// ErrorCode is the numeric classification carried by a CallError.
type ErrorCode int

const (
	// ErrUnknown is the zero ErrorCode.
	ErrUnknown ErrorCode = iota
)

// Declared in a separate const block so that iota restarts at zero and the
// codes below begin at 100.
const (
	// Temp Errors
	ErrTempUnknown ErrorCode = iota + 100
	ErrTempWorkerRestart
	ErrTempAllocateSpace
)

// CallError is an error with an ErrorCode and a message. It is the error type
// accepted by the Return* methods of WorkerReturn.
type CallError struct {
	Code    ErrorCode
	Message string

	// sub is the error this CallError was built from. It is unexported, so it
	// is only set on values created through Err in this process.
	sub error
}

// Error implements the error interface, formatting the code and the message.
func (c *CallError) Error() string {
	return fmt.Sprintf("storage call error %d: %s", c.Code, c.Message)
}

// Unwrap returns the wrapped error when one is present. Otherwise it returns
// a new error built from Message, so a CallError without sub still exposes
// its message through errors.Unwrap.
func (c *CallError) Unwrap() error {
	if c.sub != nil {
		return c.sub
	}
	return errors.New(c.Message)
}

// Err wraps sub in a CallError carrying code; Message is set to sub.Error().
//
// A nil sub yields a CallError with the given code and an empty message
// instead of panicking on the nil interface.
func Err(code ErrorCode, sub error) *CallError {
	if sub == nil {
		return &CallError{Code: code}
	}
	return &CallError{
		Code:    code,
		Message: sub.Error(),
		sub:     sub,
	}
}
2020-09-06 16:47:16 +00:00
type WorkerReturn interface {
2020-11-17 15:17:45 +00:00
ReturnAddPiece(ctx context.Context, callID CallID, pi abi.PieceInfo, err *CallError) error
ReturnSealPreCommit1(ctx context.Context, callID CallID, p1o storage.PreCommit1Out, err *CallError) error
ReturnSealPreCommit2(ctx context.Context, callID CallID, sealed storage.SectorCids, err *CallError) error
ReturnSealCommit1(ctx context.Context, callID CallID, out storage.Commit1Out, err *CallError) error
ReturnSealCommit2(ctx context.Context, callID CallID, proof storage.Proof, err *CallError) error
ReturnFinalizeSector(ctx context.Context, callID CallID, err *CallError) error
ReturnReleaseUnsealed(ctx context.Context, callID CallID, err *CallError) error
ReturnReplicaUpdate(ctx context.Context, callID CallID, out storage.ReplicaUpdateOut, err *CallError) error
ReturnProveReplicaUpdate1(ctx context.Context, callID CallID, proofs storage.ReplicaVanillaProofs, err *CallError) error
ReturnProveReplicaUpdate2(ctx context.Context, callID CallID, proof storage.ReplicaUpdateProof, err *CallError) error
2020-11-17 15:17:45 +00:00
ReturnMoveStorage(ctx context.Context, callID CallID, err *CallError) error
ReturnUnsealPiece(ctx context.Context, callID CallID, err *CallError) error
ReturnReadPiece(ctx context.Context, callID CallID, ok bool, err *CallError) error
ReturnFetch(ctx context.Context, callID CallID, err *CallError) error
2020-09-06 16:47:16 +00:00
}