diff --git a/cmd/lotus-seal-worker/main.go b/cmd/lotus-seal-worker/main.go index c3e867da3..e9845e29c 100644 --- a/cmd/lotus-seal-worker/main.go +++ b/cmd/lotus-seal-worker/main.go @@ -167,7 +167,7 @@ var runCmd = &cli.Command{ remote := stores.NewRemote(localStore, nodeApi, sminfo.AuthHeader()) workerApi := &worker{ - LocalWorker: advmgr.NewLocalWorker(act, spt, remote, localStore), + LocalWorker: advmgr.NewLocalWorker(act, spt, remote, localStore, stores.NewIndex()), } mux := mux.NewRouter() diff --git a/node/builder.go b/node/builder.go index 661edcccb..7ec2b3943 100644 --- a/node/builder.go +++ b/node/builder.go @@ -258,6 +258,7 @@ func Online() Option { // Storage miner ApplyIf(func(s *Settings) bool { return s.nodeType == repo.StorageMiner }, + Override(new(*stores.Index), stores.NewIndex()), Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig), Override(new(stores.LocalStorage), From(new(repo.LockedRepo))), Override(new(advmgr.SectorIDCounter), modules.SectorIDCounter), diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 4082ff45e..ac4e3a282 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -39,6 +39,7 @@ type StorageMinerAPI struct { BlockMiner *miner.Miner Full api.FullNode StorageMgr *advmgr.Manager `optional:"true"` + *stores.Index } func (sm *StorageMinerAPI) ServeRemote(w http.ResponseWriter, r *http.Request) { @@ -152,18 +153,6 @@ func (sm *StorageMinerAPI) WorkerConnect(ctx context.Context, url string) error panic("todo register ") } -func (sm *StorageMinerAPI) StorageAttach(ctx context.Context, si stores.StorageInfo) error { - panic("implement me") -} - -func (sm *StorageMinerAPI) StorageDeclareSector(ctx context.Context, storageId stores.ID, s abi.SectorID, ft sectorbuilder.SectorFileType) error { - panic("implement me") -} - -func (sm *StorageMinerAPI) StorageFindSector(ctx context.Context, si abi.SectorID, types sectorbuilder.SectorFileType) ([]stores.StorageInfo, error) { - panic("implement me") -} - func (sm *StorageMinerAPI) MarketImportDealData(ctx context.Context, propCid cid.Cid, path string) error { fi, err := os.Open(path) if err != nil { diff --git a/storage/sealmgr/advmgr/local.go b/storage/sealmgr/advmgr/local.go index 59948058f..7d6b5f779 100644 --- a/storage/sealmgr/advmgr/local.go +++ b/storage/sealmgr/advmgr/local.go @@ -8,22 +8,25 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/specs-actors/actors/abi" storage2 "github.com/filecoin-project/specs-storage/storage" - "github.com/filecoin-project/go-sectorbuilder" - "github.com/filecoin-project/lotus/storage/sealmgr/stores" - "github.com/filecoin-project/lotus/storage/sealmgr" + "github.com/filecoin-project/lotus/storage/sealmgr/sectorutil" + "github.com/filecoin-project/lotus/storage/sealmgr/stores" ) +var pathTypes = []sectorbuilder.SectorFileType{sectorbuilder.FTUnsealed, sectorbuilder.FTSealed, sectorbuilder.FTCache} + type LocalWorker struct { scfg *sectorbuilder.Config storage stores.Store localStore *stores.Local + sindex stores.SectorIndex } -func NewLocalWorker(ma address.Address, spt abi.RegisteredProof, store stores.Store, local *stores.Local) *LocalWorker { +func NewLocalWorker(ma address.Address, spt abi.RegisteredProof, store stores.Store, local *stores.Local, sindex stores.SectorIndex) *LocalWorker { ppt, err := spt.RegisteredPoStProof() if err != nil { panic(err) @@ -36,6 +39,7 @@ func NewLocalWorker(ma address.Address, spt abi.RegisteredProof, store stores.St }, storage: store, localStore: local, + sindex: sindex, } } @@ -49,10 +53,31 @@ func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, id abi.Sect return sectorbuilder.SectorPaths{}, nil, xerrors.Errorf("get miner ID: %w", err) } - return l.w.storage.AcquireSector(ctx, abi.SectorID{ + sector := abi.SectorID{ Miner: abi.ActorID(mid), Number: id, - }, existing, allocate, sealing) + } + + paths, storageIDs, done, err := l.w.storage.AcquireSector(ctx, sector, existing, allocate, sealing) + if err != nil { + return sectorbuilder.SectorPaths{}, nil, err + } + + return paths, func() { + done() + + for _, fileType := range pathTypes { + if fileType&allocate == 0 { + continue + } + + sid := sectorutil.PathByType(storageIDs, fileType) + + if err := l.w.sindex.StorageDeclareSector(ctx, stores.ID(sid), sector, fileType); err != nil { + log.Errorf("declare sector error: %+v", err) + } + } + }, nil } func (l *LocalWorker) sb() (sectorbuilder.Basic, error) { diff --git a/storage/sealmgr/advmgr/roprov.go b/storage/sealmgr/advmgr/roprov.go index 4a8fa8261..ce40d2ad3 100644 --- a/storage/sealmgr/advmgr/roprov.go +++ b/storage/sealmgr/advmgr/roprov.go @@ -20,8 +20,10 @@ func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorNumbe return sectorbuilder.SectorPaths{}, nil, xerrors.New("read-only storage") } - return l.stor.AcquireSector(ctx, abi.SectorID{ + p, _, done, err := l.stor.AcquireSector(ctx, abi.SectorID{ Miner: l.miner, Number: id, }, existing, allocate, sealing) + + return p, done, err } diff --git a/storage/sealmgr/stores/http_handler.go b/storage/sealmgr/stores/http_handler.go index 75cb080b5..f1fa42c8e 100644 --- a/storage/sealmgr/stores/http_handler.go +++ b/storage/sealmgr/stores/http_handler.go @@ -44,7 +44,7 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ if err != nil { return } - paths, done, err := handler.Store.AcquireSector(r.Context(), id, ft, 0, false) + paths, _, done, err := handler.Store.AcquireSector(r.Context(), id, ft, 0, false) if err != nil { return } diff --git a/storage/sealmgr/stores/index.go b/storage/sealmgr/stores/index.go index 37118764e..68fdccfcb 100644 --- a/storage/sealmgr/stores/index.go +++ b/storage/sealmgr/stores/index.go @@ -75,16 +75,23 @@ func (i *Index) StorageDeclareSector(ctx context.Context, storageId ID, s abi.Se i.lk.Lock() defer i.lk.Unlock() - d := decl{s, ft} - - for _, sid := range i.sectors[d] { - if sid == storageId { - log.Warnf("sector %v redeclared in %s", storageId) - return nil + for _, fileType := range pathTypes { + if fileType&ft == 0 { + continue } + + d := decl{s, fileType} + + for _, sid := range i.sectors[d] { + if sid == storageId { + log.Warnf("sector %v redeclared in %s", storageId) + return nil + } + } + + i.sectors[d] = append(i.sectors[d], storageId) } - i.sectors[d] = append(i.sectors[d], storageId) return nil } diff --git a/storage/sealmgr/stores/interface.go b/storage/sealmgr/stores/interface.go index a6ea5ec99..6e855307d 100644 --- a/storage/sealmgr/stores/interface.go +++ b/storage/sealmgr/stores/interface.go @@ -8,5 +8,5 @@ import ( ) type Store interface { - AcquireSector(ctx context.Context, s abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) + AcquireSector(ctx context.Context, s abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (paths sectorbuilder.SectorPaths, stores sectorbuilder.SectorPaths, done func(), err error) } diff --git a/storage/sealmgr/stores/local.go b/storage/sealmgr/stores/local.go index e5f3f9548..26e1896a1 100644 --- a/storage/sealmgr/stores/local.go +++ b/storage/sealmgr/stores/local.go @@ -133,12 +133,7 @@ func (st *Local) open() error { return nil } -func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) { - out, _, done, err := st.acquireSector(ctx, sid, existing, allocate, sealing) - return out, done, err -} - -func (st *Local) acquireSector(ctx context.Context, sid abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, sectorbuilder.SectorPaths, func(), error) { +func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, sectorbuilder.SectorPaths, func(), error) { if existing|allocate != existing^allocate { return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector") } diff --git a/storage/sealmgr/stores/remote.go b/storage/sealmgr/stores/remote.go index b0b19a884..04272b450 100644 --- a/storage/sealmgr/stores/remote.go +++ b/storage/sealmgr/stores/remote.go @@ -37,17 +37,17 @@ func NewRemote(local *Local, remote SectorIndex, auth http.Header) *Remote { } } -func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) { +func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, sectorbuilder.SectorPaths, func(), error) { if existing|allocate != existing^allocate { - return sectorbuilder.SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector") + return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector") } r.fetchLk.Lock() defer r.fetchLk.Unlock() - paths, done, err := r.local.AcquireSector(ctx, s, existing, allocate, sealing) + paths, stores, done, err := r.local.AcquireSector(ctx, s, existing, allocate, sealing) if err != nil { - return sectorbuilder.SectorPaths{}, nil, err + return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, err } for _, fileType := range pathTypes { @@ -62,18 +62,19 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sec ap, storageID, rdone, err := r.acquireFromRemote(ctx, s, fileType, sealing) if err != nil { done() - return sectorbuilder.SectorPaths{}, nil, err + return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, err } done = mergeDone(done, rdone) sectorutil.SetPathByType(&paths, fileType, ap) + sectorutil.SetPathByType(&stores, fileType, string(storageID)) if err := r.remote.StorageDeclareSector(ctx, storageID, s, fileType); err != nil { log.Warnf("declaring sector %v in %s failed: %+v", s, storageID, err) } } - return paths, done, nil + return paths, stores, done, nil } func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType sectorbuilder.SectorFileType, sealing bool) (string, ID, func(), error) { @@ -86,7 +87,7 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType return si[i].Cost < si[j].Cost }) - apaths, ids, done, err := r.local.acquireSector(ctx, s, 0, fileType, sealing) + apaths, ids, done, err := r.local.AcquireSector(ctx, s, 0, fileType, sealing) if err != nil { return "", "", nil, xerrors.Errorf("allocate local sector for fetching: %w", err) }