From 2aad7b69796e2b6ee4e1ad089c95ee3b1493328a Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Thu, 20 May 2021 12:32:29 +0200 Subject: [PATCH] update sectorstorage.New (Manager) interface --- cmd/lotus-storage-miner/init.go | 12 ++++++++++-- extern/sector-storage/manager.go | 9 +-------- extern/sector-storage/stores/local.go | 2 ++ node/modules/storageminer.go | 13 +++++++++++-- 4 files changed, 24 insertions(+), 12 deletions(-) diff --git a/cmd/lotus-storage-miner/init.go b/cmd/lotus-storage-miner/init.go index 76132a7a2..71098e725 100644 --- a/cmd/lotus-storage-miner/init.go +++ b/cmd/lotus-storage-miner/init.go @@ -453,14 +453,22 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api v1api.FullNode wsts := statestore.New(namespace.Wrap(mds, modules.WorkerCallsPrefix)) smsts := statestore.New(namespace.Wrap(mds, modules.ManagerWorkPrefix)) - smgr, err := sectorstorage.New(ctx, lr, stores.NewIndex(), sectorstorage.SealerConfig{ + si := stores.NewIndex() + + lstor, err := stores.NewLocal(ctx, lr, si, nil) + if err != nil { + return err + } + stor := stores.NewRemote(lstor, si, nil, 10) + + smgr, err := sectorstorage.New(ctx, lstor, stor, lr, si, sectorstorage.SealerConfig{ ParallelFetchLimit: 10, AllowAddPiece: true, AllowPreCommit1: true, AllowPreCommit2: true, AllowCommit: true, AllowUnseal: true, - }, nil, sa, wsts, smsts) + }, sa, wsts, smsts) if err != nil { return err } diff --git a/extern/sector-storage/manager.go b/extern/sector-storage/manager.go index 70d55ee2e..7eb6df648 100644 --- a/extern/sector-storage/manager.go +++ b/extern/sector-storage/manager.go @@ -105,19 +105,12 @@ type StorageAuth http.Header type WorkerStateStore *statestore.StateStore type ManagerStateStore *statestore.StateStore -func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, sc SealerConfig, urls URLs, sa StorageAuth, wss WorkerStateStore, mss ManagerStateStore) (*Manager, error) { - lstor, err := stores.NewLocal(ctx, ls, si, urls) - if err != nil { - return nil, err - } - +func New(ctx context.Context, lstor *stores.Local, stor *stores.Remote, ls stores.LocalStorage, si stores.SectorIndex, sc SealerConfig, sa StorageAuth, wss WorkerStateStore, mss ManagerStateStore) (*Manager, error) { prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, index: si}) if err != nil { return nil, xerrors.Errorf("creating prover instance: %w", err) } - stor := stores.NewRemote(lstor, si, http.Header(sa), sc.ParallelFetchLimit) - m := &Manager{ ls: ls, storage: stor, diff --git a/extern/sector-storage/stores/local.go b/extern/sector-storage/stores/local.go index 5a10b21b9..cac160139 100644 --- a/extern/sector-storage/stores/local.go +++ b/extern/sector-storage/stores/local.go @@ -158,6 +158,8 @@ func (p *path) sectorPath(sid abi.SectorID, fileType storiface.SectorFileType) s return filepath.Join(p.local, fileType.String(), storiface.SectorName(sid)) } +type URLs []string + func NewLocal(ctx context.Context, ls LocalStorage, index SectorIndex, urls []string) (*Local, error) { l := &Local{ localStorage: ls, diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index be949255f..2007470d4 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -661,13 +661,22 @@ func RetrievalProvider(h host.Host, var WorkerCallsPrefix = datastore.NewKey("/worker/calls") var ManagerWorkPrefix = datastore.NewKey("/stmgr/calls") -func SectorStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, ls stores.LocalStorage, si stores.SectorIndex, sc sectorstorage.SealerConfig, urls sectorstorage.URLs, sa sectorstorage.StorageAuth, ds dtypes.MetadataDS) (*sectorstorage.Manager, error) { +func LocalStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, ls stores.LocalStorage, si stores.SectorIndex, urls stores.URLs) (*stores.Local, error) { + ctx := helpers.LifecycleCtx(mctx, lc) + return stores.NewLocal(ctx, ls, si, urls) +} + +func RemoteStorage(lstor *stores.Local, si stores.SectorIndex, sa sectorstorage.StorageAuth, sc sectorstorage.SealerConfig) *stores.Remote { + return stores.NewRemote(lstor, si, http.Header(sa), sc.ParallelFetchLimit) +} + +func SectorStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, lstor *stores.Local, stor *stores.Remote, ls stores.LocalStorage, si stores.SectorIndex, sc sectorstorage.SealerConfig, sa sectorstorage.StorageAuth, ds dtypes.MetadataDS) (*sectorstorage.Manager, error) { ctx := helpers.LifecycleCtx(mctx, lc) wsts := statestore.New(namespace.Wrap(ds, WorkerCallsPrefix)) smsts := statestore.New(namespace.Wrap(ds, ManagerWorkPrefix)) - sst, err := sectorstorage.New(ctx, ls, si, sc, urls, sa, wsts, smsts) + sst, err := sectorstorage.New(ctx, lstor, stor, ls, si, sc, sa, wsts, smsts) if err != nil { return nil, err }