From 73613ee88397adc9a0bae149458193fcf6af7cbc Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Tue, 18 May 2021 17:05:25 +0530 Subject: [PATCH] docs, logs and green ci --- cmd/lotus-storage-miner/init.go | 14 ++++++++-- extern/sector-storage/manager.go | 20 ++++++++------ extern/sector-storage/mock/mock.go | 10 ++++--- extern/sector-storage/piece_provider.go | 17 ++++++++++++ extern/sector-storage/stores/http_handler.go | 14 ++++++++++ extern/sector-storage/stores/remote.go | 28 +++++++++++++++++--- node/builder.go | 2 ++ node/modules/storageminer.go | 14 +++++++--- node/test/builder.go | 20 +++++++++++--- 9 files changed, 116 insertions(+), 23 deletions(-) diff --git a/cmd/lotus-storage-miner/init.go b/cmd/lotus-storage-miner/init.go index a02520116..b8c81408e 100644 --- a/cmd/lotus-storage-miner/init.go +++ b/cmd/lotus-storage-miner/init.go @@ -8,6 +8,7 @@ import ( "encoding/json" "fmt" "io/ioutil" + "net/http" "os" "path/filepath" "strconv" @@ -453,14 +454,23 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api v1api.FullNode wsts := statestore.New(namespace.Wrap(mds, modules.WorkerCallsPrefix)) smsts := statestore.New(namespace.Wrap(mds, modules.ManagerWorkPrefix)) - smgr, err := sectorstorage.New(ctx, lr, stores.NewIndex(), sectorstorage.SealerConfig{ + si := stores.NewIndex() + + lstor, err := stores.NewLocal(ctx, lr, si, nil) + if err != nil { + return err + } + stor := stores.NewRemote(lstor, si, http.Header(sa), 10) + + smgr, err := sectorstorage.New(ctx, lstor, stor, lr, si, sectorstorage.SealerConfig{ ParallelFetchLimit: 10, AllowAddPiece: true, AllowPreCommit1: true, AllowPreCommit2: true, AllowCommit: true, AllowUnseal: true, - }, nil, sa, wsts, smsts) + }, wsts, smsts) + if err != nil { return err } diff --git a/extern/sector-storage/manager.go b/extern/sector-storage/manager.go index c4026eb04..385f8175b 100644 --- a/extern/sector-storage/manager.go +++ b/extern/sector-storage/manager.go @@ -103,19 +103,13 @@ type StorageAuth http.Header type WorkerStateStore *statestore.StateStore type ManagerStateStore *statestore.StateStore -func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, sc SealerConfig, urls URLs, sa StorageAuth, wss WorkerStateStore, mss ManagerStateStore) (*Manager, error) { - lstor, err := stores.NewLocal(ctx, ls, si, urls) - if err != nil { - return nil, err - } +func New(ctx context.Context, lstor *stores.Local, stor *stores.Remote, ls stores.LocalStorage, si stores.SectorIndex, sc SealerConfig, wss WorkerStateStore, mss ManagerStateStore) (*Manager, error) { prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, index: si}) if err != nil { return nil, xerrors.Errorf("creating prover instance: %w", err) } - stor := stores.NewRemote(lstor, si, http.Header(sa), sc.ParallelFetchLimit) - m := &Manager{ ls: ls, storage: stor, @@ -204,6 +198,10 @@ func (m *Manager) schedFetch(sector storage.SectorRef, ft storiface.SectorFileTy } } +// SectorsUnsealPiece will Unseal the Sealed sector file for the given sector. +// It will schedule the Unsealing task on a worker that either already has the sealed sector files or has space in +// one of it's sealing scratch spaces to store them after fetching them from another worker. +// If the chosen worker already has the Unsealed sector file, we will NOT Unseal the sealed sector file again. func (m *Manager) SectorsUnsealPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed *cid.Cid) error { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -213,6 +211,8 @@ func (m *Manager) SectorsUnsealPiece(ctx context.Context, sector storage.SectorR return xerrors.Errorf("acquiring unseal sector lock: %w", err) } + // if the selected worker does NOT have the sealed files for the sector, instruct it to fetch it from a worker that has them and + // put it in the sealing scratch space. sealFetch := func(ctx context.Context, worker Worker) error { log.Debugf("copy sealed/cache sector data for sector %d", sector.ID) if _, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.PathSealing, storiface.AcquireCopy)); err != nil { @@ -231,6 +231,8 @@ func (m *Manager) SectorsUnsealPiece(ctx context.Context, sector storage.SectorR return xerrors.Errorf("getting sector size: %w", err) } + // selector will schedule the Unseal task on a worker that either already has the sealed sector files or has space in + // one of it's sealing scratch spaces to store them after fetching them from another worker. selector := newExistingSelector(m.index, sector.ID, storiface.FTSealed|storiface.FTCache, true) log.Debugf("schedule unseal for sector %d", sector.ID) @@ -241,12 +243,14 @@ func (m *Manager) SectorsUnsealPiece(ctx context.Context, sector storage.SectorR // unseal the sector partially. Requesting the whole sector here can // save us some work in case another piece is requested from here log.Debugf("unseal sector %d", sector.ID) + + // Note: This unsealed call will essentially become a no-op of the worker already has an Unsealed sector file for the given sector. _, err := m.waitSimpleCall(ctx)(w.UnsealPiece(ctx, sector, 0, abi.PaddedPieceSize(ssize).Unpadded(), ticket, *unsealed)) log.Debugf("completed unseal sector %d", sector.ID) return err }) if err != nil { - return err + return xerrors.Errorf("worker UnsealPiece call: %s", err) } return nil diff --git a/extern/sector-storage/mock/mock.go b/extern/sector-storage/mock/mock.go index ae7d54985..d3e76e881 100644 --- a/extern/sector-storage/mock/mock.go +++ b/extern/sector-storage/mock/mock.go @@ -6,6 +6,7 @@ import ( "crypto/sha256" "fmt" "io" + "io/ioutil" "math/rand" "sync" @@ -372,13 +373,12 @@ func generateFakePoSt(sectorInfo []proof2.SectorInfo, rpt func(abi.RegisteredSea } } -func (mgr *SectorMgr) ReadPiece(ctx context.Context, w io.Writer, sectorID storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, c cid.Cid) error { +func (mgr *SectorMgr) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) { if offset != 0 { panic("implme") } - _, err := io.CopyN(w, bytes.NewReader(mgr.pieces[mgr.sectors[sectorID.ID].pieces[0]]), int64(size)) - return err + return ioutil.NopCloser(bytes.NewReader(mgr.pieces[mgr.sectors[sector.ID].pieces[0]][:size])), false, nil } func (mgr *SectorMgr) StageFakeData(mid abi.ActorID, spt abi.RegisteredSealProof) (storage.SectorRef, []abi.PieceInfo, error) { @@ -489,6 +489,10 @@ func (mgr *SectorMgr) ReturnFetch(ctx context.Context, callID storiface.CallID, panic("not supported") } +func (mgr *SectorMgr) SectorsUnsealPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd *cid.Cid) error { + return nil +} + func (m mockVerif) VerifySeal(svi proof2.SealVerifyInfo) (bool, error) { plen, err := svi.SealProof.ProofSize() if err != nil { diff --git a/extern/sector-storage/piece_provider.go b/extern/sector-storage/piece_provider.go index 747d4c5c8..f4e7439cd 100644 --- a/extern/sector-storage/piece_provider.go +++ b/extern/sector-storage/piece_provider.go @@ -17,10 +17,12 @@ import ( ) type Unsealer interface { + // SectorsUnsealPiece will Unseal a Sealed sector file for the given sector. SectorsUnsealPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd *cid.Cid) error } type PieceProvider interface { + // ReadPiece is used to read an Unsealed piece at the given offset and of the given size from a Sector ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) } @@ -38,6 +40,10 @@ func NewPieceProvider(storage *stores.Remote, index stores.SectorIndex, uns Unse } } +// tryReadUnsealedPiece will try to read the unsealed piece from an existing unsealed sector file for the given sector from any worker that has it. +// It will NOT try to schedule an Unseal of a sealed sector file for the read. +// +// Will return a nil reader if the piece does NOT exist in any unsealed file/there is not unsealed file for the given sector on any of the workers. func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (io.ReadCloser, context.CancelFunc, error) { // acquire a lock purely for reading unsealed sectors ctx, cancel := context.WithCancel(ctx) @@ -58,6 +64,9 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, sector storage return r, cancel, nil } +// ReadPiece is used to read an Unsealed piece at the given offset and of the given size from a Sector +// If an Unsealed sector file exists with the Piece Unsealed in it, we'll use that for the read. +// Otherwise, we will Unseal a Sealed sector file for the given sector and read the Unsealed piece from it. func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) { if err := offset.Valid(); err != nil { return nil, false, xerrors.Errorf("offset is not valid: %w", err) @@ -68,6 +77,7 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, r, unlock, err := p.tryReadUnsealedPiece(ctx, sector, offset, size) if xerrors.Is(err, storiface.ErrSectorNotFound) { + log.Debugf("no unsealed sector file with unsealed piece, sector=%+v, offset=%d, size=%d", sector, offset, size) err = nil } if err != nil { @@ -85,6 +95,8 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, return nil, false, xerrors.Errorf("unsealing piece: %w", err) } + log.Debugf("unsealed a sector file to read the piece, sector=%+v, offset=%d, size=%d", sector, offset, size) + r, unlock, err = p.tryReadUnsealedPiece(ctx, sector, offset, size) if err != nil { return nil, true, xerrors.Errorf("read after unsealing: %w", err) @@ -92,6 +104,9 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, if r == nil { return nil, true, xerrors.Errorf("got no reader after unsealing piece") } + log.Debugf("got a reader to read unsealed piece, sector=%+v, offset=%d, size=%d", sector, offset, size) + } else { + log.Debugf("unsealed piece already exists, no need to unseal, sector=%+v, offset=%d, size=%d", sector, offset, size) } upr, err := fr32.NewUnpadReader(r, size.Padded()) @@ -99,6 +114,8 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, return nil, uns, xerrors.Errorf("creating unpadded reader: %w", err) } + log.Debugf("returning reader to read unsealed piece, sector=%+v, offset=%d, size=%d", sector, offset, size) + return &funcCloser{ Reader: bufio.NewReaderSize(upr, 127), close: func() error { diff --git a/extern/sector-storage/stores/http_handler.go b/extern/sector-storage/stores/http_handler.go index e11d853df..813943fac 100644 --- a/extern/sector-storage/stores/http_handler.go +++ b/extern/sector-storage/stores/http_handler.go @@ -59,6 +59,8 @@ func (handler *FetchHandler) remoteStatFs(w http.ResponseWriter, r *http.Request } } +// remoteGetSector returns the sector file/tared directory byte stream for the sectorID and sector file type sent in the request. +// returns an error if it does NOT have the required sector file/dir. func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Request) { log.Infof("SERVE GET %s", r.URL) vars := mux.Vars(r) @@ -129,8 +131,11 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ } } else { w.Header().Set("Content-Type", "application/octet-stream") + // will do a ranged read over the file at the given path if the caller has asked for a ranged read in the request headers. http.ServeFile(w, r, path) } + + log.Debugf("served sector file/dir, sectorID=%+v, fileType=%s, path=%s", id, ft, path) } func (handler *FetchHandler) remoteDeleteSector(w http.ResponseWriter, r *http.Request) { @@ -158,6 +163,9 @@ func (handler *FetchHandler) remoteDeleteSector(w http.ResponseWriter, r *http.R } } +// remoteGetAllocated returns `http.StatusOK` if the worker already has an Unsealed sector file +// containing the Unsealed piece sent in the request. +// returns `http.StatusRequestedRangeNotSatisfiable` otherwise. func (handler *FetchHandler) remoteGetAllocated(w http.ResponseWriter, r *http.Request) { log.Infof("SERVE Alloc check %s", r.URL) vars := mux.Vars(r) @@ -216,6 +224,8 @@ func (handler *FetchHandler) remoteGetAllocated(w http.ResponseWriter, r *http.R ProofType: 0, } + // get the path of the local Unsealed file for the given sector. + // return error if we do NOT have it. paths, _, err := handler.Local.AcquireSector(r.Context(), si, ft, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove) if err != nil { log.Errorf("%+v", err) @@ -230,6 +240,7 @@ func (handler *FetchHandler) remoteGetAllocated(w http.ResponseWriter, r *http.R return } + // open the Unsealed file and check if it has the Unsealed sector for the piece at the given offset and size. pf, err := partialfile.OpenPartialFile(abi.PaddedPieceSize(ssize), path) if err != nil { log.Error("opening partial file: ", err) @@ -250,9 +261,12 @@ func (handler *FetchHandler) remoteGetAllocated(w http.ResponseWriter, r *http.R } if has { + log.Debugf("returning ok: worker has unsealed file with unsealed piece, sector:%+v, offset:%d, size:%d", id, offi, szi) w.WriteHeader(http.StatusOK) return } + + log.Debugf("returning StatusRequestedRangeNotSatisfiable: worker does NOT have unsealed file with unsealed piece, sector:%+v, offset:%d, size:%d", id, offi, szi) w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) } diff --git a/extern/sector-storage/stores/remote.go b/extern/sector-storage/stores/remote.go index b882eb052..2d409268b 100644 --- a/extern/sector-storage/stores/remote.go +++ b/extern/sector-storage/stores/remote.go @@ -479,10 +479,19 @@ func (r *Remote) readRemote(ctx context.Context, url string, offset, size abi.Pa return resp.Body, nil } -// Reader gets a reader for unsealed file range. Can return nil in case the requested range isn't allocated in the file +// Reader returns a reader for an unsealed piece at the given offset in the given sector. +// If the Miner has the unsealed piece locally, it will return a reader that reads from the local copy. +// If the Miner does NOT have the unsealed piece locally, it will query all workers that have the unsealed sector file +// to know if they have the unsealed piece and will then read the unsealed piece data from a worker that has it. +// +// Returns a nil reader if : +// 1. no worker(local worker included) has an unsealed file for the given sector OR +// 2. no worker(local worker included) has the unsealed piece in their unsealed sector file. +// Will return a nil reader and a nil error in such a case. func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size abi.PaddedPieceSize) (io.ReadCloser, error) { ft := storiface.FTUnsealed + // check if we have the unsealed sector file locally paths, _, err := r.local.AcquireSector(ctx, s, ft, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove) if err != nil { return nil, xerrors.Errorf("acquire local: %w", err) @@ -490,7 +499,11 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a path := storiface.PathByType(paths, ft) var rd io.ReadCloser + if path == "" { + // if we don't have the unsealed sector file locally, we'll first lookup the Miner Sector Store Index + // to determine which workers have the unsealed file and then query those workers to know + // if they have the unsealed piece in the unsealed sector file. si, err := r.index.StorageFindSector(ctx, s.ID, ft, 0, false) if err != nil { return nil, err @@ -500,7 +513,7 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a return nil, xerrors.Errorf("failed to read sector %v from remote(%d): %w", s, ft, storiface.ErrSectorNotFound) } - // TODO Why are we sorting in ascending order here -> shouldn't we sort in descending order as higher weight means more preferred to store ? + // TODO Why are we sorting in ascending order here -> shouldn't we sort in descending order as higher weight means more likely to have the file ? sort.Slice(si, func(i, j int) bool { return si[i].Weight < si[j].Weight }) @@ -508,6 +521,8 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a iloop: for _, info := range si { for _, url := range info.URLs { + // checkAllocated makes a JSON RPC query to a remote worker to determine if it has + // unsealed piece in their unsealed sector file. ok, err := r.checkAllocated(ctx, url, s.ProofType, offset, size) if err != nil { log.Warnw("check if remote has piece", "url", url, "error", err) @@ -517,6 +532,8 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a continue } + // readRemote fetches a reader that we can used to read the unsealed piece from the remote worker. + // It uses a ranged HTTP query to ensure we ONLY read the unsealed piece and not the entire unsealed file. rd, err = r.readRemote(ctx, url, offset, size) if err != nil { log.Warnw("reading from remote", "url", url, "error", err) @@ -527,17 +544,22 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a } } } else { + // if we have the unsealed file locally, return a reader that can be used to read the contents of the + // unsealed piece. log.Infof("Read local %s (+%d,%d)", path, offset, size) ssize, err := s.ProofType.SectorSize() if err != nil { return nil, err } + // open the unsealed sector file for the given sector size located at the given path. pf, err := partialfile.OpenPartialFile(abi.PaddedPieceSize(ssize), path) if err != nil { return nil, xerrors.Errorf("opening partial file: %w", err) } + // even though we have an unsealed file for the given sector, we still need to determine if we have the unsealed piece + // in the unsealed sector file. That is what `HasAllocated` checks for. has, err := pf.HasAllocated(storiface.UnpaddedByteIndex(offset.Unpadded()), size.Unpadded()) if err != nil { return nil, xerrors.Errorf("has allocated: %w", err) @@ -547,10 +569,10 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a if err := pf.Close(); err != nil { return nil, xerrors.Errorf("close partial file: %w", err) } - return nil, nil } + log.Debugf("returning piece reader for local unsealed piece sector=%+v, (offset=%d, size=%d)", s.ID, offset, size) return pf.Reader(storiface.PaddedByteIndex(offset), size) } diff --git a/node/builder.go b/node/builder.go index c22f0932e..588ca742d 100644 --- a/node/builder.go +++ b/node/builder.go @@ -375,6 +375,8 @@ var MinerNode = Options( Override(new(*stores.Index), stores.NewIndex), Override(new(stores.SectorIndex), From(new(*stores.Index))), Override(new(stores.LocalStorage), From(new(repo.LockedRepo))), + Override(new(*stores.Local), modules.LocalStorage), + Override(new(*stores.Remote), modules.RemoteStorage), Override(new(*sectorstorage.Manager), modules.SectorStorage), Override(new(sectorstorage.SectorManager), From(new(*sectorstorage.Manager))), Override(new(storiface.WorkerReturn), From(new(sectorstorage.SectorManager))), diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 9a43c1d46..fbda07620 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -635,7 +635,6 @@ func RetrievalDealFilter(userFilter dtypes.RetrievalDealFilter) func(onlineOk dt // RetrievalProvider creates a new retrieval provider attached to the provider blockstore func RetrievalProvider(h host.Host, miner *storage.Miner, - sealer sectorstorage.SectorManager, full v1api.FullNode, ds dtypes.MetadataDS, pieceStore dtypes.ProviderPieceStore, @@ -660,13 +659,22 @@ func RetrievalProvider(h host.Host, var WorkerCallsPrefix = datastore.NewKey("/worker/calls") var ManagerWorkPrefix = datastore.NewKey("/stmgr/calls") -func SectorStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, ls stores.LocalStorage, si stores.SectorIndex, sc sectorstorage.SealerConfig, urls sectorstorage.URLs, sa sectorstorage.StorageAuth, ds dtypes.MetadataDS) (*sectorstorage.Manager, error) { +func LocalStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, ls stores.LocalStorage, si stores.SectorIndex, urls sectorstorage.URLs) (*stores.Local, error) { + ctx := helpers.LifecycleCtx(mctx, lc) + return stores.NewLocal(ctx, ls, si, urls) +} + +func RemoteStorage(lstor *stores.Local, si stores.SectorIndex, sa sectorstorage.StorageAuth, sc sectorstorage.SealerConfig) *stores.Remote { + return stores.NewRemote(lstor, si, http.Header(sa), sc.ParallelFetchLimit) +} + +func SectorStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, lstor *stores.Local, stor *stores.Remote, ls stores.LocalStorage, si stores.SectorIndex, sc sectorstorage.SealerConfig, ds dtypes.MetadataDS) (*sectorstorage.Manager, error) { ctx := helpers.LifecycleCtx(mctx, lc) wsts := statestore.New(namespace.Wrap(ds, WorkerCallsPrefix)) smsts := statestore.New(namespace.Wrap(ds, ManagerWorkPrefix)) - sst, err := sectorstorage.New(ctx, ls, si, sc, urls, sa, wsts, smsts) + sst, err := sectorstorage.New(ctx, lstor, stor, ls, si, sc, wsts, smsts) if err != nil { return nil, err } diff --git a/node/test/builder.go b/node/test/builder.go index 7e26966a8..534a25b66 100644 --- a/node/test/builder.go +++ b/node/test/builder.go @@ -485,11 +485,16 @@ func mockSbBuilderOpts(t *testing.T, fullOpts []test.FullNodeOpts, storage []tes } fulls[i].Stb = storageBuilder(fulls[i], mn, node.Options( - node.Override(new(sectorstorage.SectorManager), func() (sectorstorage.SectorManager, error) { + node.Override(new(*mock.SectorMgr), func() (*mock.SectorMgr, error) { return mock.NewMockSectorMgr(nil), nil }), - node.Override(new(ffiwrapper.Verifier), mock.MockVerifier), + + node.Override(new(sectorstorage.SectorManager), node.From(new(*mock.SectorMgr))), + node.Override(new(sectorstorage.Unsealer), node.From(new(*mock.SectorMgr))), + node.Override(new(sectorstorage.PieceProvider), node.From(new(*mock.SectorMgr))), + node.Unset(new(*sectorstorage.Manager)), + node.Override(new(ffiwrapper.Verifier), mock.MockVerifier), )) } @@ -523,11 +528,18 @@ func mockSbBuilderOpts(t *testing.T, fullOpts []test.FullNodeOpts, storage []tes opts = node.Options() } storers[i] = CreateTestStorageNode(ctx, t, genms[i].Worker, maddrs[i], pidKeys[i], f, mn, node.Options( - node.Override(new(sectorstorage.SectorManager), func() (sectorstorage.SectorManager, error) { + node.Override(new(*mock.SectorMgr), func() (*mock.SectorMgr, error) { return mock.NewMockSectorMgr(sectors), nil }), - node.Override(new(ffiwrapper.Verifier), mock.MockVerifier), + + node.Override(new(sectorstorage.SectorManager), node.From(new(*mock.SectorMgr))), + node.Override(new(sectorstorage.Unsealer), node.From(new(*mock.SectorMgr))), + node.Override(new(sectorstorage.PieceProvider), node.From(new(*mock.SectorMgr))), + node.Unset(new(*sectorstorage.Manager)), + + node.Override(new(ffiwrapper.Verifier), mock.MockVerifier), + opts, ))