package sectorstorage import ( "context" "errors" "io" "net/http" "sync" "github.com/google/uuid" "github.com/hashicorp/go-multierror" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" "github.com/mitchellh/go-homedir" "golang.org/x/xerrors" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-statestore" "github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" "github.com/filecoin-project/lotus/extern/sector-storage/fsutil" "github.com/filecoin-project/lotus/extern/sector-storage/sealtasks" "github.com/filecoin-project/lotus/extern/sector-storage/stores" "github.com/filecoin-project/lotus/extern/sector-storage/storiface" ) var log = logging.Logger("advmgr") var ErrNoWorkers = errors.New("no suitable workers found") type URLs []string type Worker interface { storiface.WorkerCalls TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error) // Returns paths accessible to the worker Paths(context.Context) ([]stores.StoragePath, error) Info(context.Context) (storiface.WorkerInfo, error) Session(context.Context) (uuid.UUID, error) Close() error // TODO: do we need this? } type SectorManager interface { SectorSize() abi.SectorSize ReadPiece(context.Context, io.Writer, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error ffiwrapper.StorageSealer storage.Prover storiface.WorkerReturn FaultTracker } type WorkerID uuid.UUID // worker session UUID var ClosedWorkerID = uuid.UUID{} type Manager struct { scfg *ffiwrapper.Config ls stores.LocalStorage storage *stores.Remote localStore *stores.Local remoteHnd *stores.FetchHandler index stores.SectorIndex sched *scheduler storage.Prover workLk sync.Mutex work *statestore.StateStore callToWork map[storiface.CallID]WorkID // used when we get an early return and there's no callToWork mapping callRes map[storiface.CallID]chan result results map[WorkID]result waitRes map[WorkID]chan struct{} } type result struct { r interface{} err error } type SealerConfig struct { ParallelFetchLimit int // Local worker config AllowAddPiece bool AllowPreCommit1 bool AllowPreCommit2 bool AllowCommit bool AllowUnseal bool } type StorageAuth http.Header type WorkerStateStore *statestore.StateStore type ManagerStateStore *statestore.StateStore func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg *ffiwrapper.Config, sc SealerConfig, urls URLs, sa StorageAuth, wss WorkerStateStore, mss ManagerStateStore) (*Manager, error) { lstor, err := stores.NewLocal(ctx, ls, si, urls) if err != nil { return nil, err } prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, index: si, spt: cfg.SealProofType}, cfg) if err != nil { return nil, xerrors.Errorf("creating prover instance: %w", err) } stor := stores.NewRemote(lstor, si, http.Header(sa), sc.ParallelFetchLimit) m := &Manager{ scfg: cfg, ls: ls, storage: stor, localStore: lstor, remoteHnd: &stores.FetchHandler{Local: lstor}, index: si, sched: newScheduler(cfg.SealProofType), Prover: prover, work: mss, callToWork: map[storiface.CallID]WorkID{}, callRes: map[storiface.CallID]chan result{}, results: map[WorkID]result{}, waitRes: map[WorkID]chan struct{}{}, } m.setupWorkTracker() go m.sched.runSched() localTasks := []sealtasks.TaskType{ sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTFetch, sealtasks.TTReadUnsealed, } if sc.AllowAddPiece { localTasks = append(localTasks, sealtasks.TTAddPiece) } if sc.AllowPreCommit1 { localTasks = append(localTasks, sealtasks.TTPreCommit1) } if sc.AllowPreCommit2 { localTasks = append(localTasks, sealtasks.TTPreCommit2) } if sc.AllowCommit { localTasks = append(localTasks, sealtasks.TTCommit2) } if sc.AllowUnseal { localTasks = append(localTasks, sealtasks.TTUnseal) } err = m.AddWorker(ctx, NewLocalWorker(WorkerConfig{ SealProof: cfg.SealProofType, TaskTypes: localTasks, }, stor, lstor, si, m, wss)) if err != nil { return nil, xerrors.Errorf("adding local worker: %w", err) } return m, nil } func (m *Manager) AddLocalStorage(ctx context.Context, path string) error { path, err := homedir.Expand(path) if err != nil { return xerrors.Errorf("expanding local path: %w", err) } if err := m.localStore.OpenPath(ctx, path); err != nil { return xerrors.Errorf("opening local path: %w", err) } if err := m.ls.SetStorage(func(sc *stores.StorageConfig) { sc.StoragePaths = append(sc.StoragePaths, stores.LocalPath{Path: path}) }); err != nil { return xerrors.Errorf("get storage config: %w", err) } return nil } func (m *Manager) AddWorker(ctx context.Context, w Worker) error { return m.sched.runWorker(ctx, w) } func (m *Manager) ServeHTTP(w http.ResponseWriter, r *http.Request) { m.remoteHnd.ServeHTTP(w, r) } func (m *Manager) SectorSize() abi.SectorSize { sz, _ := m.scfg.SealProofType.SectorSize() return sz } func schedNop(context.Context, Worker) error { return nil } func (m *Manager) schedFetch(sector abi.SectorID, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) func(context.Context, Worker) error { return func(ctx context.Context, worker Worker) error { _, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, ft, ptype, am)) return err } } func (m *Manager) readPiece(sink io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, rok *bool) func(ctx context.Context, w Worker) error { return func(ctx context.Context, w Worker) error { r, err := m.waitSimpleCall(ctx)(w.ReadPiece(ctx, sink, sector, offset, size)) if err != nil { return err } *rok = r.(bool) return nil } } func (m *Manager) tryReadUnsealedPiece(ctx context.Context, sink io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (foundUnsealed bool, readOk bool, selector WorkerSelector, returnErr error) { // acquire a lock purely for reading unsealed sectors ctx, cancel := context.WithCancel(ctx) defer cancel() if err := m.index.StorageLock(ctx, sector, storiface.FTUnsealed, storiface.FTNone); err != nil { returnErr = xerrors.Errorf("acquiring read sector lock: %w", err) return } // passing 0 spt because we only need it when allowFetch is true best, err := m.index.StorageFindSector(ctx, sector, storiface.FTUnsealed, 0, false) if err != nil { returnErr = xerrors.Errorf("read piece: checking for already existing unsealed sector: %w", err) return } foundUnsealed = len(best) > 0 if foundUnsealed { // append to existing // There is unsealed sector, see if we can read from it selector = newExistingSelector(m.index, sector, storiface.FTUnsealed, false) err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, m.schedFetch(sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove), m.readPiece(sink, sector, offset, size, &readOk)) if err != nil { returnErr = xerrors.Errorf("reading piece from sealed sector: %w", err) } } else { selector = newAllocSelector(m.index, storiface.FTUnsealed, storiface.PathSealing) } return } func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) error { foundUnsealed, readOk, selector, err := m.tryReadUnsealedPiece(ctx, sink, sector, offset, size) if err != nil { return err } if readOk { return nil } ctx, cancel := context.WithCancel(ctx) defer cancel() if err := m.index.StorageLock(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.FTUnsealed); err != nil { return xerrors.Errorf("acquiring unseal sector lock: %w", err) } unsealFetch := func(ctx context.Context, worker Worker) error { if _, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.PathSealing, storiface.AcquireCopy)); err != nil { return xerrors.Errorf("copy sealed/cache sector data: %w", err) } if foundUnsealed { if _, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove)); err != nil { return xerrors.Errorf("copy unsealed sector data: %w", err) } } return nil } if unsealed == cid.Undef { return xerrors.Errorf("cannot unseal piece (sector: %d, offset: %d size: %d) - unsealed cid is undefined", sector, offset, size) } err = m.sched.Schedule(ctx, sector, sealtasks.TTUnseal, selector, unsealFetch, func(ctx context.Context, w Worker) error { // TODO: make restartable _, err := m.waitSimpleCall(ctx)(w.UnsealPiece(ctx, sector, offset, size, ticket, unsealed)) return err }) if err != nil { return err } selector = newExistingSelector(m.index, sector, storiface.FTUnsealed, false) err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, m.schedFetch(sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove), m.readPiece(sink, sector, offset, size, &readOk)) if err != nil { return xerrors.Errorf("reading piece from sealed sector: %w", err) } if !readOk { return xerrors.Errorf("failed to read unsealed piece") } return nil } func (m *Manager) NewSector(ctx context.Context, sector abi.SectorID) error { log.Warnf("stub NewSector") return nil } func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPieces []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) { ctx, cancel := context.WithCancel(ctx) defer cancel() if err := m.index.StorageLock(ctx, sector, storiface.FTNone, storiface.FTUnsealed); err != nil { return abi.PieceInfo{}, xerrors.Errorf("acquiring sector lock: %w", err) } var selector WorkerSelector var err error if len(existingPieces) == 0 { // new selector = newAllocSelector(m.index, storiface.FTUnsealed, storiface.PathSealing) } else { // use existing selector = newExistingSelector(m.index, sector, storiface.FTUnsealed, false) } var out abi.PieceInfo err = m.sched.Schedule(ctx, sector, sealtasks.TTAddPiece, selector, schedNop, func(ctx context.Context, w Worker) error { p, err := m.waitSimpleCall(ctx)(w.AddPiece(ctx, sector, existingPieces, sz, r)) if err != nil { return err } out = p.(abi.PieceInfo) return nil }) return out, err } func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) { ctx, cancel := context.WithCancel(ctx) defer cancel() wk, wait, cancel, err := m.getWork(ctx, sealtasks.TTPreCommit1, sector, ticket, pieces) if err != nil { return nil, xerrors.Errorf("getWork: %w", err) } defer cancel() var waitErr error waitRes := func() { p, werr := m.waitWork(ctx, wk) if werr != nil { waitErr = werr return } out = p.(storage.PreCommit1Out) } if wait { // already in progress waitRes() return out, waitErr } if err := m.index.StorageLock(ctx, sector, storiface.FTUnsealed, storiface.FTSealed|storiface.FTCache); err != nil { return nil, xerrors.Errorf("acquiring sector lock: %w", err) } // TODO: also consider where the unsealed data sits selector := newAllocSelector(m.index, storiface.FTCache|storiface.FTSealed, storiface.PathSealing) err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit1, selector, m.schedFetch(sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error { err := m.startWork(ctx, w, wk)(w.SealPreCommit1(ctx, sector, ticket, pieces)) if err != nil { return err } waitRes() return nil }) if err != nil { return nil, err } return out, waitErr } func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.PreCommit1Out) (out storage.SectorCids, err error) { ctx, cancel := context.WithCancel(ctx) defer cancel() wk, wait, cancel, err := m.getWork(ctx, sealtasks.TTPreCommit2, sector, phase1Out) if err != nil { return storage.SectorCids{}, xerrors.Errorf("getWork: %w", err) } defer cancel() var waitErr error waitRes := func() { p, werr := m.waitWork(ctx, wk) if werr != nil { waitErr = werr return } out = p.(storage.SectorCids) } if wait { // already in progress waitRes() return out, waitErr } if err := m.index.StorageLock(ctx, sector, storiface.FTSealed, storiface.FTCache); err != nil { return storage.SectorCids{}, xerrors.Errorf("acquiring sector lock: %w", err) } selector := newExistingSelector(m.index, sector, storiface.FTCache|storiface.FTSealed, true) err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit2, selector, m.schedFetch(sector, storiface.FTCache|storiface.FTSealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error { err := m.startWork(ctx, w, wk)(w.SealPreCommit2(ctx, sector, phase1Out)) if err != nil { return err } waitRes() return nil }) if err != nil { return storage.SectorCids{}, err } return out, waitErr } func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (out storage.Commit1Out, err error) { ctx, cancel := context.WithCancel(ctx) defer cancel() wk, wait, cancel, err := m.getWork(ctx, sealtasks.TTCommit1, sector, ticket, seed, pieces, cids) if err != nil { return storage.Commit1Out{}, xerrors.Errorf("getWork: %w", err) } defer cancel() var waitErr error waitRes := func() { p, werr := m.waitWork(ctx, wk) if werr != nil { waitErr = werr return } out = p.(storage.Commit1Out) } if wait { // already in progress waitRes() return out, waitErr } if err := m.index.StorageLock(ctx, sector, storiface.FTSealed, storiface.FTCache); err != nil { return storage.Commit1Out{}, xerrors.Errorf("acquiring sector lock: %w", err) } // NOTE: We set allowFetch to false in so that we always execute on a worker // with direct access to the data. We want to do that because this step is // generally very cheap / fast, and transferring data is not worth the effort selector := newExistingSelector(m.index, sector, storiface.FTCache|storiface.FTSealed, false) err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit1, selector, m.schedFetch(sector, storiface.FTCache|storiface.FTSealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error { err := m.startWork(ctx, w, wk)(w.SealCommit1(ctx, sector, ticket, seed, pieces, cids)) if err != nil { return err } waitRes() return nil }) if err != nil { return nil, err } return out, waitErr } func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.Commit1Out) (out storage.Proof, err error) { wk, wait, cancel, err := m.getWork(ctx, sealtasks.TTCommit2, sector, phase1Out) if err != nil { return storage.Proof{}, xerrors.Errorf("getWork: %w", err) } defer cancel() var waitErr error waitRes := func() { p, werr := m.waitWork(ctx, wk) if werr != nil { waitErr = werr return } out = p.(storage.Proof) } if wait { // already in progress waitRes() return out, waitErr } selector := newTaskSelector() err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit2, selector, schedNop, func(ctx context.Context, w Worker) error { err := m.startWork(ctx, w, wk)(w.SealCommit2(ctx, sector, phase1Out)) if err != nil { return err } waitRes() return nil }) if err != nil { return nil, err } return out, waitErr } func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) error { ctx, cancel := context.WithCancel(ctx) defer cancel() if err := m.index.StorageLock(ctx, sector, storiface.FTNone, storiface.FTSealed|storiface.FTUnsealed|storiface.FTCache); err != nil { return xerrors.Errorf("acquiring sector lock: %w", err) } unsealed := storiface.FTUnsealed { unsealedStores, err := m.index.StorageFindSector(ctx, sector, storiface.FTUnsealed, 0, false) if err != nil { return xerrors.Errorf("finding unsealed sector: %w", err) } if len(unsealedStores) == 0 { // Is some edge-cases unsealed sector may not exist already, that's fine unsealed = storiface.FTNone } } selector := newExistingSelector(m.index, sector, storiface.FTCache|storiface.FTSealed, false) err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector, m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|unsealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error { _, err := m.waitSimpleCall(ctx)(w.FinalizeSector(ctx, sector, keepUnsealed)) return err }) if err != nil { return err } fetchSel := newAllocSelector(m.index, storiface.FTCache|storiface.FTSealed, storiface.PathStorage) moveUnsealed := unsealed { if len(keepUnsealed) == 0 { moveUnsealed = storiface.FTNone } } err = m.sched.Schedule(ctx, sector, sealtasks.TTFetch, fetchSel, m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|moveUnsealed, storiface.PathStorage, storiface.AcquireMove), func(ctx context.Context, w Worker) error { _, err := m.waitSimpleCall(ctx)(w.MoveStorage(ctx, sector, storiface.FTCache|storiface.FTSealed|moveUnsealed)) return err }) if err != nil { return xerrors.Errorf("moving sector to storage: %w", err) } return nil } func (m *Manager) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) error { log.Warnw("ReleaseUnsealed todo") return nil } func (m *Manager) Remove(ctx context.Context, sector abi.SectorID) error { ctx, cancel := context.WithCancel(ctx) defer cancel() if err := m.index.StorageLock(ctx, sector, storiface.FTNone, storiface.FTSealed|storiface.FTUnsealed|storiface.FTCache); err != nil { return xerrors.Errorf("acquiring sector lock: %w", err) } var err error if rerr := m.storage.Remove(ctx, sector, storiface.FTSealed, true); rerr != nil { err = multierror.Append(err, xerrors.Errorf("removing sector (sealed): %w", rerr)) } if rerr := m.storage.Remove(ctx, sector, storiface.FTCache, true); rerr != nil { err = multierror.Append(err, xerrors.Errorf("removing sector (cache): %w", rerr)) } if rerr := m.storage.Remove(ctx, sector, storiface.FTUnsealed, true); rerr != nil { err = multierror.Append(err, xerrors.Errorf("removing sector (unsealed): %w", rerr)) } return err } func (m *Manager) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err string) error { return m.returnResult(callID, pi, err) } func (m *Manager) ReturnSealPreCommit1(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err string) error { return m.returnResult(callID, p1o, err) } func (m *Manager) ReturnSealPreCommit2(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, err string) error { return m.returnResult(callID, sealed, err) } func (m *Manager) ReturnSealCommit1(ctx context.Context, callID storiface.CallID, out storage.Commit1Out, err string) error { return m.returnResult(callID, out, err) } func (m *Manager) ReturnSealCommit2(ctx context.Context, callID storiface.CallID, proof storage.Proof, err string) error { return m.returnResult(callID, proof, err) } func (m *Manager) ReturnFinalizeSector(ctx context.Context, callID storiface.CallID, err string) error { return m.returnResult(callID, nil, err) } func (m *Manager) ReturnReleaseUnsealed(ctx context.Context, callID storiface.CallID, err string) error { return m.returnResult(callID, nil, err) } func (m *Manager) ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err string) error { return m.returnResult(callID, nil, err) } func (m *Manager) ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err string) error { return m.returnResult(callID, nil, err) } func (m *Manager) ReturnReadPiece(ctx context.Context, callID storiface.CallID, ok bool, err string) error { return m.returnResult(callID, ok, err) } func (m *Manager) ReturnFetch(ctx context.Context, callID storiface.CallID, err string) error { return m.returnResult(callID, nil, err) } func (m *Manager) StorageLocal(ctx context.Context) (map[stores.ID]string, error) { l, err := m.localStore.Local(ctx) if err != nil { return nil, err } out := map[stores.ID]string{} for _, st := range l { out[st.ID] = st.LocalPath } return out, nil } func (m *Manager) FsStat(ctx context.Context, id stores.ID) (fsutil.FsStat, error) { return m.storage.FsStat(ctx, id) } func (m *Manager) SchedDiag(ctx context.Context, doSched bool) (interface{}, error) { if doSched { select { case m.sched.workerChange <- struct{}{}: case <-ctx.Done(): return nil, ctx.Err() } } return m.sched.Info(ctx) } func (m *Manager) Close(ctx context.Context) error { return m.sched.Close(ctx) } var _ SectorManager = &Manager{}