lotus/extern/sector-storage/storiface/worker.go

217 lines
7.0 KiB
Go
Raw Normal View History

package storiface
2020-07-21 18:01:25 +00:00
import (
2020-09-06 16:47:16 +00:00
"context"
2020-11-17 15:28:41 +00:00
"errors"
2020-09-14 07:44:55 +00:00
"fmt"
2020-07-21 18:01:25 +00:00
"time"
2020-09-06 16:47:16 +00:00
"github.com/google/uuid"
"github.com/ipfs/go-cid"
2020-09-07 03:49:10 +00:00
"github.com/filecoin-project/go-state-types/abi"
2022-01-18 10:37:15 +00:00
"github.com/filecoin-project/specs-actors/actors/runtime/proof"
2020-09-06 16:47:16 +00:00
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
2020-07-21 18:01:25 +00:00
)
2021-11-29 13:42:20 +00:00
type WorkerID uuid.UUID // worker session UUID
func (w WorkerID) String() string {
return uuid.UUID(w).String()
}
type WorkerInfo struct {
Hostname string
// IgnoreResources indicates whether the worker's available resources should
// be used ignored (true) or used (false) for the purposes of scheduling and
// task assignment. Only supported on local workers. Used for testing.
// Default should be false (zero value, i.e. resources taken into account).
IgnoreResources bool
Resources WorkerResources
}
type WorkerResources struct {
MemPhysical uint64
MemUsed uint64
MemSwap uint64
MemSwapUsed uint64
CPUs uint64 // Logical cores
GPUs []string
2021-11-29 13:42:20 +00:00
// if nil use the default resource table
Resources map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
}
func (wr WorkerResources) ResourceSpec(spt abi.RegisteredSealProof, tt sealtasks.TaskType) Resources {
res := ResourceTable[tt][spt]
// if the worker specifies custom resource table, prefer that
if wr.Resources != nil {
tr, ok := wr.Resources[tt]
if !ok {
return res
}
r, ok := tr[spt]
if ok {
return r
}
}
// otherwise, use the default resource table
return res
}
type WorkerStats struct {
Info WorkerInfo
Tasks []sealtasks.TaskType
Enabled bool
MemUsedMin uint64
MemUsedMax uint64
GpuUsed float64 // nolint
CpuUse uint64 // nolint
}
2020-07-21 18:01:25 +00:00
const (
2021-10-15 19:26:35 +00:00
RWPrepared = 1
RWRunning = 0
RWRetWait = -1
RWReturned = -2
RWRetDone = -3
)
2020-07-21 18:01:25 +00:00
type WorkerJob struct {
2020-09-07 14:12:46 +00:00
ID CallID
2020-07-21 18:01:25 +00:00
Sector abi.SectorID
Task sealtasks.TaskType
2021-10-15 19:26:35 +00:00
// 2+ - assigned
// 1 - prepared
// 0 - running
// -1 - ret-wait
// -2 - returned
// -3 - ret-done
RunWait int
Start time.Time
Hostname string `json:",omitempty"` // optional, set for ret-wait jobs
2020-07-21 18:01:25 +00:00
}
2020-09-06 16:47:16 +00:00
type CallID struct {
Sector abi.SectorID
ID uuid.UUID
}
2020-09-14 07:44:55 +00:00
func (c CallID) String() string {
return fmt.Sprintf("%d-%d-%s", c.Sector.Miner, c.Sector.Number, c.ID)
}
var _ fmt.Stringer = &CallID{}
2020-09-06 16:47:16 +00:00
var UndefCall CallID
type WorkerCalls interface {
2022-01-14 13:11:04 +00:00
// async
AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (CallID, error)
SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (CallID, error)
SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (CallID, error)
SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (CallID, error)
SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (CallID, error)
FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (CallID, error)
2022-02-02 20:23:35 +00:00
FinalizeReplicaUpdate(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (CallID, error)
ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) (CallID, error)
ReplicaUpdate(ctx context.Context, sector storage.SectorRef, pieces []abi.PieceInfo) (CallID, error)
ProveReplicaUpdate1(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (CallID, error)
ProveReplicaUpdate2(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid, vanillaProofs storage.ReplicaVanillaProofs) (CallID, error)
GenerateSectorKeyFromData(ctx context.Context, sector storage.SectorRef, commD cid.Cid) (CallID, error)
MoveStorage(ctx context.Context, sector storage.SectorRef, types SectorFileType) (CallID, error)
UnsealPiece(context.Context, storage.SectorRef, UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (CallID, error)
Fetch(context.Context, storage.SectorRef, SectorFileType, PathType, AcquireMode) (CallID, error)
2022-01-14 13:11:04 +00:00
// sync
GenerateWinningPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []PostSectorChallenge, randomness abi.PoStRandomness) ([]proof.PoStProof, error)
GenerateWindowPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []PostSectorChallenge, partitionIdx int, randomness abi.PoStRandomness) (WindowPoStResult, error)
}
type WindowPoStResult struct {
PoStProofs proof.PoStProof
Skipped []abi.SectorID
}
type PostSectorChallenge struct {
SealProof abi.RegisteredSealProof
SectorNumber abi.SectorNumber
SealedCID cid.Cid
Challenge []uint64
2022-01-18 10:25:04 +00:00
Update bool
2022-01-14 13:11:04 +00:00
}
type FallbackChallenges struct {
Sectors []abi.SectorNumber
Challenges map[abi.SectorNumber][]uint64
2020-09-06 16:47:16 +00:00
}
2020-11-17 15:17:45 +00:00
type ErrorCode int
const (
ErrUnknown ErrorCode = iota
)
const (
// Temp Errors
ErrTempUnknown ErrorCode = iota + 100
ErrTempWorkerRestart
ErrTempAllocateSpace
)
type CallError struct {
2020-11-17 15:28:41 +00:00
Code ErrorCode
Message string
sub error
2020-11-17 15:17:45 +00:00
}
func (c *CallError) Error() string {
2020-11-17 15:28:41 +00:00
return fmt.Sprintf("storage call error %d: %s", c.Code, c.Message)
2020-11-17 15:17:45 +00:00
}
func (c *CallError) Unwrap() error {
2020-11-17 15:28:41 +00:00
if c.sub != nil {
return c.sub
}
return errors.New(c.Message)
2020-11-17 15:17:45 +00:00
}
func Err(code ErrorCode, sub error) *CallError {
return &CallError{
2020-11-17 15:28:41 +00:00
Code: code,
Message: sub.Error(),
sub: sub,
2020-11-17 15:17:45 +00:00
}
}
2020-09-06 16:47:16 +00:00
type WorkerReturn interface {
2020-11-17 15:17:45 +00:00
ReturnAddPiece(ctx context.Context, callID CallID, pi abi.PieceInfo, err *CallError) error
ReturnSealPreCommit1(ctx context.Context, callID CallID, p1o storage.PreCommit1Out, err *CallError) error
ReturnSealPreCommit2(ctx context.Context, callID CallID, sealed storage.SectorCids, err *CallError) error
ReturnSealCommit1(ctx context.Context, callID CallID, out storage.Commit1Out, err *CallError) error
ReturnSealCommit2(ctx context.Context, callID CallID, proof storage.Proof, err *CallError) error
ReturnFinalizeSector(ctx context.Context, callID CallID, err *CallError) error
ReturnReleaseUnsealed(ctx context.Context, callID CallID, err *CallError) error
ReturnReplicaUpdate(ctx context.Context, callID CallID, out storage.ReplicaUpdateOut, err *CallError) error
ReturnProveReplicaUpdate1(ctx context.Context, callID CallID, proofs storage.ReplicaVanillaProofs, err *CallError) error
ReturnProveReplicaUpdate2(ctx context.Context, callID CallID, proof storage.ReplicaUpdateProof, err *CallError) error
ReturnGenerateSectorKeyFromData(ctx context.Context, callID CallID, err *CallError) error
ReturnFinalizeReplicaUpdate(ctx context.Context, callID CallID, err *CallError) error
2020-11-17 15:17:45 +00:00
ReturnMoveStorage(ctx context.Context, callID CallID, err *CallError) error
ReturnUnsealPiece(ctx context.Context, callID CallID, err *CallError) error
ReturnReadPiece(ctx context.Context, callID CallID, ok bool, err *CallError) error
ReturnFetch(ctx context.Context, callID CallID, err *CallError) error
2020-09-06 16:47:16 +00:00
}