2022-06-14 18:03:38 +00:00
|
|
|
package sealer
|
2020-03-23 11:40:02 +00:00
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2020-03-24 18:00:08 +00:00
|
|
|
"errors"
|
2020-03-23 11:40:02 +00:00
|
|
|
"io"
|
|
|
|
"net/http"
|
2022-08-31 11:56:25 +00:00
|
|
|
"sort"
|
2020-09-07 10:20:50 +00:00
|
|
|
"sync"
|
2022-11-17 17:25:30 +00:00
|
|
|
"time"
|
2020-03-23 11:40:02 +00:00
|
|
|
|
2022-04-11 23:22:19 +00:00
|
|
|
"github.com/google/uuid"
|
2022-06-14 15:00:51 +00:00
|
|
|
"github.com/hashicorp/go-multierror"
|
2022-06-15 10:06:22 +00:00
|
|
|
"github.com/ipfs/go-cid"
|
2022-04-11 23:22:19 +00:00
|
|
|
logging "github.com/ipfs/go-log/v2"
|
2022-06-14 15:00:51 +00:00
|
|
|
"github.com/mitchellh/go-homedir"
|
2022-03-16 10:49:48 +00:00
|
|
|
"go.uber.org/multierr"
|
2020-03-23 11:40:02 +00:00
|
|
|
"golang.org/x/xerrors"
|
|
|
|
|
2020-09-07 03:49:10 +00:00
|
|
|
"github.com/filecoin-project/go-state-types/abi"
|
2022-06-14 15:00:51 +00:00
|
|
|
"github.com/filecoin-project/go-statestore"
|
2020-03-23 11:40:02 +00:00
|
|
|
|
2022-11-01 11:01:31 +00:00
|
|
|
"github.com/filecoin-project/lotus/node/config"
|
2022-06-14 18:25:52 +00:00
|
|
|
"github.com/filecoin-project/lotus/storage/paths"
|
2022-06-15 10:06:22 +00:00
|
|
|
"github.com/filecoin-project/lotus/storage/sealer/ffiwrapper"
|
2022-06-14 18:03:38 +00:00
|
|
|
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
|
|
|
|
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
|
2022-06-15 10:06:22 +00:00
|
|
|
"github.com/filecoin-project/lotus/storage/sealer/storiface"
|
2020-03-23 11:40:02 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
// log is the logger for the sealing manager ("advmgr" subsystem).
var log = logging.Logger("advmgr")

// ErrNoWorkers signals that no suitable worker was found for a request.
var ErrNoWorkers = errors.New("no suitable workers found")
|
|
|
|
|
2020-03-23 11:40:02 +00:00
|
|
|
// Worker is the scheduler-facing view of a sealing worker (local or remote).
type Worker interface {
	storiface.WorkerCalls

	// TaskTypes reports the set of task types this worker is willing to run.
	TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error)

	// Paths returns storage paths accessible to the worker.
	Paths(context.Context) ([]storiface.StoragePath, error)

	// Info returns metadata describing the worker.
	Info(context.Context) (storiface.WorkerInfo, error)

	// Session returns the worker's current session ID; ClosedWorkerID
	// (the zero UUID) means the worker has been closed (see AddWorker).
	Session(context.Context) (uuid.UUID, error)

	Close() error // TODO: do we need this?
}
|
|
|
|
|
|
|
|
// SectorManager is the combined sealing-facing API: sealing operations,
// PoSt proving, worker-return callbacks, and fault tracking.
type SectorManager interface {
	storiface.Sealer
	storiface.ProverPoSt
	storiface.WorkerReturn
	FaultTracker
}
|
|
|
|
|
2020-10-18 10:35:44 +00:00
|
|
|
// ClosedWorkerID is the zero session UUID, reported by workers that have
// been closed; AddWorker rejects workers whose session equals this value.
var ClosedWorkerID = uuid.UUID{}
|
2020-03-23 11:40:02 +00:00
|
|
|
|
|
|
|
// Manager schedules sealing and PoSt tasks across local and remote workers,
// tracks in-flight work, and fronts the storage subsystem (paths + index).
type Manager struct {
	ls         paths.LocalStorage  // persisted storage configuration (storage.json)
	storage    paths.Store         // sector store used for remote-capable operations
	localStore *paths.Local        // locally attached storage paths
	remoteHnd  *paths.FetchHandler // HTTP handler serving local sector data (see ServeHTTP)
	index      paths.SectorIndex   // sector location index / storage locks

	sched            *Scheduler     // scheduler for sealing tasks
	windowPoStSched  *poStScheduler // dedicated scheduler for window PoSt workers
	winningPoStSched *poStScheduler // dedicated scheduler for winning PoSt workers

	localProver storiface.ProverPoSt // built-in local PoSt prover

	workLk sync.Mutex             // guards the work-tracking maps below
	work   *statestore.StateStore // persisted state of tracked work (set from ManagerStateStore in New)

	parallelCheckLimit    int           // from ProvingConfig.ParallelCheckLimit
	singleCheckTimeout    time.Duration // from ProvingConfig.SingleCheckTimeout
	partitionCheckTimeout time.Duration // from ProvingConfig.PartitionCheckTimeout
	disableBuiltinWindowPoSt  bool      // from ProvingConfig.DisableBuiltinWindowPoSt
	disableBuiltinWinningPoSt bool      // from ProvingConfig.DisableBuiltinWinningPoSt
	disallowRemoteFinalize    bool      // from SealerConfig; inverts the move-selector allowance in SectorsUnsealPiece

	callToWork map[storiface.CallID]WorkID
	// used when we get an early return and there's no callToWork mapping
	callRes map[storiface.CallID]chan result

	results map[WorkID]result        // completed work results keyed by work ID
	waitRes map[WorkID]chan struct{} // closed to signal completion of the corresponding work
}
|
|
|
|
|
2022-06-17 11:52:19 +00:00
|
|
|
// Compile-time check that *Manager implements the PoSt prover interface.
var _ storiface.ProverPoSt = &Manager{}

// result carries the outcome of a tracked work item.
type result struct {
	r   interface{} // raw return value; concrete type depends on the task
	err error
}

// StorageAuth is a named http.Header type (presumably auth headers for
// storage endpoints — not used directly in this file).
type StorageAuth http.Header

// WorkerStateStore persists per-worker call state (passed to the local
// worker in New).
type WorkerStateStore *statestore.StateStore

