From 2c7f57983811b3612f00b98f26ede1e5b70f3698 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 13 Mar 2020 12:59:19 +0100 Subject: [PATCH] workers: Implement SectorIndex --- api/api_storage.go | 27 +----- api/api_worker.go | 3 +- api/apistruct/struct.go | 21 ++--- cmd/lotus-seed/seed/seed.go | 6 +- cmd/lotus-storage-miner/init.go | 5 +- cmd/lotus-storage-miner/storage.go | 6 +- node/config/storage.go | 9 -- node/impl/storminer.go | 7 +- node/repo/fsrepo.go | 2 +- node/repo/memrepo.go | 5 +- storage/sealmgr/advmgr/local.go | 11 ++- storage/sealmgr/advmgr/manager.go | 19 ++--- storage/sealmgr/advmgr/remote.go | 2 +- storage/sealmgr/advmgr/roprov.go | 2 +- storage/sealmgr/stores/index.go | 128 +++++++++++++++++++++++++++++ storage/sealmgr/stores/local.go | 45 +++++++--- storage/sealmgr/stores/remote.go | 23 ++---- 17 files changed, 217 insertions(+), 104 deletions(-) create mode 100644 storage/sealmgr/stores/index.go diff --git a/api/api_storage.go b/api/api_storage.go index 13aec9544..94cb1fdfa 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -7,11 +7,11 @@ import ( "github.com/ipfs/go-cid" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/specs-actors/actors/abi" - "github.com/filecoin-project/go-fil-markets/storagemarket" - "github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/storage/sealmgr/stores" ) // alias because cbor-gen doesn't like non-alias types @@ -110,9 +110,7 @@ type StorageMiner interface { // WorkerConnect tells the node to connect to workers RPC WorkerConnect(context.Context, string) error - WorkerAttachStorage(context.Context, StorageInfo) error - StorageDeclareSector(ctx context.Context, storageId string, s abi.SectorID) error - StorageFindSector(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]StorageInfo, error) + stores.SectorIndex MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error MarketListDeals(ctx context.Context) ([]storagemarket.StorageDeal, error) @@ -125,25 +123,6 @@ type StorageMiner interface { StorageAddLocal(ctx context.Context, path string) error } -type StorageInfo struct { - ID string - URLs []string // TODO: Support non-http transports - Cost int - - CanSeal bool - CanStore bool -} - -type StoragePath struct { - ID string - Weight uint64 - - LocalPath string - - CanSeal bool - CanStore bool -} - type SealRes struct { Err string GoErr error `json:"-"` diff --git a/api/api_worker.go b/api/api_worker.go index 67a22f77a..bafd6ca83 100644 --- a/api/api_worker.go +++ b/api/api_worker.go @@ -2,6 +2,7 @@ package api import ( "context" + "github.com/filecoin-project/lotus/storage/sealmgr/stores" "github.com/filecoin-project/specs-storage/storage" @@ -14,7 +15,7 @@ type WorkerApi interface { // TODO: Info() (name, ...) ? TaskTypes(context.Context) (map[sealmgr.TaskType]struct{}, error) // TaskType -> Weight - Paths(context.Context) ([]StoragePath, error) + Paths(context.Context) ([]stores.StoragePath, error) storage.Sealer } diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index 1713f8b2d..31d9cfc71 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -2,6 +2,7 @@ package apistruct import ( "context" + "github.com/filecoin-project/lotus/storage/sealmgr/stores" "github.com/filecoin-project/specs-storage/storage" "github.com/ipfs/go-cid" @@ -180,10 +181,10 @@ type StorageMinerStruct struct { SectorsRefs func(context.Context) (map[string][]api.SealedRef, error) `perm:"read"` SectorsUpdate func(context.Context, abi.SectorNumber, api.SectorState) error `perm:"write"` - WorkerConnect func(context.Context, string) error `perm:"admin"` // TODO: worker perm - WorkerAttachStorage func(context.Context, api.StorageInfo) error `perm:"admin"` - StorageDeclareSector func(ctx context.Context, storageId string, s abi.SectorID) error `perm:"admin"` - StorageFindSector func(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]api.StorageInfo, error) `perm:"admin"` + WorkerConnect func(context.Context, string) error `perm:"admin"` // TODO: worker perm + WorkerAttachStorage func(context.Context, stores.StorageInfo) error `perm:"admin"` + StorageDeclareSector func(context.Context, stores.ID, abi.SectorID, sectorbuilder.SectorFileType) error `perm:"admin"` + StorageFindSector func(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]stores.StorageInfo, error) `perm:"admin"` DealsImportData func(ctx context.Context, dealPropCid cid.Cid, file string) error `perm:"write"` DealsList func(ctx context.Context) ([]storagemarket.StorageDeal, error) `perm:"read"` @@ -199,7 +200,7 @@ type WorkerStruct struct { Version func(context.Context) (build.Version, error) `perm:"admin"` TaskTypes func(context.Context) (map[sealmgr.TaskType]struct{}, error) `perm:"admin"` - Paths func(context.Context) ([]api.StoragePath, error) `perm:"admin"` + Paths func(context.Context) ([]stores.StoragePath, error) `perm:"admin"` SealPreCommit1 func(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) `perm:"admin"` SealPreCommit2 func(context.Context, abi.SectorNumber, storage.PreCommit1Out) (sealedCID cid.Cid, unsealedCID cid.Cid, err error) `perm:"admin"` @@ -651,15 +652,15 @@ func (c *StorageMinerStruct) WorkerConnect(ctx context.Context, url string) erro return c.Internal.WorkerConnect(ctx, url) } -func (c *StorageMinerStruct) WorkerAttachStorage(ctx context.Context, si api.StorageInfo) error { +func (c *StorageMinerStruct) StorageAttach(ctx context.Context, si stores.StorageInfo) error { return c.Internal.WorkerAttachStorage(ctx, si) } -func (c *StorageMinerStruct) StorageDeclareSector(ctx context.Context, storageId string, s abi.SectorID) error { - return c.Internal.StorageDeclareSector(ctx, storageId, s) +func (c *StorageMinerStruct) StorageDeclareSector(ctx context.Context, storageId stores.ID, s abi.SectorID, ft sectorbuilder.SectorFileType) error { + return c.Internal.StorageDeclareSector(ctx, storageId, s, ft) } -func (c *StorageMinerStruct) StorageFindSector(ctx context.Context, si abi.SectorID, types sectorbuilder.SectorFileType) ([]api.StorageInfo, error) { +func (c *StorageMinerStruct) StorageFindSector(ctx context.Context, si abi.SectorID, types sectorbuilder.SectorFileType) ([]stores.StorageInfo, error) { return c.Internal.StorageFindSector(ctx, si, types) } @@ -699,7 +700,7 @@ func (w *WorkerStruct) TaskTypes(ctx context.Context) (map[sealmgr.TaskType]stru return w.Internal.TaskTypes(ctx) } -func (w *WorkerStruct) Paths(ctx context.Context) ([]api.StoragePath, error) { +func (w *WorkerStruct) Paths(ctx context.Context) ([]stores.StoragePath, error) { return w.Internal.Paths(ctx) } diff --git a/cmd/lotus-seed/seed/seed.go b/cmd/lotus-seed/seed/seed.go index 3f5946ccc..f18a9433f 100644 --- a/cmd/lotus-seed/seed/seed.go +++ b/cmd/lotus-seed/seed/seed.go @@ -7,6 +7,7 @@ import ( "encoding/hex" "encoding/json" "fmt" + "github.com/filecoin-project/lotus/storage/sealmgr/stores" "io/ioutil" "os" "path/filepath" @@ -26,7 +27,6 @@ import ( "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/wallet" "github.com/filecoin-project/lotus/genesis" - "github.com/filecoin-project/lotus/node/config" ) var log = logging.Logger("preseal") @@ -134,8 +134,8 @@ func PreSeal(maddr address.Address, pt abi.RegisteredProof, offset abi.SectorNum } { - b, err := json.MarshalIndent(&config.StorageMeta{ - ID: uuid.New().String(), + b, err := json.MarshalIndent(&stores.StorageMeta{ + ID: stores.ID(uuid.New().String()), Weight: 0, // read-only CanSeal: false, CanStore: false, diff --git a/cmd/lotus-storage-miner/init.go b/cmd/lotus-storage-miner/init.go index a8034ebfd..990aee236 100644 --- a/cmd/lotus-storage-miner/init.go +++ b/cmd/lotus-storage-miner/init.go @@ -7,6 +7,7 @@ import ( "encoding/binary" "encoding/json" "fmt" + "github.com/filecoin-project/lotus/storage/sealmgr/stores" "io/ioutil" "os" "path/filepath" @@ -187,8 +188,8 @@ var initCmd = &cli.Command{ } if !cctx.Bool("no-local-storage") { - b, err := json.MarshalIndent(&config.StorageMeta{ - ID: uuid.New().String(), + b, err := json.MarshalIndent(&stores.StorageMeta{ + ID: stores.ID(uuid.New().String()), Weight: 10, CanSeal: true, CanStore: true, diff --git a/cmd/lotus-storage-miner/storage.go b/cmd/lotus-storage-miner/storage.go index 7976e6dd5..31ab45c7c 100644 --- a/cmd/lotus-storage-miner/storage.go +++ b/cmd/lotus-storage-miner/storage.go @@ -2,6 +2,7 @@ package main import ( "encoding/json" + "github.com/filecoin-project/lotus/storage/sealmgr/stores" "io/ioutil" "os" "path/filepath" @@ -12,7 +13,6 @@ import ( "gopkg.in/urfave/cli.v2" lcli "github.com/filecoin-project/lotus/cli" - "github.com/filecoin-project/lotus/node/config" ) const metaFile = "sectorstore.json" @@ -79,8 +79,8 @@ var storageAttachCmd = &cli.Command{ return err } - cfg := &config.StorageMeta{ - ID: uuid.New().String(), + cfg := &stores.StorageMeta{ + ID: stores.ID(uuid.New().String()), Weight: cctx.Uint64("weight"), CanSeal: cctx.Bool("seal"), CanStore: cctx.Bool("store"), diff --git a/node/config/storage.go b/node/config/storage.go index 3e6c9d2e2..834447c3a 100644 --- a/node/config/storage.go +++ b/node/config/storage.go @@ -18,15 +18,6 @@ type StorageConfig struct { StoragePaths []LocalPath } -// [path]/metadata.json -type StorageMeta struct { - ID string - Weight uint64 // 0 = readonly - - CanSeal bool - CanStore bool -} - func StorageFromFile(path string, def *StorageConfig) (*StorageConfig, error) { file, err := os.Open(path) switch { diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 9bbc58f1e..4082ff45e 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -3,6 +3,7 @@ package impl import ( "context" "encoding/json" + "github.com/filecoin-project/lotus/storage/sealmgr/stores" "net/http" "os" "strconv" @@ -151,15 +152,15 @@ func (sm *StorageMinerAPI) WorkerConnect(ctx context.Context, url string) error panic("todo register ") } -func (sm *StorageMinerAPI) WorkerAttachStorage(ctx context.Context, si api.StorageInfo) error { +func (sm *StorageMinerAPI) StorageAttach(ctx context.Context, si stores.StorageInfo) error { panic("implement me") } -func (sm *StorageMinerAPI) WorkerDeclareSector(ctx context.Context, storageId string, s abi.SectorID) error { +func (sm *StorageMinerAPI) StorageDeclareSector(ctx context.Context, storageId stores.ID, s abi.SectorID, ft sectorbuilder.SectorFileType) error { panic("implement me") } -func (sm *StorageMinerAPI) FindSector(ctx context.Context, si abi.SectorID, types sectorbuilder.SectorFileType) ([]api.StorageInfo, error) { +func (sm *StorageMinerAPI) StorageFindSector(ctx context.Context, si abi.SectorID, types sectorbuilder.SectorFileType) ([]stores.StorageInfo, error) { panic("implement me") } diff --git a/node/repo/fsrepo.go b/node/repo/fsrepo.go index 926541359..b5798c754 100644 --- a/node/repo/fsrepo.go +++ b/node/repo/fsrepo.go @@ -50,7 +50,7 @@ func defConfForType(t RepoType) interface{} { case StorageMiner: return config.DefaultStorageMiner() case Worker: - return &struct {}{} + return &struct{}{} default: panic(fmt.Sprintf("unknown RepoType(%d)", int(t))) } diff --git a/node/repo/memrepo.go b/node/repo/memrepo.go index 4dd7dc62e..016b0f21e 100644 --- a/node/repo/memrepo.go +++ b/node/repo/memrepo.go @@ -2,6 +2,7 @@ package repo import ( "encoding/json" + "github.com/filecoin-project/lotus/storage/sealmgr/stores" "io/ioutil" "os" "path/filepath" @@ -81,8 +82,8 @@ func (lmem *lockedMemRepo) Path() string { panic(err) } - b, err := json.MarshalIndent(&config.StorageMeta{ - ID: uuid.New().String(), + b, err := json.MarshalIndent(&stores.StorageMeta{ + ID: stores.ID(uuid.New().String()), Weight: 10, CanSeal: true, CanStore: true, diff --git a/storage/sealmgr/advmgr/local.go b/storage/sealmgr/advmgr/local.go index 1b5d9859a..59948058f 100644 --- a/storage/sealmgr/advmgr/local.go +++ b/storage/sealmgr/advmgr/local.go @@ -12,15 +12,14 @@ import ( storage2 "github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/go-sectorbuilder" - "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/storage/sealmgr/stores" "github.com/filecoin-project/lotus/storage/sealmgr" ) type LocalWorker struct { - scfg *sectorbuilder.Config - storage stores.Store + scfg *sectorbuilder.Config + storage stores.Store localStore *stores.Local } @@ -30,7 +29,7 @@ func NewLocalWorker(ma address.Address, spt abi.RegisteredProof, store stores.St panic(err) } return &LocalWorker{ - scfg: §orbuilder.Config{ + scfg: §orbuilder.Config{ SealProofType: spt, PoStProofType: ppt, Miner: ma, @@ -51,7 +50,7 @@ func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, id abi.Sect } return l.w.storage.AcquireSector(ctx, abi.SectorID{ - Miner: abi.ActorID(mid), + Miner: abi.ActorID(mid), Number: id, }, existing, allocate, sealing) } @@ -123,7 +122,7 @@ func (l *LocalWorker) TaskTypes(context.Context) (map[sealmgr.TaskType]struct{}, }, nil } -func (l *LocalWorker) Paths(context.Context) ([]api.StoragePath, error) { +func (l *LocalWorker) Paths(context.Context) ([]stores.StoragePath, error) { return l.localStore.Local(), nil } diff --git a/storage/sealmgr/advmgr/manager.go b/storage/sealmgr/advmgr/manager.go index 83d580c7c..f7360a3c7 100644 --- a/storage/sealmgr/advmgr/manager.go +++ b/storage/sealmgr/advmgr/manager.go @@ -13,7 +13,6 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-sectorbuilder" - "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/storage/sealmgr/stores" "github.com/filecoin-project/specs-actors/actors/abi" @@ -33,7 +32,7 @@ type Worker interface { sectorbuilder.Sealer TaskTypes(context.Context) (map[sealmgr.TaskType]struct{}, error) - Paths(context.Context) ([]api.StoragePath, error) + Paths(context.Context) ([]stores.StoragePath, error) } type Manager struct { @@ -41,8 +40,8 @@ type Manager struct { scfg *sectorbuilder.Config sc SectorIDCounter - ls stores.LocalStorage - storage *stores.Local + ls stores.LocalStorage + storage *stores.Local remoteHnd *stores.FetchHandler storage2.Prover @@ -71,8 +70,8 @@ func New(ls stores.LocalStorage, cfg *sectorbuilder.Config, sc SectorIDCounter) scfg: cfg, sc: sc, - ls: ls, - storage: stor, + ls: ls, + storage: stor, remoteHnd: &stores.FetchHandler{Store: stor}, Prover: prover, @@ -116,9 +115,9 @@ func (m *Manager) ReadPieceFromSealedSector(context.Context, abi.SectorNumber, s panic("implement me") } -func (m *Manager) getWorkersByPaths(task sealmgr.TaskType, inPaths []config.StorageMeta) ([]Worker, map[int]config.StorageMeta) { +func (m *Manager) getWorkersByPaths(task sealmgr.TaskType, inPaths []stores.StorageMeta) ([]Worker, map[int]stores.StorageMeta) { var workers []Worker - paths := map[int]config.StorageMeta{} + paths := map[int]stores.StorageMeta{} for i, worker := range m.workers { tt, err := worker.TaskTypes(context.TODO()) @@ -137,7 +136,7 @@ func (m *Manager) getWorkersByPaths(task sealmgr.TaskType, inPaths []config.Stor } // check if the worker has access to the path we selected - var st *config.StorageMeta + var st *stores.StorageMeta for _, p := range phs { for _, meta := range inPaths { if p.ID == meta.ID { @@ -164,7 +163,7 @@ func (m *Manager) getWorkersByPaths(task sealmgr.TaskType, inPaths []config.Stor func (m *Manager) AddPiece(ctx context.Context, sn abi.SectorNumber, existingPieces []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) { // TODO: consider multiple paths vs workers when initially allocating - var best []config.StorageMeta + var best []stores.StorageMeta var err error if len(existingPieces) == 0 { // new best, err = m.storage.FindBestAllocStorage(sectorbuilder.FTUnsealed, true) diff --git a/storage/sealmgr/advmgr/remote.go b/storage/sealmgr/advmgr/remote.go index f1eab1891..066f6af1a 100644 --- a/storage/sealmgr/advmgr/remote.go +++ b/storage/sealmgr/advmgr/remote.go @@ -17,7 +17,7 @@ type remote struct { } func (r *remote) AddPiece(ctx context.Context, sector abi.SectorNumber, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage2.Data) (abi.PieceInfo, error) { - return abi.PieceInfo{},xerrors.New("unsupported") + return abi.PieceInfo{}, xerrors.New("unsupported") } func ConnectRemote(ctx context.Context, fa api.FullNode, url string) (*remote, error) { diff --git a/storage/sealmgr/advmgr/roprov.go b/storage/sealmgr/advmgr/roprov.go index 2269a3257..4a8fa8261 100644 --- a/storage/sealmgr/advmgr/roprov.go +++ b/storage/sealmgr/advmgr/roprov.go @@ -21,7 +21,7 @@ func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorNumbe } return l.stor.AcquireSector(ctx, abi.SectorID{ - Miner: l.miner, + Miner: l.miner, Number: id, }, existing, allocate, sealing) } diff --git a/storage/sealmgr/stores/index.go b/storage/sealmgr/stores/index.go new file mode 100644 index 000000000..37118764e --- /dev/null +++ b/storage/sealmgr/stores/index.go @@ -0,0 +1,128 @@ +package stores + +import ( + "context" + "net/url" + gopath "path" + "sync" + + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-sectorbuilder" + "github.com/filecoin-project/specs-actors/actors/abi" + + "github.com/filecoin-project/lotus/storage/sealmgr/sectorutil" +) + +// ID identifies sector storage by UUID. One sector storage should map to one +// filesystem, local or networked / shared by multiple machines +type ID string + +type StorageInfo struct { + ID ID + URLs []string // TODO: Support non-http transports + Cost int + + CanSeal bool + CanStore bool +} + +type SectorIndex interface { // part of storage-miner api + StorageAttach(context.Context, StorageInfo) error + + StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft sectorbuilder.SectorFileType) error + StorageFindSector(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]StorageInfo, error) +} + +type decl struct { + abi.SectorID + sectorbuilder.SectorFileType +} + +type Index struct { + lk sync.Mutex + + sectors map[decl][]ID + stores map[ID]*StorageInfo +} + +func NewIndex() *Index { + return &Index{ + sectors: map[decl][]ID{}, + stores: map[ID]*StorageInfo{}, + } +} + +func (i *Index) StorageAttach(ctx context.Context, si StorageInfo) error { + i.lk.Lock() + defer i.lk.Unlock() + + if _, ok := i.stores[si.ID]; ok { + for _, u := range si.URLs { + if _, err := url.Parse(u); err != nil { + return xerrors.Errorf("failed to parse url %s: %w", si.URLs, err) + } + } + + i.stores[si.ID].URLs = append(i.stores[si.ID].URLs, si.URLs...) + return nil + } + i.stores[si.ID] = &si + return nil +} + +func (i *Index) StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft sectorbuilder.SectorFileType) error { + i.lk.Lock() + defer i.lk.Unlock() + + d := decl{s, ft} + + for _, sid := range i.sectors[d] { + if sid == storageId { + log.Warnf("sector %v redeclared in %s", storageId) + return nil + } + } + + i.sectors[d] = append(i.sectors[d], storageId) + return nil +} + +func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft sectorbuilder.SectorFileType) ([]StorageInfo, error) { + i.lk.Lock() + defer i.lk.Unlock() + + storageIDs := i.sectors[decl{s, ft}] + out := make([]StorageInfo, len(storageIDs)) + + for j, id := range storageIDs { + st, ok := i.stores[id] + if !ok { + log.Warnf("storage %s is not present in sector index (referenced by sector %v)", id, s) + continue + } + + urls := make([]string, len(st.URLs)) + for k, u := range st.URLs { + rl, err := url.Parse(u) + if err != nil { + return nil, xerrors.Errorf("failed to parse url: %w", err) + } + + rl.Path = gopath.Join(rl.Path, ft.String(), sectorutil.SectorName(s)) + urls[k] = rl.String() + } + + out[j] = StorageInfo{ + ID: id, + URLs: nil, + Cost: st.Cost, + CanSeal: st.CanSeal, + CanStore: st.CanStore, + } + } + + return out, nil +} + +var _ SectorIndex = &Index{} diff --git a/storage/sealmgr/stores/local.go b/storage/sealmgr/stores/local.go index 88de7bf7b..e5f3f9548 100644 --- a/storage/sealmgr/stores/local.go +++ b/storage/sealmgr/stores/local.go @@ -12,11 +12,29 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/go-sectorbuilder" - "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/storage/sealmgr/sectorutil" ) +type StoragePath struct { + ID ID + Weight uint64 + + LocalPath string + + CanSeal bool + CanStore bool +} + +// [path]/sectorstore.json +type StorageMeta struct { + ID ID + Weight uint64 // 0 = readonly + + CanSeal bool + CanStore bool +} + type LocalStorage interface { GetStorage() (config.StorageConfig, error) SetStorage(func(*config.StorageConfig)) error @@ -36,7 +54,7 @@ type Local struct { type path struct { lk sync.Mutex - meta config.StorageMeta + meta StorageMeta local string sectors map[abi.SectorID]sectorbuilder.SectorFileType @@ -55,7 +73,7 @@ func (st *Local) OpenPath(p string) error { return xerrors.Errorf("reading storage metadata for %s: %w", p, err) } - var meta config.StorageMeta + var meta StorageMeta if err := json.Unmarshal(mb, &meta); err != nil { return xerrors.Errorf("unmarshalling storage metadata for %s: %w", p, err) } @@ -151,7 +169,7 @@ func (st *Local) acquireSector(ctx context.Context, sid abi.SectorID, existing s spath := filepath.Join(p.local, fileType.String(), sectorutil.SectorName(sid)) sectorutil.SetPathByType(&out, fileType, spath) - sectorutil.SetPathByType(&storageIDs, fileType, p.meta.ID) + sectorutil.SetPathByType(&storageIDs, fileType, string(p.meta.ID)) existing ^= fileType } @@ -162,7 +180,8 @@ func (st *Local) acquireSector(ctx context.Context, sid abi.SectorID, existing s continue } - var best, bestID string + var best string + var bestID ID for _, p := range st.paths { if sealing && !p.meta.CanSeal { @@ -190,15 +209,15 @@ func (st *Local) acquireSector(ctx context.Context, sid abi.SectorID, existing s } sectorutil.SetPathByType(&out, fileType, best) - sectorutil.SetPathByType(&storageIDs, fileType, bestID) + sectorutil.SetPathByType(&storageIDs, fileType, string(bestID)) allocate ^= fileType } return out, storageIDs, st.localLk.RUnlock, nil } -func (st *Local) FindBestAllocStorage(allocate sectorbuilder.SectorFileType, sealing bool) ([]config.StorageMeta, error) { - var out []config.StorageMeta +func (st *Local) FindBestAllocStorage(allocate sectorbuilder.SectorFileType, sealing bool) ([]StorageMeta, error) { + var out []StorageMeta for _, p := range st.paths { if sealing && !p.meta.CanSeal { @@ -221,8 +240,8 @@ func (st *Local) FindBestAllocStorage(allocate sectorbuilder.SectorFileType, sea return out, nil } -func (st *Local) FindSector(mid abi.ActorID, sn abi.SectorNumber, typ sectorbuilder.SectorFileType) ([]config.StorageMeta, error) { - var out []config.StorageMeta +func (st *Local) FindSector(mid abi.ActorID, sn abi.SectorNumber, typ sectorbuilder.SectorFileType) ([]StorageMeta, error) { + var out []StorageMeta for _, p := range st.paths { p.lk.Lock() t := p.sectors[abi.SectorID{ @@ -242,14 +261,14 @@ func (st *Local) FindSector(mid abi.ActorID, sn abi.SectorNumber, typ sectorbuil return out, nil } -func (st *Local) Local() []api.StoragePath { - var out []api.StoragePath +func (st *Local) Local() []StoragePath { + var out []StoragePath for _, p := range st.paths { if p.local == "" { continue } - out = append(out, api.StoragePath{ + out = append(out, StoragePath{ ID: p.meta.ID, Weight: p.meta.Weight, LocalPath: p.local, diff --git a/storage/sealmgr/stores/remote.go b/storage/sealmgr/stores/remote.go index 05b2ce5dc..b0b19a884 100644 --- a/storage/sealmgr/stores/remote.go +++ b/storage/sealmgr/stores/remote.go @@ -15,15 +15,14 @@ import ( "github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/specs-actors/actors/abi" - "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/lib/tarutil" "github.com/filecoin-project/lotus/storage/sealmgr/sectorutil" ) type Remote struct { - local *Local + local *Local remote SectorIndex - auth http.Header + auth http.Header fetchLk sync.Mutex // TODO: this can be much smarter // TODO: allow multiple parallel fetches @@ -32,17 +31,12 @@ type Remote struct { func NewRemote(local *Local, remote SectorIndex, auth http.Header) *Remote { return &Remote{ - local: local, - remote: remote, - auth: auth, + local: local, + remote: remote, + auth: auth, } } -type SectorIndex interface { - StorageDeclareSector(ctx context.Context, storageId string, s abi.SectorID) error - StorageFindSector(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]api.StorageInfo, error) -} - func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) { if existing|allocate != existing^allocate { return sectorbuilder.SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector") @@ -74,7 +68,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sec done = mergeDone(done, rdone) sectorutil.SetPathByType(&paths, fileType, ap) - if err := r.remote.StorageDeclareSector(ctx, storageID, s); err != nil { + if err := r.remote.StorageDeclareSector(ctx, storageID, s, fileType); err != nil { log.Warnf("declaring sector %v in %s failed: %+v", s, storageID, err) } } @@ -82,7 +76,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sec return paths, done, nil } -func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType sectorbuilder.SectorFileType, sealing bool) (string, string, func(), error) { +func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType sectorbuilder.SectorFileType, sealing bool) (string, ID, func(), error) { si, err := r.remote.StorageFindSector(ctx, s, fileType) if err != nil { return "", "", nil, err @@ -111,7 +105,7 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType if merr != nil { log.Warnw("acquireFromRemote encountered errors when fetching sector from remote", "errors", merr) } - return dest, storageID, done, nil + return dest, ID(storageID), done, nil } } @@ -119,7 +113,6 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType return "", "", nil, xerrors.Errorf("failed to acquire sector %v from remote: %w", s, merr) } - func (r *Remote) fetch(url, outname string) error { log.Infof("Fetch %s -> %s", url, outname)