diff --git a/cmd/lotus-miner/init.go b/cmd/lotus-miner/init.go index c109e85b9..1b76960e9 100644 --- a/cmd/lotus-miner/init.go +++ b/cmd/lotus-miner/init.go @@ -463,7 +463,7 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api v1api.FullNode wsts := statestore.New(namespace.Wrap(mds, modules.WorkerCallsPrefix)) smsts := statestore.New(namespace.Wrap(mds, modules.ManagerWorkPrefix)) - si := paths.NewIndex(nil) + si := paths.NewMemIndex(nil) lstor, err := paths.NewLocal(ctx, lr, si, nil) if err != nil { diff --git a/node/builder_miner.go b/node/builder_miner.go index dd35c6bec..99d5a201f 100644 --- a/node/builder_miner.go +++ b/node/builder_miner.go @@ -136,8 +136,15 @@ func ConfigStorageMiner(c interface{}) Option { If(cfg.Subsystems.EnableSectorStorage, // Sector storage - Override(new(*paths.IndexProxy), paths.NewIndexProxyHelper(cfg.Subsystems.EnableSectorIndexDB)), - Override(new(paths.SectorIndex), From(new(*paths.IndexProxy))), + If(cfg.Subsystems.EnableSectorIndexDB, + Override(new(*paths.DBIndex), paths.NewDBIndex), + Override(new(paths.SectorIndex), From(new(*paths.DBIndex))), + ), + If(!cfg.Subsystems.EnableSectorIndexDB, + Override(new(*paths.MemIndex), paths.NewMemIndex), + Override(new(paths.SectorIndex), From(new(*paths.MemIndex))), + ), + Override(new(*sectorstorage.Manager), modules.SectorStorage), Override(new(sectorstorage.Unsealer), From(new(*sectorstorage.Manager))), Override(new(sectorstorage.SectorManager), From(new(*sectorstorage.Manager))), diff --git a/storage/paths/index.go b/storage/paths/index.go index bc26bddb4..49ee11e09 100644 --- a/storage/paths/index.go +++ b/storage/paths/index.go @@ -61,7 +61,7 @@ type storageEntry struct { heartbeatErr error } -type Index struct { +type MemIndex struct { *indexLocks lk sync.RWMutex @@ -73,8 +73,8 @@ type Index struct { stores map[storiface.ID]*storageEntry } -func NewIndex(al *alerting.Alerting) *Index { - return &Index{ +func NewMemIndex(al *alerting.Alerting) *MemIndex { + return &MemIndex{ indexLocks: &indexLocks{ locks: map[abi.SectorID]*sectorLock{}, }, @@ -87,7 +87,7 @@ func NewIndex(al *alerting.Alerting) *Index { } } -func (i *Index) StorageList(ctx context.Context) (map[storiface.ID][]storiface.Decl, error) { +func (i *MemIndex) StorageList(ctx context.Context) (map[storiface.ID][]storiface.Decl, error) { i.lk.RLock() defer i.lk.RUnlock() @@ -116,7 +116,7 @@ func (i *Index) StorageList(ctx context.Context) (map[storiface.ID][]storiface.D return out, nil } -func (i *Index) StorageAttach(ctx context.Context, si storiface.StorageInfo, st fsutil.FsStat) error { +func (i *MemIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo, st fsutil.FsStat) error { var allow, deny = make([]string, 0, len(si.AllowTypes)), make([]string, 0, len(si.DenyTypes)) if _, hasAlert := i.pathAlerts[si.ID]; i.alerting != nil && !hasAlert { @@ -219,7 +219,7 @@ func (i *Index) StorageAttach(ctx context.Context, si storiface.StorageInfo, st return nil } -func (i *Index) StorageDetach(ctx context.Context, id storiface.ID, url string) error { +func (i *MemIndex) StorageDetach(ctx context.Context, id storiface.ID, url string) error { i.lk.Lock() defer i.lk.Unlock() @@ -307,7 +307,7 @@ func (i *Index) StorageDetach(ctx context.Context, id storiface.ID, url string) return nil } -func (i *Index) StorageReportHealth(ctx context.Context, id storiface.ID, report storiface.HealthReport) error { +func (i *MemIndex) StorageReportHealth(ctx context.Context, id storiface.ID, report storiface.HealthReport) error { i.lk.Lock() defer i.lk.Unlock() @@ -350,7 +350,7 @@ func (i *Index) StorageReportHealth(ctx context.Context, id storiface.ID, report return nil } -func (i *Index) StorageDeclareSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error { +func (i *MemIndex) StorageDeclareSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error { i.lk.Lock() defer i.lk.Unlock() @@ -382,7 +382,7 @@ loop: return nil } -func (i *Index) StorageDropSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType) error { +func (i *MemIndex) StorageDropSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType) error { i.lk.Lock() defer i.lk.Unlock() @@ -416,7 +416,7 @@ func (i *Index) StorageDropSector(ctx context.Context, storageID storiface.ID, s return nil } -func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft storiface.SectorFileType, ssize abi.SectorSize, allowFetch bool) ([]storiface.SectorStorageInfo, error) { +func (i *MemIndex) StorageFindSector(ctx context.Context, s abi.SectorID, ft storiface.SectorFileType, ssize abi.SectorSize, allowFetch bool) ([]storiface.SectorStorageInfo, error) { i.lk.RLock() defer i.lk.RUnlock() @@ -564,7 +564,7 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft storif return out, nil } -func (i *Index) StorageInfo(ctx context.Context, id storiface.ID) (storiface.StorageInfo, error) { +func (i *MemIndex) StorageInfo(ctx context.Context, id storiface.ID) (storiface.StorageInfo, error) { i.lk.RLock() defer i.lk.RUnlock() @@ -576,7 +576,7 @@ func (i *Index) StorageInfo(ctx context.Context, id storiface.ID) (storiface.Sto return *si.info, nil } -func (i *Index) StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType) ([]storiface.StorageInfo, error) { +func (i *MemIndex) StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType) ([]storiface.StorageInfo, error) { i.lk.RLock() defer i.lk.RUnlock() @@ -641,7 +641,7 @@ func (i *Index) StorageBestAlloc(ctx context.Context, allocate storiface.SectorF return out, nil } -func (i *Index) FindSector(id abi.SectorID, typ storiface.SectorFileType) ([]storiface.ID, error) { +func (i *MemIndex) FindSector(id abi.SectorID, typ storiface.SectorFileType) ([]storiface.ID, error) { i.lk.RLock() defer i.lk.RUnlock() @@ -660,4 +660,4 @@ func (i *Index) FindSector(id abi.SectorID, typ storiface.SectorFileType) ([]sto return out, nil } -var _ SectorIndex = &Index{} +var _ SectorIndex = &MemIndex{} diff --git a/storage/paths/index_proxy.go b/storage/paths/index_proxy.go deleted file mode 100644 index 06097b665..000000000 --- a/storage/paths/index_proxy.go +++ /dev/null @@ -1,118 +0,0 @@ -package paths - -import ( - "context" - - "github.com/filecoin-project/go-state-types/abi" - - "github.com/filecoin-project/lotus/journal/alerting" - "github.com/filecoin-project/lotus/lib/harmony/harmonydb" - "github.com/filecoin-project/lotus/storage/sealer/fsutil" - "github.com/filecoin-project/lotus/storage/sealer/storiface" -) - -type IndexProxy struct { - memIndex *Index - dbIndex *DBIndex - enableSectorIndexDB bool -} - -func NewIndexProxyHelper(enableSectorIndexDB bool) func(al *alerting.Alerting, db *harmonydb.DB) *IndexProxy { - return func(al *alerting.Alerting, db *harmonydb.DB) *IndexProxy { - return NewIndexProxy(al, db, enableSectorIndexDB) - } -} - -func NewIndexProxy(al *alerting.Alerting, db *harmonydb.DB, enableSectorIndexDB bool) *IndexProxy { - return &IndexProxy{ - memIndex: NewIndex(al), - dbIndex: NewDBIndex(al, db), - enableSectorIndexDB: enableSectorIndexDB, - } -} - -func (ip *IndexProxy) StorageAttach(ctx context.Context, info storiface.StorageInfo, stat fsutil.FsStat) error { - if ip.enableSectorIndexDB { - return ip.dbIndex.StorageAttach(ctx, info, stat) - } - return ip.memIndex.StorageAttach(ctx, info, stat) -} - -func (ip *IndexProxy) StorageDetach(ctx context.Context, id storiface.ID, url string) error { - if ip.enableSectorIndexDB { - return ip.dbIndex.StorageDetach(ctx, id, url) - } - return ip.memIndex.StorageDetach(ctx, id, url) -} - -func (ip *IndexProxy) StorageInfo(ctx context.Context, id storiface.ID) (storiface.StorageInfo, error) { - if ip.enableSectorIndexDB { - return ip.dbIndex.StorageInfo(ctx, id) - } - return ip.memIndex.StorageInfo(ctx, id) -} - -func (ip *IndexProxy) StorageReportHealth(ctx context.Context, id storiface.ID, report storiface.HealthReport) error { - if ip.enableSectorIndexDB { - return ip.dbIndex.StorageReportHealth(ctx, id, report) - } - return ip.memIndex.StorageReportHealth(ctx, id, report) -} - -func (ip *IndexProxy) StorageDeclareSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error { - if ip.enableSectorIndexDB { - return ip.dbIndex.StorageDeclareSector(ctx, storageID, s, ft, primary) - } - return ip.memIndex.StorageDeclareSector(ctx, storageID, s, ft, primary) -} - -func (ip *IndexProxy) StorageDropSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType) error { - if ip.enableSectorIndexDB { - return ip.dbIndex.StorageDropSector(ctx, storageID, s, ft) - } - return ip.memIndex.StorageDropSector(ctx, storageID, s, ft) -} - -func (ip *IndexProxy) StorageFindSector(ctx context.Context, sector abi.SectorID, ft storiface.SectorFileType, ssize abi.SectorSize, allowFetch bool) ([]storiface.SectorStorageInfo, error) { - if ip.enableSectorIndexDB { - return ip.dbIndex.StorageFindSector(ctx, sector, ft, ssize, allowFetch) - } - return ip.memIndex.StorageFindSector(ctx, sector, ft, ssize, allowFetch) -} - -func (ip *IndexProxy) StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType) ([]storiface.StorageInfo, error) { - if ip.enableSectorIndexDB { - return ip.dbIndex.StorageBestAlloc(ctx, allocate, ssize, pathType) - } - return ip.memIndex.StorageBestAlloc(ctx, allocate, ssize, pathType) -} - -func (ip *IndexProxy) StorageLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) error { - if ip.enableSectorIndexDB { - return ip.dbIndex.StorageLock(ctx, sector, read, write) - } - return ip.memIndex.StorageLock(ctx, sector, read, write) -} - -func (ip *IndexProxy) StorageTryLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) (bool, error) { - if ip.enableSectorIndexDB { - return ip.dbIndex.StorageTryLock(ctx, sector, read, write) - } - return ip.memIndex.StorageTryLock(ctx, sector, read, write) -} - -func (ip *IndexProxy) StorageGetLocks(ctx context.Context) (storiface.SectorLocks, error) { - if ip.enableSectorIndexDB { - return ip.dbIndex.StorageGetLocks(ctx) - } - return ip.memIndex.StorageGetLocks(ctx) -} - -func (ip *IndexProxy) StorageList(ctx context.Context) (map[storiface.ID][]storiface.Decl, error) { - if ip.enableSectorIndexDB { - return ip.dbIndex.StorageList(ctx) - } - return ip.memIndex.StorageList(ctx) -} - -var _ SectorIndex = &IndexProxy{} diff --git a/storage/paths/index_test.go b/storage/paths/index_test.go index 9a241da23..96e17ce7d 100644 --- a/storage/paths/index_test.go +++ b/storage/paths/index_test.go @@ -42,7 +42,7 @@ const s32g = 32 << 30 func TestFindSimple(t *testing.T) { ctx := context.Background() - i := NewIndex(nil) + i := NewMemIndex(nil) stor1 := newTestStorage() stor2 := newTestStorage() @@ -79,7 +79,7 @@ func TestFindSimple(t *testing.T) { func TestFindNoAllow(t *testing.T) { ctx := context.Background() - i := NewIndex(nil) + i := NewMemIndex(nil) stor1 := newTestStorage() stor1.AllowTo = []storiface.Group{"grp1"} stor2 := newTestStorage() @@ -111,7 +111,7 @@ func TestFindNoAllow(t *testing.T) { func TestFindAllow(t *testing.T) { ctx := context.Background() - i := NewIndex(nil) + i := NewMemIndex(nil) stor1 := newTestStorage() stor1.AllowTo = []storiface.Group{"grp1"} diff --git a/storage/paths/local_test.go b/storage/paths/local_test.go index bfa138ff6..4bc2642dc 100644 --- a/storage/paths/local_test.go +++ b/storage/paths/local_test.go @@ -80,7 +80,7 @@ func TestLocalStorage(t *testing.T) { root: root, } - index := NewIndex(nil) + index := NewMemIndex(nil) st, err := NewLocal(ctx, tstor, index, nil) require.NoError(t, err) diff --git a/storage/paths/remote_test.go b/storage/paths/remote_test.go index e3376e6fa..7aea63729 100644 --- a/storage/paths/remote_test.go +++ b/storage/paths/remote_test.go @@ -59,7 +59,7 @@ func createTestStorage(t *testing.T, p string, seal bool, att ...*paths.Local) s func TestMoveShared(t *testing.T) { logging.SetAllLoggers(logging.LevelDebug) - index := paths.NewIndex(nil) + index := paths.NewMemIndex(nil) ctx := context.Background() diff --git a/storage/sealer/manager_test.go b/storage/sealer/manager_test.go index 7c3e1a1f2..d76424d5e 100644 --- a/storage/sealer/manager_test.go +++ b/storage/sealer/manager_test.go @@ -100,10 +100,10 @@ func (t *testStorage) Stat(path string) (fsutil.FsStat, error) { var _ paths.LocalStorage = &testStorage{} -func newTestMgr(ctx context.Context, t *testing.T, ds datastore.Datastore) (*Manager, *paths.Local, *paths.Remote, *paths.Index, func()) { +func newTestMgr(ctx context.Context, t *testing.T, ds datastore.Datastore) (*Manager, *paths.Local, *paths.Remote, *paths.MemIndex, func()) { st := newTestStorage(t) - si := paths.NewIndex(nil) + si := paths.NewMemIndex(nil) lstor, err := paths.NewLocal(ctx, st, si, nil) require.NoError(t, err) diff --git a/storage/sealer/piece_provider_test.go b/storage/sealer/piece_provider_test.go index de1e07a78..a8c243379 100644 --- a/storage/sealer/piece_provider_test.go +++ b/storage/sealer/piece_provider_test.go @@ -180,7 +180,7 @@ func TestReadPieceRemoteWorkers(t *testing.T) { type pieceProviderTestHarness struct { ctx context.Context - index *paths.Index + index *paths.MemIndex pp PieceProvider sector storiface.SectorRef mgr *Manager @@ -210,7 +210,7 @@ func newPieceProviderTestHarness(t *testing.T, mgrConfig config.SealerConfig, se require.NoError(t, err) // create index, storage, local store & remote store. - index := paths.NewIndex(nil) + index := paths.NewMemIndex(nil) storage := newTestStorage(t) localStore, err := paths.NewLocal(ctx, storage, index, []string{"http://" + nl.Addr().String() + "/remote"}) require.NoError(t, err) diff --git a/storage/sealer/sched_test.go b/storage/sealer/sched_test.go index 2e2b05ab2..03e947b8a 100644 --- a/storage/sealer/sched_test.go +++ b/storage/sealer/sched_test.go @@ -187,7 +187,7 @@ func (s *schedTestWorker) Close() error { var _ Worker = &schedTestWorker{} -func addTestWorker(t *testing.T, sched *Scheduler, index *paths.Index, name string, taskTypes map[sealtasks.TaskType]struct{}, resources storiface.WorkerResources, ignoreResources bool) { +func addTestWorker(t *testing.T, sched *Scheduler, index *paths.MemIndex, name string, taskTypes map[sealtasks.TaskType]struct{}, resources storiface.WorkerResources, ignoreResources bool) { w := &schedTestWorker{ name: name, taskTypes: taskTypes, @@ -231,7 +231,7 @@ func TestSchedStartStop(t *testing.T) { require.NoError(t, err) go sched.runSched() - addTestWorker(t, sched, paths.NewIndex(nil), "fred", nil, decentWorkerResources, false) + addTestWorker(t, sched, paths.NewMemIndex(nil), "fred", nil, decentWorkerResources, false) require.NoError(t, sched.Close(context.TODO())) } @@ -264,13 +264,13 @@ func TestSched(t *testing.T) { wg sync.WaitGroup } - type task func(*testing.T, *Scheduler, *paths.Index, *runMeta) + type task func(*testing.T, *Scheduler, *paths.MemIndex, *runMeta) sched := func(taskName, expectWorker string, sid abi.SectorNumber, taskType sealtasks.TaskType) task { _, _, l, _ := runtime.Caller(1) _, _, l2, _ := runtime.Caller(2) - return func(t *testing.T, sched *Scheduler, index *paths.Index, rm *runMeta) { + return func(t *testing.T, sched *Scheduler, index *paths.MemIndex, rm *runMeta) { done := make(chan struct{}) rm.done[taskName] = done @@ -324,7 +324,7 @@ func TestSched(t *testing.T) { taskStarted := func(name string) task { _, _, l, _ := runtime.Caller(1) _, _, l2, _ := runtime.Caller(2) - return func(t *testing.T, sched *Scheduler, index *paths.Index, rm *runMeta) { + return func(t *testing.T, sched *Scheduler, index *paths.MemIndex, rm *runMeta) { select { case rm.done[name] <- struct{}{}: case <-ctx.Done(): @@ -336,7 +336,7 @@ func TestSched(t *testing.T) { taskDone := func(name string) task { _, _, l, _ := runtime.Caller(1) _, _, l2, _ := runtime.Caller(2) - return func(t *testing.T, sched *Scheduler, index *paths.Index, rm *runMeta) { + return func(t *testing.T, sched *Scheduler, index *paths.MemIndex, rm *runMeta) { select { case rm.done[name] <- struct{}{}: case <-ctx.Done(): @@ -349,7 +349,7 @@ func TestSched(t *testing.T) { taskNotScheduled := func(name string) task { _, _, l, _ := runtime.Caller(1) _, _, l2, _ := runtime.Caller(2) - return func(t *testing.T, sched *Scheduler, index *paths.Index, rm *runMeta) { + return func(t *testing.T, sched *Scheduler, index *paths.MemIndex, rm *runMeta) { select { case rm.done[name] <- struct{}{}: t.Fatal("not expected", l, l2) @@ -360,7 +360,7 @@ func TestSched(t *testing.T) { testFunc := func(workers []workerSpec, tasks []task) func(t *testing.T) { return func(t *testing.T) { - index := paths.NewIndex(nil) + index := paths.NewMemIndex(nil) sched, err := newScheduler(ctx, "") require.NoError(t, err) @@ -389,7 +389,7 @@ func TestSched(t *testing.T) { } multTask := func(tasks ...task) task { - return func(t *testing.T, s *Scheduler, index *paths.Index, meta *runMeta) { + return func(t *testing.T, s *Scheduler, index *paths.MemIndex, meta *runMeta) { for _, tsk := range tasks { tsk(t, s, index, meta) } @@ -503,7 +503,7 @@ func TestSched(t *testing.T) { } diag := func() task { - return func(t *testing.T, s *Scheduler, index *paths.Index, meta *runMeta) { + return func(t *testing.T, s *Scheduler, index *paths.MemIndex, meta *runMeta) { time.Sleep(20 * time.Millisecond) for _, request := range s.diag().Requests { log.Infof("!!! sDIAG: sid(%d) task(%s)", request.Sector.Number, request.TaskType)