// ManagerStateStore persists manager-level work tracking (becomes
// Manager.work in New).
type ManagerStateStore *statestore.StateStore
|
2020-09-14 07:44:55 +00:00
|
|
|
|
2022-10-31 17:15:09 +00:00
|
|
|
// New creates a sealing Manager wired to the given storage stack, starts its
// scheduler, and registers an in-process local worker whose task set is
// derived from the SealerConfig allow-flags.
func New(ctx context.Context, lstor *paths.Local, stor paths.Store, ls paths.LocalStorage, si paths.SectorIndex, sc config.SealerConfig, pc config.ProvingConfig, wss WorkerStateStore, mss ManagerStateStore) (*Manager, error) {
	prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, index: si})
	if err != nil {
		return nil, xerrors.Errorf("creating prover instance: %w", err)
	}

	sh, err := newScheduler(ctx, sc.Assigner)
	if err != nil {
		return nil, err
	}

	m := &Manager{
		ls:         ls,
		storage:    stor,
		localStore: lstor,
		remoteHnd:  &paths.FetchHandler{Local: lstor, PfHandler: &paths.DefaultPartialFileHandler{}},
		index:      si,

		sched:            sh,
		windowPoStSched:  newPoStScheduler(sealtasks.TTGenerateWindowPoSt),
		winningPoStSched: newPoStScheduler(sealtasks.TTGenerateWinningPoSt),

		localProver: prover,

		parallelCheckLimit:        pc.ParallelCheckLimit,
		singleCheckTimeout:        time.Duration(pc.SingleCheckTimeout),
		partitionCheckTimeout:     time.Duration(pc.PartitionCheckTimeout),
		disableBuiltinWindowPoSt:  pc.DisableBuiltinWindowPoSt,
		disableBuiltinWinningPoSt: pc.DisableBuiltinWinningPoSt,
		disallowRemoteFinalize:    sc.DisallowRemoteFinalize,

		work:       mss,
		callToWork: map[storiface.CallID]WorkID{},
		callRes:    map[storiface.CallID]chan result{},
		results:    map[WorkID]result{},
		waitRes:    map[WorkID]chan struct{}{},
	}

	m.setupWorkTracker()

	go m.sched.runSched()

	// Tasks the built-in local worker always handles; the cheap/coordination
	// steps plus fetch/finalize.
	localTasks := []sealtasks.TaskType{
		sealtasks.TTCommit1, sealtasks.TTProveReplicaUpdate1, sealtasks.TTFinalize, sealtasks.TTFetch, sealtasks.TTFinalizeUnsealed, sealtasks.TTFinalizeReplicaUpdate,
	}
	// The remaining task types are opt-in via SealerConfig flags.
	if sc.AllowSectorDownload {
		localTasks = append(localTasks, sealtasks.TTDownloadSector)
	}
	if sc.AllowAddPiece {
		localTasks = append(localTasks, sealtasks.TTAddPiece, sealtasks.TTDataCid)
	}
	if sc.AllowPreCommit1 {
		localTasks = append(localTasks, sealtasks.TTPreCommit1)
	}
	if sc.AllowPreCommit2 {
		localTasks = append(localTasks, sealtasks.TTPreCommit2)
	}
	if sc.AllowCommit {
		localTasks = append(localTasks, sealtasks.TTCommit2)
	}
	if sc.AllowUnseal {
		localTasks = append(localTasks, sealtasks.TTUnseal)
	}
	if sc.AllowReplicaUpdate {
		localTasks = append(localTasks, sealtasks.TTReplicaUpdate)
	}
	if sc.AllowProveReplicaUpdate2 {
		localTasks = append(localTasks, sealtasks.TTProveReplicaUpdate2)
	}
	if sc.AllowRegenSectorKey {
		localTasks = append(localTasks, sealtasks.TTRegenSectorKey)
	}

	wcfg := WorkerConfig{
		IgnoreResourceFiltering: sc.ResourceFiltering == config.ResourceFilteringDisabled,
		TaskTypes:               localTasks,
		Name:                    sc.LocalWorkerName,
	}
	worker := NewLocalWorker(wcfg, stor, lstor, si, m, wss)
	err = m.AddWorker(ctx, worker)
	if err != nil {
		return nil, xerrors.Errorf("adding local worker: %w", err)
	}

	return m, nil
}
|
|
|
|
|
|
|
|
func (m *Manager) AddLocalStorage(ctx context.Context, path string) error {
|
|
|
|
path, err := homedir.Expand(path)
|
|
|
|
if err != nil {
|
|
|
|
return xerrors.Errorf("expanding local path: %w", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := m.localStore.OpenPath(ctx, path); err != nil {
|
|
|
|
return xerrors.Errorf("opening local path: %w", err)
|
|
|
|
}
|
|
|
|
|
2022-11-01 11:01:31 +00:00
|
|
|
if err := m.ls.SetStorage(func(sc *storiface.StorageConfig) {
|
|
|
|
sc.StoragePaths = append(sc.StoragePaths, storiface.LocalPath{Path: path})
|
2020-03-23 11:40:02 +00:00
|
|
|
}); err != nil {
|
|
|
|
return xerrors.Errorf("get storage config: %w", err)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2022-07-15 10:44:05 +00:00
|
|
|
func (m *Manager) DetachLocalStorage(ctx context.Context, path string) error {
|
|
|
|
path, err := homedir.Expand(path)
|
|
|
|
if err != nil {
|
|
|
|
return xerrors.Errorf("expanding local path: %w", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// check that we have the path opened
|
|
|
|
lps, err := m.localStore.Local(ctx)
|
|
|
|
if err != nil {
|
|
|
|
return xerrors.Errorf("getting local path list: %w", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
var localPath *storiface.StoragePath
|
|
|
|
for _, lp := range lps {
|
|
|
|
if lp.LocalPath == path {
|
2022-07-15 10:56:03 +00:00
|
|
|
lp := lp // copy to make the linter happy
|
2022-07-15 10:44:05 +00:00
|
|
|
localPath = &lp
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if localPath == nil {
|
|
|
|
return xerrors.Errorf("no local paths match '%s'", path)
|
|
|
|
}
|
|
|
|
|
|
|
|
// drop from the persisted storage.json
|
|
|
|
var found bool
|
2022-11-01 11:01:31 +00:00
|
|
|
if err := m.ls.SetStorage(func(sc *storiface.StorageConfig) {
|
|
|
|
out := make([]storiface.LocalPath, 0, len(sc.StoragePaths))
|
2022-07-15 10:44:05 +00:00
|
|
|
for _, storagePath := range sc.StoragePaths {
|
|
|
|
if storagePath.Path != path {
|
|
|
|
out = append(out, storagePath)
|
2022-07-15 10:56:03 +00:00
|
|
|
continue
|
2022-07-15 10:44:05 +00:00
|
|
|
}
|
|
|
|
found = true
|
|
|
|
}
|
2022-07-15 10:56:03 +00:00
|
|
|
sc.StoragePaths = out
|
2022-07-15 10:44:05 +00:00
|
|
|
}); err != nil {
|
|
|
|
return xerrors.Errorf("set storage config: %w", err)
|
|
|
|
}
|
|
|
|
if !found {
|
|
|
|
// maybe this is fine?
|
|
|
|
return xerrors.Errorf("path not found in storage.json")
|
|
|
|
}
|
|
|
|
|
|
|
|
// unregister locally, drop from sector index
|
|
|
|
return m.localStore.ClosePath(ctx, localPath.ID)
|
|
|
|
}
|
|
|
|
|
|
|
|
// RedeclareLocalStorage delegates to the local store's Redeclare, passing
// through the optional path ID filter and the dropMissing flag.
func (m *Manager) RedeclareLocalStorage(ctx context.Context, id *storiface.ID, dropMissing bool) error {
	return m.localStore.Redeclare(ctx, id, dropMissing)
}
|
|
|
|
|
2020-03-23 11:40:02 +00:00
|
|
|
// AddWorker registers a worker. PoSt-capable workers are claimed by the
// dedicated PoSt schedulers; everything else joins the sealing scheduler.
func (m *Manager) AddWorker(ctx context.Context, w Worker) error {
	sessID, err := w.Session(ctx)
	if err != nil {
		return xerrors.Errorf("getting worker session: %w", err)
	}
	// A zero session UUID marks a closed worker; refuse to register it.
	if sessID == ClosedWorkerID {
		return xerrors.Errorf("worker already closed")
	}

	// The session UUID doubles as the worker ID.
	wid := storiface.WorkerID(sessID)

	whnd, err := newWorkerHandle(ctx, w)
	if err != nil {
		return err
	}

	tasks, err := w.TaskTypes(ctx)
	if err != nil {
		return xerrors.Errorf("getting worker tasks: %w", err)
	}

	// If either PoSt scheduler claims the worker, it does not participate
	// in regular sealing scheduling.
	if m.windowPoStSched.MaybeAddWorker(wid, tasks, whnd) ||
		m.winningPoStSched.MaybeAddWorker(wid, tasks, whnd) {
		return nil
	}

	return m.sched.runWorker(ctx, wid, whnd)
}
|
|
|
|
|
|
|
|
// ServeHTTP exposes the fetch handler, letting remote workers pull sector
// data from this node's local storage.
func (m *Manager) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	m.remoteHnd.ServeHTTP(w, r)
}
|
|
|
|
|
2023-02-28 09:33:15 +00:00
|
|
|
// schedNop is a no-op prepare action, used for tasks that need no
// pre-fetch/preparation step before running.
var schedNop = PrepareAction{
	Action: func(ctx context.Context, w Worker) error {
		return nil
	},
	PrepType: sealtasks.TTNoop,
}
|
|
|
|
|
2023-02-28 09:33:15 +00:00
|
|
|
func (m *Manager) schedFetch(sector storiface.SectorRef, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) PrepareAction {
|
|
|
|
return PrepareAction{
|
|
|
|
Action: func(ctx context.Context, worker Worker) error {
|
|
|
|
_, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, ft, ptype, am))
|
|
|
|
return err
|
|
|
|
},
|
|
|
|
PrepType: sealtasks.TTFetch,
|
2020-04-27 18:37:31 +00:00
|
|
|
}
|
2020-03-23 11:40:02 +00:00
|
|
|
}
|
|
|
|
|
2021-05-18 11:35:25 +00:00
|
|
|
// SectorsUnsealPiece will Unseal the Sealed sector file for the given sector.
|
|
|
|
// It will schedule the Unsealing task on a worker that either already has the sealed sector files or has space in
|
|
|
|
// one of it's sealing scratch spaces to store them after fetching them from another worker.
|
|
|
|
// If the chosen worker already has the Unsealed sector file, we will NOT Unseal the sealed sector file again.
|
2022-06-17 11:31:05 +00:00
|
|
|
func (m *Manager) SectorsUnsealPiece(ctx context.Context, sector storiface.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed *cid.Cid) error {
|
2020-09-11 19:04:54 +00:00
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
|
|
defer cancel()
|
|
|
|
|
2021-04-20 09:19:00 +00:00
|
|
|
log.Debugf("acquire unseal sector lock for sector %d", sector.ID)
|
2022-04-07 21:00:40 +00:00
|
|
|
if err := m.index.StorageLock(ctx, sector.ID, storiface.FTSealed|storiface.FTCache|storiface.FTUpdate|storiface.FTUpdateCache, storiface.FTUnsealed); err != nil {
|
2020-09-11 19:04:54 +00:00
|
|
|
return xerrors.Errorf("acquiring unseal sector lock: %w", err)
|
2020-07-30 20:03:43 +00:00
|
|
|
}
|
2020-05-14 01:01:38 +00:00
|
|
|
|
2023-04-10 07:53:19 +00:00
|
|
|
// Check if sealed or update sector file exists
|
|
|
|
s, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTSealed|storiface.FTUpdate, 0, false)
|
|
|
|
if err != nil {
|
|
|
|
return xerrors.Errorf("finding sealed or updated sector: %w", err)
|
|
|
|
}
|
|
|
|
if len(s) == 0 {
|
|
|
|
return xerrors.Errorf("sealed or updated sector file not found for sector %d", sector.ID)
|
|
|
|
}
|
|
|
|
|
2021-05-18 11:35:25 +00:00
|
|
|
// if the selected worker does NOT have the sealed files for the sector, instruct it to fetch it from a worker that has them and
|
|
|
|
// put it in the sealing scratch space.
|
2023-09-21 15:37:02 +00:00
|
|
|
unsealFetch := PrepareAction{
|
2023-02-28 09:33:15 +00:00
|
|
|
Action: func(ctx context.Context, worker Worker) error {
|
|
|
|
log.Debugf("copy sealed/cache sector data for sector %d", sector.ID)
|
|
|
|
_, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.PathSealing, storiface.AcquireCopy))
|
|
|
|
_, err2 := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTUpdate|storiface.FTUpdateCache, storiface.PathSealing, storiface.AcquireCopy))
|
|
|
|
|
|
|
|
if err != nil && err2 != nil {
|
|
|
|
return xerrors.Errorf("cannot unseal piece. error fetching sealed data: %w. error fetching replica data: %w", err, err2)
|
|
|
|
}
|
2020-05-20 16:36:46 +00:00
|
|
|
|
2023-02-28 09:33:15 +00:00
|
|
|
return nil
|
|
|
|
},
|
|
|
|
PrepType: sealtasks.TTFetch,
|
2021-05-19 11:05:07 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if unsealed == nil {
|
|
|
|
return xerrors.Errorf("cannot unseal piece (sector: %d, offset: %d size: %d) - unsealed cid is undefined", sector, offset, size)
|
|
|
|
}
|
|
|
|
|
|
|
|
ssize, err := sector.ProofType.SectorSize()
|
|
|
|
if err != nil {
|
|
|
|
return xerrors.Errorf("getting sector size: %w", err)
|
|
|
|
}
|
|
|
|
|
2021-05-18 11:35:25 +00:00
|
|
|
// selector will schedule the Unseal task on a worker that either already has the sealed sector files or has space in
|
|
|
|
// one of it's sealing scratch spaces to store them after fetching them from another worker.
|
2021-05-18 07:32:30 +00:00
|
|
|
selector := newExistingSelector(m.index, sector.ID, storiface.FTSealed|storiface.FTCache, true)
|
2021-05-19 11:05:07 +00:00
|
|
|
|
2021-05-21 13:31:17 +00:00
|
|
|
log.Debugf("will schedule unseal for sector %d", sector.ID)
|
2023-09-21 15:37:02 +00:00
|
|
|
err = m.sched.Schedule(ctx, sector, sealtasks.TTUnseal, selector, unsealFetch, func(ctx context.Context, w Worker) error {
|
2021-05-19 11:05:07 +00:00
|
|
|
// TODO: make restartable
|
|
|
|
|
|
|
|
// NOTE: we're unsealing the whole sector here as with SDR we can't really
|
|
|
|
// unseal the sector partially. Requesting the whole sector here can
|
|
|
|
// save us some work in case another piece is requested from here
|
2021-05-21 13:31:17 +00:00
|
|
|
log.Debugf("calling unseal sector on worker, sectoID=%d", sector.ID)
|
2021-05-18 11:35:25 +00:00
|
|
|
|
2021-05-19 05:47:56 +00:00
|
|
|
// Note: This unseal piece call will essentially become a no-op if the worker already has an Unsealed sector file for the given sector.
|
2021-05-19 11:05:07 +00:00
|
|
|
_, err := m.waitSimpleCall(ctx)(w.UnsealPiece(ctx, sector, 0, abi.PaddedPieceSize(ssize).Unpadded(), ticket, *unsealed))
|
2021-04-20 09:19:00 +00:00
|
|
|
log.Debugf("completed unseal sector %d", sector.ID)
|
2021-05-19 11:05:07 +00:00
|
|
|
return err
|
|
|
|
})
|
|
|
|
if err != nil {
|
2021-05-18 11:35:25 +00:00
|
|
|
return xerrors.Errorf("worker UnsealPiece call: %s", err)
|
2021-05-19 11:05:07 +00:00
|
|
|
}
|
|
|
|
|
2022-11-23 20:22:14 +00:00
|
|
|
// get a selector for moving unsealed sector into long-term storage
|
|
|
|
fetchSel := newMoveSelector(m.index, sector.ID, storiface.FTUnsealed, storiface.PathStorage, !m.disallowRemoteFinalize)
|
|
|
|
|
|
|
|
// move unsealed sector to long-term storage
|
|
|
|
// Possible TODO: Add an option to not keep the unsealed sector in long term storage?
|
|
|
|
err = m.sched.Schedule(ctx, sector, sealtasks.TTFetch, fetchSel,
|
|
|
|
m.schedFetch(sector, storiface.FTUnsealed, storiface.PathStorage, storiface.AcquireMove),
|
|
|
|
func(ctx context.Context, w Worker) error {
|
|
|
|
_, err := m.waitSimpleCall(ctx)(w.MoveStorage(ctx, sector, storiface.FTUnsealed))
|
|
|
|
return err
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return xerrors.Errorf("moving unsealed sector to long term storage: %w", err)
|
|
|
|
}
|
|
|
|
|
2021-05-19 11:05:07 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2022-06-17 11:31:05 +00:00
|
|
|
// NewSector is a stub; sector creation requires no manager-side work here,
// so it only logs and returns nil.
func (m *Manager) NewSector(ctx context.Context, sector storiface.SectorRef) error {
	log.Warnf("stub NewSector")
	return nil
}
|
|
|
|
|
2022-06-17 11:31:05 +00:00
|
|
|
func (m *Manager) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storiface.Data) (abi.PieceInfo, error) {
|
2022-04-26 16:22:52 +00:00
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
|
|
defer cancel()
|
|
|
|
|
|
|
|
selector := newTaskSelector()
|
|
|
|
|
|
|
|
var out abi.PieceInfo
|
2022-06-17 11:31:05 +00:00
|
|
|
err := m.sched.Schedule(ctx, storiface.NoSectorRef, sealtasks.TTDataCid, selector, schedNop, func(ctx context.Context, w Worker) error {
|
2022-04-26 16:22:52 +00:00
|
|
|
p, err := m.waitSimpleCall(ctx)(w.DataCid(ctx, pieceSize, pieceData))
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if p != nil {
|
|
|
|
out = p.(abi.PieceInfo)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
})
|
|
|
|
|
|
|
|
return out, err
|
|
|
|
}
|
|
|
|
|
2022-06-17 11:31:05 +00:00
|
|
|
// AddPiece writes a piece into a sector's unsealed file. For an empty sector
// it allocates a new unsealed file; otherwise it runs on a worker that
// already holds the existing unsealed file.
func (m *Manager) AddPiece(ctx context.Context, sector storiface.SectorRef, existingPieces []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	if err := m.index.StorageLock(ctx, sector.ID, storiface.FTNone, storiface.FTUnsealed); err != nil {
		return abi.PieceInfo{}, xerrors.Errorf("acquiring sector lock: %w", err)
	}

	var selector WorkerSelector
	var err error
	if len(existingPieces) == 0 { // new
		selector = newAllocSelector(m.index, storiface.FTUnsealed, storiface.PathSealing)
	} else { // use existing
		selector = newExistingSelector(m.index, sector.ID, storiface.FTUnsealed, false)
	}

	var out abi.PieceInfo
	err = m.sched.Schedule(ctx, sector, sealtasks.TTAddPiece, selector, schedNop, func(ctx context.Context, w Worker) error {
		p, err := m.waitSimpleCall(ctx)(w.AddPiece(ctx, sector, existingPieces, sz, r))
		if err != nil {
			return err
		}
		// p may be nil on early-return paths; leave out as the zero value.
		if p != nil {
			out = p.(abi.PieceInfo)
		}
		return nil
	})

	return out, err
}
|
|
|
|
|
2022-06-17 11:31:05 +00:00
|
|
|
// SealPreCommit1 runs the PC1 step for a sector. Calls are deduplicated via
// the work tracker: if an identical call is already in flight, we wait for
// its result instead of scheduling a second run.
func (m *Manager) SealPreCommit1(ctx context.Context, sector storiface.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storiface.PreCommit1Out, err error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// getWork keys the work on the task type and arguments; wait==true means
	// an identical call is already being worked on.
	wk, wait, cancel, err := m.getWork(ctx, sealtasks.TTPreCommit1, sector, ticket, pieces)
	if err != nil {
		return nil, xerrors.Errorf("getWork: %w", err)
	}
	defer cancel()

	var waitErr error
	// waitRes blocks for the tracked result and stores it into the named
	// return values.
	waitRes := func() {
		p, werr := m.waitWork(ctx, wk)
		if werr != nil {
			waitErr = werr
			return
		}
		if p != nil {
			out = p.(storiface.PreCommit1Out)
		}
	}

	if wait { // already in progress
		waitRes()
		return out, waitErr
	}

	if err := m.index.StorageLock(ctx, sector.ID, storiface.FTUnsealed, storiface.FTSealed|storiface.FTCache); err != nil {
		return nil, xerrors.Errorf("acquiring sector lock: %w", err)
	}

	// TODO: also consider where the unsealed data sits

	selector := newAllocSelector(m.index, storiface.FTCache|storiface.FTSealed, storiface.PathSealing)

	err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit1, selector, m.schedFetch(sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error {
		err := m.startWork(ctx, w, wk)(w.SealPreCommit1(ctx, sector, ticket, pieces))
		if err != nil {
			return err
		}

		waitRes()
		return nil
	})
	if err != nil {
		return nil, err
	}

	return out, waitErr
}
|
|
|
|
|
2022-06-17 11:31:05 +00:00
|
|
|
// SealPreCommit2 runs the PC2 step on the PC1 output. Like SealPreCommit1,
// identical in-flight calls are deduplicated through the work tracker, and
// the task is pinned to a worker that already holds the cache/sealed files.
func (m *Manager) SealPreCommit2(ctx context.Context, sector storiface.SectorRef, phase1Out storiface.PreCommit1Out) (out storiface.SectorCids, err error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	wk, wait, cancel, err := m.getWork(ctx, sealtasks.TTPreCommit2, sector, phase1Out)
	if err != nil {
		return storiface.SectorCids{}, xerrors.Errorf("getWork: %w", err)
	}
	defer cancel()

	var waitErr error
	// waitRes blocks for the tracked result and stores it into out/waitErr.
	waitRes := func() {
		p, werr := m.waitWork(ctx, wk)
		if werr != nil {
			waitErr = werr
			return
		}
		if p != nil {
			out = p.(storiface.SectorCids)
		}
	}

	if wait { // already in progress
		waitRes()
		return out, waitErr
	}

	if err := m.index.StorageLock(ctx, sector.ID, storiface.FTSealed, storiface.FTCache); err != nil {
		return storiface.SectorCids{}, xerrors.Errorf("acquiring sector lock: %w", err)
	}

	// allowFetch=true: the worker may fetch the files if it lacks them.
	selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed, true)

	err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit2, selector, m.schedFetch(sector, storiface.FTCache|storiface.FTSealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error {
		err := m.startWork(ctx, w, wk)(w.SealPreCommit2(ctx, sector, phase1Out))
		if err != nil {
			return err
		}

		waitRes()
		return nil
	})
	if err != nil {
		return storiface.SectorCids{}, err
	}

	return out, waitErr
}
|
|
|
|
|
2022-06-17 11:31:05 +00:00
|
|
|
// SealCommit1 runs the C1 step. Identical in-flight calls are deduplicated
// through the work tracker; the task is pinned to a worker with direct
// access to the sector files (no fetching).
func (m *Manager) SealCommit1(ctx context.Context, sector storiface.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storiface.SectorCids) (out storiface.Commit1Out, err error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	wk, wait, cancel, err := m.getWork(ctx, sealtasks.TTCommit1, sector, ticket, seed, pieces, cids)
	if err != nil {
		return storiface.Commit1Out{}, xerrors.Errorf("getWork: %w", err)
	}
	defer cancel()

	var waitErr error
	// waitRes blocks for the tracked result and stores it into out/waitErr.
	waitRes := func() {
		p, werr := m.waitWork(ctx, wk)
		if werr != nil {
			waitErr = werr
			return
		}
		if p != nil {
			out = p.(storiface.Commit1Out)
		}
	}

	if wait { // already in progress
		waitRes()
		return out, waitErr
	}

	if err := m.index.StorageLock(ctx, sector.ID, storiface.FTSealed, storiface.FTCache); err != nil {
		return storiface.Commit1Out{}, xerrors.Errorf("acquiring sector lock: %w", err)
	}

	// NOTE: We set allowFetch to false in so that we always execute on a worker
	// with direct access to the data. We want to do that because this step is
	// generally very cheap / fast, and transferring data is not worth the effort
	selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed, false)

	err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit1, selector, m.schedFetch(sector, storiface.FTCache|storiface.FTSealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error {
		err := m.startWork(ctx, w, wk)(w.SealCommit1(ctx, sector, ticket, seed, pieces, cids))
		if err != nil {
			return err
		}

		waitRes()
		return nil
	})
	if err != nil {
		return nil, err
	}

	return out, waitErr
}
|
|
|
|
|
2022-06-17 11:31:05 +00:00
|
|
|
// SealCommit2 runs the C2 (proof generation) step on the C1 output.
// Identical in-flight calls are deduplicated through the work tracker.
// C2 needs no sector files, so any worker advertising the task will do and
// no storage lock is taken.
func (m *Manager) SealCommit2(ctx context.Context, sector storiface.SectorRef, phase1Out storiface.Commit1Out) (out storiface.Proof, err error) {
	wk, wait, cancel, err := m.getWork(ctx, sealtasks.TTCommit2, sector, phase1Out)
	if err != nil {
		return storiface.Proof{}, xerrors.Errorf("getWork: %w", err)
	}
	defer cancel()

	var waitErr error
	// waitRes blocks for the tracked result and stores it into out/waitErr.
	waitRes := func() {
		p, werr := m.waitWork(ctx, wk)
		if werr != nil {
			waitErr = werr
			return
		}
		if p != nil {
			out = p.(storiface.Proof)
		}
	}

	if wait { // already in progress
		waitRes()
		return out, waitErr
	}

	selector := newTaskSelector()

	err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit2, selector, schedNop, func(ctx context.Context, w Worker) error {
		err := m.startWork(ctx, w, wk)(w.SealCommit2(ctx, sector, phase1Out))
		if err != nil {
			return err
		}

		waitRes()
		return nil
	})

	if err != nil {
		return nil, err
	}

	return out, waitErr
}
|
|
|
|
|
2022-11-16 16:07:42 +00:00
|
|
|
// sectorStorageType tries to figure out storage type for a given sector; expects only a single copy of the file in the
|
|
|
|
// storage system
|
2022-11-23 17:46:37 +00:00
|
|
|
func (m *Manager) sectorStorageType(ctx context.Context, sector storiface.SectorRef, ft storiface.SectorFileType) (sectorFound bool, ptype storiface.PathType, err error) {
|
2022-11-16 16:07:42 +00:00
|
|
|
stores, err := m.index.StorageFindSector(ctx, sector.ID, ft, 0, false)
|
|
|
|
if err != nil {
|
|
|
|
return false, "", xerrors.Errorf("finding sector: %w", err)
|
|
|
|
}
|
|
|
|
if len(stores) == 0 {
|
|
|
|
return false, "", nil
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, store := range stores {
|
|
|
|
if store.CanSeal {
|
|
|
|
return true, storiface.PathSealing, nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return true, storiface.PathStorage, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *Manager) FinalizeSector(ctx context.Context, sector storiface.SectorRef) error {
|
2020-06-03 20:00:34 +00:00
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
|
|
defer cancel()
|
|
|
|
|
2020-11-04 20:29:08 +00:00
|
|
|
if err := m.index.StorageLock(ctx, sector.ID, storiface.FTNone, storiface.FTSealed|storiface.FTUnsealed|storiface.FTCache); err != nil {
|
2020-06-03 20:00:34 +00:00
|
|
|
return xerrors.Errorf("acquiring sector lock: %w", err)
|
|
|
|
}
|
|
|
|
|
2022-11-16 15:25:19 +00:00
|
|
|
/*
|
|
|
|
We want to:
|
2022-11-16 17:11:20 +00:00
|
|
|
- Trim cache
|
|
|
|
- Move stuff to long-term storage
|
2022-11-16 15:25:19 +00:00
|
|
|
*/
|
2020-06-04 19:15:15 +00:00
|
|
|
|
2022-11-16 15:25:19 +00:00
|
|
|
// remove redundant copies if there are any
|
2022-11-16 16:07:42 +00:00
|
|
|
if err := m.storage.RemoveCopies(ctx, sector.ID, storiface.FTUnsealed); err != nil {
|
|
|
|
return xerrors.Errorf("remove copies (sealed): %w", err)
|
|
|
|
}
|
2022-11-16 15:25:19 +00:00
|
|
|
if err := m.storage.RemoveCopies(ctx, sector.ID, storiface.FTSealed); err != nil {
|
|
|
|
return xerrors.Errorf("remove copies (sealed): %w", err)
|
|
|
|
}
|
|
|
|
if err := m.storage.RemoveCopies(ctx, sector.ID, storiface.FTCache); err != nil {
|
|
|
|
return xerrors.Errorf("remove copies (cache): %w", err)
|
2020-06-04 19:15:15 +00:00
|
|
|
}
|
|
|
|
|
2022-11-16 15:25:19 +00:00
|
|
|
// Make sure that the cache files are still in sealing storage; In case not,
|
|
|
|
// we want to do finalize in long-term storage
|
2022-11-16 16:07:42 +00:00
|
|
|
_, cachePathType, err := m.sectorStorageType(ctx, sector, storiface.FTCache)
|
|
|
|
if err != nil {
|
|
|
|
return xerrors.Errorf("checking cache storage type: %w", err)
|
2021-07-01 19:07:53 +00:00
|
|
|
}
|
|
|
|
|
2022-05-23 21:27:28 +00:00
|
|
|
// do the cache trimming wherever the likely still very large cache lives.
|
|
|
|
// we really don't want to move it.
|
2022-03-17 14:15:54 +00:00
|
|
|
selector := newExistingSelector(m.index, sector.ID, storiface.FTCache, false)
|
2020-03-23 11:40:02 +00:00
|
|
|
|
2022-11-16 16:07:42 +00:00
|
|
|
err = m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
|
|
|
|
m.schedFetch(sector, storiface.FTCache, cachePathType, storiface.AcquireMove),
|
2020-04-27 18:37:31 +00:00
|
|
|
func(ctx context.Context, w Worker) error {
|
2022-11-16 16:07:42 +00:00
|
|
|
_, err := m.waitSimpleCall(ctx)(w.FinalizeSector(ctx, sector))
|
2020-09-07 10:20:50 +00:00
|
|
|
return err
|
2020-04-27 18:37:31 +00:00
|
|
|
})
|
2020-06-03 21:44:59 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2022-05-23 21:27:28 +00:00
|
|
|
// get a selector for moving stuff into long-term storage
|
2022-05-23 21:53:25 +00:00
|
|
|
fetchSel := newMoveSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed, storiface.PathStorage, !m.disallowRemoteFinalize)
|
2022-05-23 21:27:28 +00:00
|
|
|
|
|
|
|
// only move the unsealed file if it still exists and needs moving
|
2022-11-16 16:07:42 +00:00
|
|
|
moveUnsealed := storiface.FTUnsealed
|
2020-06-22 15:02:59 +00:00
|
|
|
{
|
2022-11-16 16:07:42 +00:00
|
|
|
found, unsealedPathType, err := m.sectorStorageType(ctx, sector, storiface.FTUnsealed)
|
|
|
|
if err != nil {
|
|
|
|
return xerrors.Errorf("checking cache storage type: %w", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
if !found || unsealedPathType == storiface.PathStorage {
|
2020-09-06 16:54:00 +00:00
|
|
|
moveUnsealed = storiface.FTNone
|
2020-06-22 15:02:59 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-05-23 21:27:28 +00:00
|
|
|
// move stuff to long-term storage
|
2020-06-03 21:44:59 +00:00
|
|
|
err = m.sched.Schedule(ctx, sector, sealtasks.TTFetch, fetchSel,
|
2020-09-16 20:33:49 +00:00
|
|
|
m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|moveUnsealed, storiface.PathStorage, storiface.AcquireMove),
|
2020-06-03 21:44:59 +00:00
|
|
|
func(ctx context.Context, w Worker) error {
|
2020-09-16 20:33:49 +00:00
|
|
|
_, err := m.waitSimpleCall(ctx)(w.MoveStorage(ctx, sector, storiface.FTCache|storiface.FTSealed|moveUnsealed))
|
2020-09-07 10:20:50 +00:00
|
|
|
return err
|
2020-06-03 21:44:59 +00:00
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return xerrors.Errorf("moving sector to storage: %w", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
2020-03-23 11:40:02 +00:00
|
|
|
}
|
|
|
|
|
2022-11-16 16:07:42 +00:00
|
|
|
func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef) error {
|
2022-02-02 20:23:35 +00:00
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
|
|
defer cancel()
|
|
|
|
|
|
|
|
if err := m.index.StorageLock(ctx, sector.ID, storiface.FTNone, storiface.FTSealed|storiface.FTUnsealed|storiface.FTCache|storiface.FTUpdate|storiface.FTUpdateCache); err != nil {
|
|
|
|
return xerrors.Errorf("acquiring sector lock: %w", err)
|
|
|
|
}
|
|
|
|
|
2022-05-23 21:27:28 +00:00
|
|
|
// Make sure that the update file is still in sealing storage; In case it already
|
|
|
|
// isn't, we want to do finalize in long-term storage
|
2022-02-02 20:23:35 +00:00
|
|
|
pathType := storiface.PathStorage
|
|
|
|
{
|
|
|
|
sealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUpdate, 0, false)
|
|
|
|
if err != nil {
|
|
|
|
return xerrors.Errorf("finding sealed sector: %w", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, store := range sealedStores {
|
|
|
|
if store.CanSeal {
|
|
|
|
pathType = storiface.PathSealing
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-05-23 21:27:28 +00:00
|
|
|
// do the cache trimming wherever the likely still large cache lives.
|
|
|
|
// we really don't want to move it.
|
|
|
|
selector := newExistingSelector(m.index, sector.ID, storiface.FTUpdateCache, false)
|
2022-02-02 20:23:35 +00:00
|
|
|
|
|
|
|
err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalizeReplicaUpdate, selector,
|
2022-11-16 16:07:42 +00:00
|
|
|
m.schedFetch(sector, storiface.FTCache|storiface.FTUpdateCache, pathType, storiface.AcquireMove),
|
2022-02-02 20:23:35 +00:00
|
|
|
func(ctx context.Context, w Worker) error {
|
2022-11-16 16:07:42 +00:00
|
|
|
_, err := m.waitSimpleCall(ctx)(w.FinalizeReplicaUpdate(ctx, sector))
|
2022-02-02 20:23:35 +00:00
|
|
|
return err
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2022-03-16 10:49:48 +00:00
|
|
|
move := func(types storiface.SectorFileType) error {
|
2022-05-23 21:27:28 +00:00
|
|
|
// get a selector for moving stuff into long-term storage
|
2022-05-23 21:53:25 +00:00
|
|
|
fetchSel := newMoveSelector(m.index, sector.ID, types, storiface.PathStorage, !m.disallowRemoteFinalize)
|
2022-03-16 10:49:48 +00:00
|
|
|
|
|
|
|
err = m.sched.Schedule(ctx, sector, sealtasks.TTFetch, fetchSel,
|
|
|
|
m.schedFetch(sector, types, storiface.PathStorage, storiface.AcquireMove),
|
|
|
|
func(ctx context.Context, w Worker) error {
|
|
|
|
_, err := m.waitSimpleCall(ctx)(w.MoveStorage(ctx, sector, types))
|
|
|
|
return err
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return xerrors.Errorf("moving sector to storage: %w", err)
|
2022-02-02 20:23:35 +00:00
|
|
|
}
|
2022-03-16 10:49:48 +00:00
|
|
|
return nil
|
2022-02-02 20:23:35 +00:00
|
|
|
}
|
|
|
|
|
2022-03-16 10:49:48 +00:00
|
|
|
err = multierr.Append(move(storiface.FTUpdate|storiface.FTUpdateCache), move(storiface.FTCache))
|
|
|
|
err = multierr.Append(err, move(storiface.FTSealed)) // Sealed separate from cache just in case ReleaseSectorKey was already called
|
2022-11-16 16:07:42 +00:00
|
|
|
|
|
|
|
{
|
|
|
|
unsealedStores, ferr := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false)
|
|
|
|
if err != nil {
|
|
|
|
err = multierr.Append(err, xerrors.Errorf("find unsealed sector before move: %w", ferr))
|
|
|
|
} else if len(unsealedStores) > 0 {
|
|
|
|
// if we found unsealed files, AND have been asked to keep at least one piece, move unsealed
|
|
|
|
err = multierr.Append(err, move(storiface.FTUnsealed))
|
|
|
|
}
|
2022-02-02 20:23:35 +00:00
|
|
|
}
|
|
|
|
|
2022-03-30 02:28:56 +00:00
|
|
|
if err != nil {
|
|
|
|
return xerrors.Errorf("moving sector to storage: %w", err)
|
|
|
|
}
|
|
|
|
|
2022-02-02 20:23:35 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2022-11-16 16:07:42 +00:00
|
|
|
func (m *Manager) ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) error {
|
2022-04-05 20:45:07 +00:00
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
|
|
defer cancel()
|
|
|
|
|
|
|
|
if err := m.index.StorageLock(ctx, sector.ID, storiface.FTNone, storiface.FTUnsealed); err != nil {
|
|
|
|
return xerrors.Errorf("acquiring sector lock: %w", err)
|
|
|
|
}
|
|
|
|
|
2022-11-16 16:07:42 +00:00
|
|
|
found, pathType, err := m.sectorStorageType(ctx, sector, storiface.FTUnsealed)
|
|
|
|
if err != nil {
|
|
|
|
return xerrors.Errorf("checking cache storage type: %w", err)
|
|
|
|
}
|
|
|
|
if !found {
|
|
|
|
// already removed
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
selector := newExistingSelector(m.index, sector.ID, storiface.FTUnsealed, false)
|
|
|
|
|
2022-11-23 17:54:58 +00:00
|
|
|
return m.sched.Schedule(ctx, sector, sealtasks.TTFinalizeUnsealed, selector, m.schedFetch(sector, storiface.FTUnsealed, pathType, storiface.AcquireMove), func(ctx context.Context, w Worker) error {
|
2022-11-16 16:07:42 +00:00
|
|
|
_, err := m.waitSimpleCall(ctx)(w.ReleaseUnsealed(ctx, sector, keepUnsealed))
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
})
|
2020-06-22 15:02:59 +00:00
|
|
|
}
|
|
|
|
|
2022-06-17 11:31:05 +00:00
|
|
|
func (m *Manager) ReleaseSectorKey(ctx context.Context, sector storiface.SectorRef) error {
|
2021-12-01 19:01:55 +00:00
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
|
|
defer cancel()
|
|
|
|
|
|
|
|
if err := m.index.StorageLock(ctx, sector.ID, storiface.FTNone, storiface.FTSealed); err != nil {
|
|
|
|
return xerrors.Errorf("acquiring sector lock: %w", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
return m.storage.Remove(ctx, sector.ID, storiface.FTSealed, true, nil)
|
|
|
|
}
|
|
|
|
|
2022-06-17 11:31:05 +00:00
|
|
|
func (m *Manager) ReleaseReplicaUpgrade(ctx context.Context, sector storiface.SectorRef) error {
|
2021-12-08 17:11:19 +00:00
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
|
|
defer cancel()
|
|
|
|
|
|
|
|
if err := m.index.StorageLock(ctx, sector.ID, storiface.FTNone, storiface.FTUpdateCache|storiface.FTUpdate); err != nil {
|
|
|
|
return xerrors.Errorf("acquiring sector lock: %w", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := m.storage.Remove(ctx, sector.ID, storiface.FTUpdateCache, true, nil); err != nil {
|
|
|
|
return xerrors.Errorf("removing update cache: %w", err)
|
|
|
|
}
|
|
|
|
if err := m.storage.Remove(ctx, sector.ID, storiface.FTUpdate, true, nil); err != nil {
|
|
|
|
return xerrors.Errorf("removing update: %w", err)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2022-06-17 11:31:05 +00:00
|
|
|
// GenerateSectorKeyFromData regenerates the sector key (sealed file) for an
// upgraded sector from its data (unsealed + update files), deduplicating
// concurrent identical calls through the work tracker.
func (m *Manager) GenerateSectorKeyFromData(ctx context.Context, sector storiface.SectorRef, commD cid.Cid) error {

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	wk, wait, cancel, err := m.getWork(ctx, sealtasks.TTRegenSectorKey, sector, commD)
	if err != nil {
		return xerrors.Errorf("getWork: %w", err)
	}
	defer cancel()

	var waitErr error
	// waitRes blocks on the tracked work; this task has no return value, so
	// only the error is captured.
	waitRes := func() {
		_, werr := m.waitWork(ctx, wk)
		if werr != nil {
			waitErr = werr
			return
		}
	}

	if wait { // already in progress
		waitRes()
		return waitErr
	}

	if err := m.index.StorageLock(ctx, sector.ID, storiface.FTUnsealed|storiface.FTUpdate|storiface.FTUpdateCache, storiface.FTSealed|storiface.FTCache); err != nil {
		return xerrors.Errorf("acquiring sector lock: %w", err)
	}

	// NOTE(review): this comment originally claimed allowFetch is set to
	// false so the task always runs on a worker with direct access to the
	// data, but the selector below is constructed with allowFetch=true —
	// confirm which is intended before relying on either behavior.
	selector := newExistingSelector(m.index, sector.ID, storiface.FTUnsealed|storiface.FTUpdate|storiface.FTUpdateCache|storiface.FTCache, true)

	err = m.sched.Schedule(ctx, sector, sealtasks.TTRegenSectorKey, selector, m.schedFetch(sector, storiface.FTUpdate|storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error {
		err := m.startWork(ctx, w, wk)(w.GenerateSectorKeyFromData(ctx, sector, commD))
		if err != nil {
			return err
		}

		waitRes()
		return nil
	})
	if err != nil {
		return err
	}

	return waitErr
}
|
|
|
|
|
2022-06-17 11:31:05 +00:00
|
|
|
func (m *Manager) Remove(ctx context.Context, sector storiface.SectorRef) error {
|
2020-06-22 15:02:59 +00:00
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
|
|
defer cancel()
|
|
|
|
|
2021-12-01 19:01:55 +00:00
|
|
|
if err := m.index.StorageLock(ctx, sector.ID, storiface.FTNone, storiface.FTSealed|storiface.FTUnsealed|storiface.FTCache|storiface.FTUpdate|storiface.FTUpdateCache); err != nil {
|
2020-06-22 15:02:59 +00:00
|
|
|
return xerrors.Errorf("acquiring sector lock: %w", err)
|
|
|
|
}
|
|
|
|
|
2020-08-27 21:58:37 +00:00
|
|
|
var err error
|
2020-06-22 15:02:59 +00:00
|
|
|
|
2021-10-11 19:05:05 +00:00
|
|
|
if rerr := m.storage.Remove(ctx, sector.ID, storiface.FTSealed, true, nil); rerr != nil {
|
2020-08-27 21:58:37 +00:00
|
|
|
err = multierror.Append(err, xerrors.Errorf("removing sector (sealed): %w", rerr))
|
|
|
|
}
|
2021-10-11 19:05:05 +00:00
|
|
|
if rerr := m.storage.Remove(ctx, sector.ID, storiface.FTCache, true, nil); rerr != nil {
|
2020-08-27 21:58:37 +00:00
|
|
|
err = multierror.Append(err, xerrors.Errorf("removing sector (cache): %w", rerr))
|
|
|
|
}
|
2021-10-11 19:05:05 +00:00
|
|
|
if rerr := m.storage.Remove(ctx, sector.ID, storiface.FTUnsealed, true, nil); rerr != nil {
|
2020-08-27 21:58:37 +00:00
|
|
|
err = multierror.Append(err, xerrors.Errorf("removing sector (unsealed): %w", rerr))
|
2020-06-22 15:02:59 +00:00
|
|
|
}
|
2021-12-01 19:01:55 +00:00
|
|
|
if rerr := m.storage.Remove(ctx, sector.ID, storiface.FTUpdate, true, nil); rerr != nil {
|
|
|
|
err = multierror.Append(err, xerrors.Errorf("removing sector (unsealed): %w", rerr))
|
|
|
|
}
|
|
|
|
if rerr := m.storage.Remove(ctx, sector.ID, storiface.FTUpdateCache, true, nil); rerr != nil {
|
|
|
|
err = multierror.Append(err, xerrors.Errorf("removing sector (unsealed): %w", rerr))
|
|
|
|
}
|
2020-06-22 15:02:59 +00:00
|
|
|
|
2020-08-27 21:58:37 +00:00
|
|
|
return err
|
2020-06-22 15:02:59 +00:00
|
|
|
}
|
|
|
|
|
2022-06-17 11:31:05 +00:00
|
|
|
// ReplicaUpdate encodes the given pieces into an updated replica on a worker
// and returns the new sealed/unsealed CIDs. Concurrent identical calls are
// deduplicated through the work tracker.
func (m *Manager) ReplicaUpdate(ctx context.Context, sector storiface.SectorRef, pieces []abi.PieceInfo) (out storiface.ReplicaUpdateOut, err error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	log.Debugf("manager is doing replica update")
	wk, wait, cancel, err := m.getWork(ctx, sealtasks.TTReplicaUpdate, sector, pieces)
	if err != nil {
		return storiface.ReplicaUpdateOut{}, xerrors.Errorf("getWork: %w", err)
	}
	defer cancel()

	var waitErr error
	// waitRes blocks on the tracked work, storing the result into the named
	// return `out` (or the error into waitErr).
	waitRes := func() {
		p, werr := m.waitWork(ctx, wk)
		if werr != nil {
			waitErr = xerrors.Errorf("waitWork: %w", werr)
			return
		}
		if p != nil {
			out = p.(storiface.ReplicaUpdateOut)
		}
	}

	if wait { // already in progress
		waitRes()
		return out, waitErr
	}

	if err := m.index.StorageLock(ctx, sector.ID, storiface.FTUnsealed|storiface.FTSealed|storiface.FTCache, storiface.FTUpdate|storiface.FTUpdateCache); err != nil {
		return storiface.ReplicaUpdateOut{}, xerrors.Errorf("acquiring sector lock: %w", err)
	}

	// allocate space for the new update/update-cache files in sealing storage.
	selector := newAllocSelector(m.index, storiface.FTUpdate|storiface.FTUpdateCache, storiface.PathSealing)

	err = m.sched.Schedule(ctx, sector, sealtasks.TTReplicaUpdate, selector, m.schedFetch(sector, storiface.FTUnsealed|storiface.FTSealed|storiface.FTCache, storiface.PathSealing, storiface.AcquireCopy), func(ctx context.Context, w Worker) error {
		err := m.startWork(ctx, w, wk)(w.ReplicaUpdate(ctx, sector, pieces))
		if err != nil {
			return xerrors.Errorf("startWork: %w", err)
		}

		waitRes()
		return nil
	})
	if err != nil {
		return storiface.ReplicaUpdateOut{}, xerrors.Errorf("Schedule: %w", err)
	}
	return out, waitErr
}
|
|
|
|
|
2022-06-17 11:31:05 +00:00
|
|
|
// ProveReplicaUpdate1 produces the vanilla proofs for a replica update on a
// worker that already holds the update files. Concurrent identical calls are
// deduplicated through the work tracker.
func (m *Manager) ProveReplicaUpdate1(ctx context.Context, sector storiface.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (out storiface.ReplicaVanillaProofs, err error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	wk, wait, cancel, err := m.getWork(ctx, sealtasks.TTProveReplicaUpdate1, sector, sectorKey, newSealed, newUnsealed)
	if err != nil {
		return nil, xerrors.Errorf("getWork: %w", err)
	}
	defer cancel()

	var waitErr error
	// waitRes blocks on the tracked work, storing the proofs into the named
	// return `out` (or the error into waitErr).
	waitRes := func() {
		p, werr := m.waitWork(ctx, wk)
		if werr != nil {
			waitErr = werr
			return
		}
		if p != nil {
			out = p.(storiface.ReplicaVanillaProofs)
		}
	}

	if wait { // already in progress
		waitRes()
		return out, waitErr
	}

	if err := m.index.StorageLock(ctx, sector.ID, storiface.FTSealed|storiface.FTUpdate|storiface.FTCache|storiface.FTUpdateCache, storiface.FTNone); err != nil {
		return nil, xerrors.Errorf("acquiring sector lock: %w", err)
	}

	// NOTE: We set allowFetch to false in so that we always execute on a worker
	// with direct access to the data. We want to do that because this step is
	// generally very cheap / fast, and transferring data is not worth the effort
	selector := newExistingSelector(m.index, sector.ID, storiface.FTUpdate|storiface.FTUpdateCache, false)

	err = m.sched.Schedule(ctx, sector, sealtasks.TTProveReplicaUpdate1, selector, m.schedFetch(sector, storiface.FTSealed|storiface.FTCache|storiface.FTUpdate|storiface.FTUpdateCache, storiface.PathSealing, storiface.AcquireCopy), func(ctx context.Context, w Worker) error {

		err := m.startWork(ctx, w, wk)(w.ProveReplicaUpdate1(ctx, sector, sectorKey, newSealed, newUnsealed))
		if err != nil {
			return err
		}

		waitRes()
		return nil
	})
	if err != nil {
		return nil, err
	}

	return out, waitErr
}
|
|
|
|
|
2022-06-17 11:31:05 +00:00
|
|
|
// ProveReplicaUpdate2 aggregates the vanilla proofs into the final replica
// update proof. Like SealCommit2, the inputs travel as parameters, so any
// worker advertising the task type is eligible (newTaskSelector, schedNop).
// Concurrent identical calls are deduplicated through the work tracker.
func (m *Manager) ProveReplicaUpdate2(ctx context.Context, sector storiface.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid, vanillaProofs storiface.ReplicaVanillaProofs) (out storiface.ReplicaUpdateProof, err error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	wk, wait, cancel, err := m.getWork(ctx, sealtasks.TTProveReplicaUpdate2, sector, sectorKey, newSealed, newUnsealed, vanillaProofs)
	if err != nil {
		return nil, xerrors.Errorf("getWork: %w", err)
	}
	defer cancel()

	var waitErr error
	// waitRes blocks on the tracked work, storing the proof into the named
	// return `out` (or the error into waitErr).
	waitRes := func() {
		p, werr := m.waitWork(ctx, wk)
		if werr != nil {
			waitErr = werr
			return
		}
		if p != nil {
			out = p.(storiface.ReplicaUpdateProof)
		}
	}

	if wait { // already in progress
		waitRes()
		return out, waitErr
	}

	selector := newTaskSelector()

	err = m.sched.Schedule(ctx, sector, sealtasks.TTProveReplicaUpdate2, selector, schedNop, func(ctx context.Context, w Worker) error {
		err := m.startWork(ctx, w, wk)(w.ProveReplicaUpdate2(ctx, sector, sectorKey, newSealed, newUnsealed, vanillaProofs))
		if err != nil {
			return err
		}

		waitRes()
		return nil
	})

	if err != nil {
		return nil, err
	}

	return out, waitErr
}
|
|
|
|
|
2022-09-16 21:45:23 +00:00
|
|
|
// DownloadSectorData fetches the given sector files from remote locations
// onto a worker, allocating space in sealing storage (or long-term storage
// when finalized is true). Concurrent identical calls are deduplicated
// through the work tracker.
func (m *Manager) DownloadSectorData(ctx context.Context, sector storiface.SectorRef, finalized bool, src map[storiface.SectorFileType]storiface.SectorLocation) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var toFetch storiface.SectorFileType

	// get a sorted list of sectors files to make a consistent work key from
	// (map iteration order is random, so sorting keeps the getWork params
	// deterministic for identical src maps).
	ents := make([]struct {
		T storiface.SectorFileType
		S storiface.SectorLocation
	}, 0, len(src))
	for fileType, data := range src {
		if len(fileType.AllSet()) != 1 {
			return xerrors.Errorf("sector data entry must be for a single file type")
		}

		toFetch |= fileType

		ents = append(ents, struct {
			T storiface.SectorFileType
			S storiface.SectorLocation
		}{T: fileType, S: data})
	}
	sort.Slice(ents, func(i, j int) bool {
		return ents[i].T < ents[j].T
	})

	// get a work key
	wk, wait, cancel, err := m.getWork(ctx, sealtasks.TTDownloadSector, sector, ents)
	if err != nil {
		return xerrors.Errorf("getWork: %w", err)
	}
	defer cancel()

	var waitErr error
	// waitRes blocks on the tracked work; this task has no return value, so
	// only the error is captured.
	waitRes := func() {
		_, werr := m.waitWork(ctx, wk)
		if werr != nil {
			waitErr = werr
			return
		}
	}

	if wait { // already in progress
		waitRes()
		return waitErr
	}

	ptype := storiface.PathSealing
	if finalized {
		ptype = storiface.PathStorage
	}

	// allocate space for all requested file types on the chosen path kind.
	selector := newAllocSelector(m.index, toFetch, ptype)

	err = m.sched.Schedule(ctx, sector, sealtasks.TTDownloadSector, selector, schedNop, func(ctx context.Context, w Worker) error {
		err := m.startWork(ctx, w, wk)(w.DownloadSectorData(ctx, sector, finalized, src))
		if err != nil {
			return err
		}

		waitRes()
		return nil
	})

	if err != nil {
		return err
	}

	return waitErr
}
|
|
|
|
|
2022-04-26 16:22:52 +00:00
|
|
|
// ReturnDataCid records the result (or error) of a remote DataCid call
// identified by callID, delegating to the shared returnResult plumbing.
func (m *Manager) ReturnDataCid(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error {
	return m.returnResult(ctx, callID, pi, err)
}
|
|
|
|
|
2020-11-17 15:17:45 +00:00
|
|
|
// ReturnAddPiece records the result (or error) of a remote AddPiece call
// identified by callID, delegating to the shared returnResult plumbing.
func (m *Manager) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error {
	return m.returnResult(ctx, callID, pi, err)
}
|
|
|
|
|
2022-06-17 11:31:05 +00:00
|
|
|
// ReturnSealPreCommit1 records the result (or error) of a remote
// SealPreCommit1 call identified by callID via returnResult.
func (m *Manager) ReturnSealPreCommit1(ctx context.Context, callID storiface.CallID, p1o storiface.PreCommit1Out, err *storiface.CallError) error {
	return m.returnResult(ctx, callID, p1o, err)
}
|
|
|
|
|
2022-06-17 11:31:05 +00:00
|
|
|
// ReturnSealPreCommit2 records the result (or error) of a remote
// SealPreCommit2 call identified by callID via returnResult.
func (m *Manager) ReturnSealPreCommit2(ctx context.Context, callID storiface.CallID, sealed storiface.SectorCids, err *storiface.CallError) error {
	return m.returnResult(ctx, callID, sealed, err)
}
|
|
|
|
|
2022-06-17 11:31:05 +00:00
|
|
|
// ReturnSealCommit1 records the result (or error) of a remote SealCommit1
// call identified by callID via returnResult.
func (m *Manager) ReturnSealCommit1(ctx context.Context, callID storiface.CallID, out storiface.Commit1Out, err *storiface.CallError) error {
	return m.returnResult(ctx, callID, out, err)
}
|
|
|
|
|
2022-06-17 11:31:05 +00:00
|
|
|
// ReturnSealCommit2 records the result (or error) of a remote SealCommit2
// call identified by callID via returnResult.
func (m *Manager) ReturnSealCommit2(ctx context.Context, callID storiface.CallID, proof storiface.Proof, err *storiface.CallError) error {
	return m.returnResult(ctx, callID, proof, err)
}
|
|
|
|
|
2020-11-17 15:17:45 +00:00
|
|
|
// ReturnFinalizeSector records completion (or error) of a remote
// FinalizeSector call identified by callID; the task has no result value.
func (m *Manager) ReturnFinalizeSector(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
	return m.returnResult(ctx, callID, nil, err)
}
|
|
|
|
|
2020-11-17 15:17:45 +00:00
|
|
|
// ReturnReleaseUnsealed records completion (or error) of a remote
// ReleaseUnsealed call identified by callID; the task has no result value.
func (m *Manager) ReturnReleaseUnsealed(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
	return m.returnResult(ctx, callID, nil, err)
}
|
|
|
|
|
2022-06-17 11:31:05 +00:00
|
|
|
// ReturnReplicaUpdate records the result (or error) of a remote ReplicaUpdate
// call identified by callID via returnResult.
func (m *Manager) ReturnReplicaUpdate(ctx context.Context, callID storiface.CallID, out storiface.ReplicaUpdateOut, err *storiface.CallError) error {
	return m.returnResult(ctx, callID, out, err)
}
|
|
|
|
|
2022-06-17 11:31:05 +00:00
|
|
|
// ReturnProveReplicaUpdate1 records the result (or error) of a remote
// ProveReplicaUpdate1 call identified by callID via returnResult.
func (m *Manager) ReturnProveReplicaUpdate1(ctx context.Context, callID storiface.CallID, out storiface.ReplicaVanillaProofs, err *storiface.CallError) error {
	return m.returnResult(ctx, callID, out, err)
}
|
|
|
|
|
2022-06-17 11:31:05 +00:00
|
|
|
// ReturnProveReplicaUpdate2 records the result (or error) of a remote
// ProveReplicaUpdate2 call identified by callID via returnResult.
func (m *Manager) ReturnProveReplicaUpdate2(ctx context.Context, callID storiface.CallID, proof storiface.ReplicaUpdateProof, err *storiface.CallError) error {
	return m.returnResult(ctx, callID, proof, err)
}
|
|
|
|
|
2022-02-08 14:06:42 +00:00
|
|
|
// ReturnFinalizeReplicaUpdate records completion (or error) of a remote
// FinalizeReplicaUpdate call identified by callID; no result value.
func (m *Manager) ReturnFinalizeReplicaUpdate(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
	return m.returnResult(ctx, callID, nil, err)
}
|
|
|
|
|
2021-12-01 19:01:55 +00:00
|
|
|
// ReturnGenerateSectorKeyFromData records completion (or error) of a remote
// GenerateSectorKeyFromData call identified by callID; no result value.
func (m *Manager) ReturnGenerateSectorKeyFromData(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
	return m.returnResult(ctx, callID, nil, err)
}
|
|
|
|
|
2020-11-17 15:17:45 +00:00
|
|
|
// ReturnMoveStorage records completion (or error) of a remote MoveStorage
// call identified by callID; the task has no result value.
func (m *Manager) ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
	return m.returnResult(ctx, callID, nil, err)
}
|
|
|
|
|
2020-11-17 15:17:45 +00:00
|
|
|
// ReturnUnsealPiece records completion (or error) of a remote UnsealPiece
// call identified by callID; the task has no result value.
func (m *Manager) ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
	return m.returnResult(ctx, callID, nil, err)
}
|
|
|
|
|
2020-11-17 15:17:45 +00:00
|
|
|
// ReturnReadPiece records the boolean result (or error) of a remote ReadPiece
// call identified by callID via returnResult.
func (m *Manager) ReturnReadPiece(ctx context.Context, callID storiface.CallID, ok bool, err *storiface.CallError) error {
	return m.returnResult(ctx, callID, ok, err)
}
|
|
|
|
|
2022-08-31 11:56:25 +00:00
|
|
|
// ReturnDownloadSector records completion (or error) of a remote
// DownloadSectorData call identified by callID; no result value.
func (m *Manager) ReturnDownloadSector(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
	return m.returnResult(ctx, callID, nil, err)
}
|
|
|
|
|
2020-11-17 15:17:45 +00:00
|
|
|
// ReturnFetch records completion (or error) of a remote Fetch call
// identified by callID; the task has no result value.
func (m *Manager) ReturnFetch(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
	return m.returnResult(ctx, callID, nil, err)
}
|
|
|
|
|
2022-01-18 10:57:04 +00:00
|
|
|
func (m *Manager) StorageLocal(ctx context.Context) (map[storiface.ID]string, error) {
|
2020-03-23 11:40:02 +00:00
|
|
|
l, err := m.localStore.Local(ctx)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2022-01-18 10:57:04 +00:00
|
|
|
out := map[storiface.ID]string{}
|
2020-03-23 11:40:02 +00:00
|
|
|
for _, st := range l {
|
|
|
|
out[st.ID] = st.LocalPath
|
|
|
|
}
|
|
|
|
|
|
|
|
return out, nil
|
|
|
|
}
|
|
|
|
|
2022-01-18 10:57:04 +00:00
|
|
|
// FsStat returns filesystem usage statistics for the storage path with the
// given ID, delegating to the remote/local storage layer.
func (m *Manager) FsStat(ctx context.Context, id storiface.ID) (fsutil.FsStat, error) {
	return m.storage.FsStat(ctx, id)
}
|
|
|
|
|
2020-10-30 10:07:35 +00:00
|
|
|
// SchedDiag returns a diagnostic snapshot of the scheduler plus the manager's
// work-tracking state. When doSched is set, it first pokes the scheduler's
// workerChange channel to force a scheduling pass (respecting ctx cancel).
func (m *Manager) SchedDiag(ctx context.Context, doSched bool) (interface{}, error) {
	if doSched {
		select {
		case m.sched.workerChange <- struct{}{}:
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}

	si, err := m.sched.Info(ctx)
	if err != nil {
		return nil, err
	}

	// Embed the scheduler info alongside the manager's own bookkeeping; the
	// anonymous interface lets us embed si without knowing its concrete type.
	type SchedInfo interface{}
	i := struct {
		SchedInfo

		ReturnedWork []string
		Waiting      []string

		CallToWork map[string]string

		EarlyRet []string
	}{
		SchedInfo: si,

		CallToWork: map[string]string{},
	}

	// workLk guards results, callRes, waitRes and callToWork.
	m.workLk.Lock()

	for w := range m.results {
		i.ReturnedWork = append(i.ReturnedWork, w.String())
	}

	for id := range m.callRes {
		i.EarlyRet = append(i.EarlyRet, id.String())
	}

	for w := range m.waitRes {
		i.Waiting = append(i.Waiting, w.String())
	}

	for c, w := range m.callToWork {
		i.CallToWork[c.String()] = w.String()
	}

	m.workLk.Unlock()

	return i, nil
}
|
|
|
|
|
2022-08-02 09:47:30 +00:00
|
|
|
// RemoveSchedRequest drops a pending scheduler request identified by its
// scheduling UUID, delegating to the scheduler.
func (m *Manager) RemoveSchedRequest(ctx context.Context, schedId uuid.UUID) error {
	return m.sched.RemoveRequest(ctx, schedId)
}
|
|
|
|
|
2020-07-17 10:59:12 +00:00
|
|
|
// Close shuts down the PoSt schedulers and then the main sealing scheduler,
// returning the sealing scheduler's close error.
func (m *Manager) Close(ctx context.Context) error {
	m.windowPoStSched.schedClose()
	m.winningPoStSched.schedClose()
	return m.sched.Close(ctx)
}
|
|
|
|
|
2021-05-18 07:32:30 +00:00
|
|
|
// Compile-time checks that *Manager satisfies the package interfaces.
var _ Unsealer = &Manager{}
var _ SectorManager = &Manager{}
|