workers: Implement SectorIndex

This commit is contained in:
Łukasz Magiera 2020-03-13 12:59:19 +01:00
parent a75ad492f1
commit 2c7f579838
17 changed files with 217 additions and 104 deletions

View File

@ -7,11 +7,11 @@ import (
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/storage/sealmgr/stores"
) )
// alias because cbor-gen doesn't like non-alias types // alias because cbor-gen doesn't like non-alias types
@ -110,9 +110,7 @@ type StorageMiner interface {
// WorkerConnect tells the node to connect to workers RPC // WorkerConnect tells the node to connect to workers RPC
WorkerConnect(context.Context, string) error WorkerConnect(context.Context, string) error
WorkerAttachStorage(context.Context, StorageInfo) error stores.SectorIndex
StorageDeclareSector(ctx context.Context, storageId string, s abi.SectorID) error
StorageFindSector(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]StorageInfo, error)
MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error
MarketListDeals(ctx context.Context) ([]storagemarket.StorageDeal, error) MarketListDeals(ctx context.Context) ([]storagemarket.StorageDeal, error)
@ -125,25 +123,6 @@ type StorageMiner interface {
StorageAddLocal(ctx context.Context, path string) error StorageAddLocal(ctx context.Context, path string) error
} }
type StorageInfo struct {
ID string
URLs []string // TODO: Support non-http transports
Cost int
CanSeal bool
CanStore bool
}
type StoragePath struct {
ID string
Weight uint64
LocalPath string
CanSeal bool
CanStore bool
}
type SealRes struct { type SealRes struct {
Err string Err string
GoErr error `json:"-"` GoErr error `json:"-"`

View File

@ -2,6 +2,7 @@ package api
import ( import (
"context" "context"
"github.com/filecoin-project/lotus/storage/sealmgr/stores"
"github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/specs-storage/storage"
@ -14,7 +15,7 @@ type WorkerApi interface {
// TODO: Info() (name, ...) ? // TODO: Info() (name, ...) ?
TaskTypes(context.Context) (map[sealmgr.TaskType]struct{}, error) // TaskType -> Weight TaskTypes(context.Context) (map[sealmgr.TaskType]struct{}, error) // TaskType -> Weight
Paths(context.Context) ([]StoragePath, error) Paths(context.Context) ([]stores.StoragePath, error)
storage.Sealer storage.Sealer
} }

View File

@ -2,6 +2,7 @@ package apistruct
import ( import (
"context" "context"
"github.com/filecoin-project/lotus/storage/sealmgr/stores"
"github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/specs-storage/storage"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
@ -181,9 +182,9 @@ type StorageMinerStruct struct {
SectorsUpdate func(context.Context, abi.SectorNumber, api.SectorState) error `perm:"write"` SectorsUpdate func(context.Context, abi.SectorNumber, api.SectorState) error `perm:"write"`
WorkerConnect func(context.Context, string) error `perm:"admin"` // TODO: worker perm WorkerConnect func(context.Context, string) error `perm:"admin"` // TODO: worker perm
WorkerAttachStorage func(context.Context, api.StorageInfo) error `perm:"admin"` WorkerAttachStorage func(context.Context, stores.StorageInfo) error `perm:"admin"`
StorageDeclareSector func(ctx context.Context, storageId string, s abi.SectorID) error `perm:"admin"` StorageDeclareSector func(context.Context, stores.ID, abi.SectorID, sectorbuilder.SectorFileType) error `perm:"admin"`
StorageFindSector func(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]api.StorageInfo, error) `perm:"admin"` StorageFindSector func(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]stores.StorageInfo, error) `perm:"admin"`
DealsImportData func(ctx context.Context, dealPropCid cid.Cid, file string) error `perm:"write"` DealsImportData func(ctx context.Context, dealPropCid cid.Cid, file string) error `perm:"write"`
DealsList func(ctx context.Context) ([]storagemarket.StorageDeal, error) `perm:"read"` DealsList func(ctx context.Context) ([]storagemarket.StorageDeal, error) `perm:"read"`
@ -199,7 +200,7 @@ type WorkerStruct struct {
Version func(context.Context) (build.Version, error) `perm:"admin"` Version func(context.Context) (build.Version, error) `perm:"admin"`
TaskTypes func(context.Context) (map[sealmgr.TaskType]struct{}, error) `perm:"admin"` TaskTypes func(context.Context) (map[sealmgr.TaskType]struct{}, error) `perm:"admin"`
Paths func(context.Context) ([]api.StoragePath, error) `perm:"admin"` Paths func(context.Context) ([]stores.StoragePath, error) `perm:"admin"`
SealPreCommit1 func(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) `perm:"admin"` SealPreCommit1 func(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) `perm:"admin"`
SealPreCommit2 func(context.Context, abi.SectorNumber, storage.PreCommit1Out) (sealedCID cid.Cid, unsealedCID cid.Cid, err error) `perm:"admin"` SealPreCommit2 func(context.Context, abi.SectorNumber, storage.PreCommit1Out) (sealedCID cid.Cid, unsealedCID cid.Cid, err error) `perm:"admin"`
@ -651,15 +652,15 @@ func (c *StorageMinerStruct) WorkerConnect(ctx context.Context, url string) erro
return c.Internal.WorkerConnect(ctx, url) return c.Internal.WorkerConnect(ctx, url)
} }
func (c *StorageMinerStruct) WorkerAttachStorage(ctx context.Context, si api.StorageInfo) error { func (c *StorageMinerStruct) StorageAttach(ctx context.Context, si stores.StorageInfo) error {
return c.Internal.WorkerAttachStorage(ctx, si) return c.Internal.WorkerAttachStorage(ctx, si)
} }
func (c *StorageMinerStruct) StorageDeclareSector(ctx context.Context, storageId string, s abi.SectorID) error { func (c *StorageMinerStruct) StorageDeclareSector(ctx context.Context, storageId stores.ID, s abi.SectorID, ft sectorbuilder.SectorFileType) error {
return c.Internal.StorageDeclareSector(ctx, storageId, s) return c.Internal.StorageDeclareSector(ctx, storageId, s, ft)
} }
func (c *StorageMinerStruct) StorageFindSector(ctx context.Context, si abi.SectorID, types sectorbuilder.SectorFileType) ([]api.StorageInfo, error) { func (c *StorageMinerStruct) StorageFindSector(ctx context.Context, si abi.SectorID, types sectorbuilder.SectorFileType) ([]stores.StorageInfo, error) {
return c.Internal.StorageFindSector(ctx, si, types) return c.Internal.StorageFindSector(ctx, si, types)
} }
@ -699,7 +700,7 @@ func (w *WorkerStruct) TaskTypes(ctx context.Context) (map[sealmgr.TaskType]stru
return w.Internal.TaskTypes(ctx) return w.Internal.TaskTypes(ctx)
} }
func (w *WorkerStruct) Paths(ctx context.Context) ([]api.StoragePath, error) { func (w *WorkerStruct) Paths(ctx context.Context) ([]stores.StoragePath, error) {
return w.Internal.Paths(ctx) return w.Internal.Paths(ctx)
} }

View File

@ -7,6 +7,7 @@ import (
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/filecoin-project/lotus/storage/sealmgr/stores"
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
@ -26,7 +27,6 @@ import (
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/wallet" "github.com/filecoin-project/lotus/chain/wallet"
"github.com/filecoin-project/lotus/genesis" "github.com/filecoin-project/lotus/genesis"
"github.com/filecoin-project/lotus/node/config"
) )
var log = logging.Logger("preseal") var log = logging.Logger("preseal")
@ -134,8 +134,8 @@ func PreSeal(maddr address.Address, pt abi.RegisteredProof, offset abi.SectorNum
} }
{ {
b, err := json.MarshalIndent(&config.StorageMeta{ b, err := json.MarshalIndent(&stores.StorageMeta{
ID: uuid.New().String(), ID: stores.ID(uuid.New().String()),
Weight: 0, // read-only Weight: 0, // read-only
CanSeal: false, CanSeal: false,
CanStore: false, CanStore: false,

View File

@ -7,6 +7,7 @@ import (
"encoding/binary" "encoding/binary"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/filecoin-project/lotus/storage/sealmgr/stores"
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
@ -187,8 +188,8 @@ var initCmd = &cli.Command{
} }
if !cctx.Bool("no-local-storage") { if !cctx.Bool("no-local-storage") {
b, err := json.MarshalIndent(&config.StorageMeta{ b, err := json.MarshalIndent(&stores.StorageMeta{
ID: uuid.New().String(), ID: stores.ID(uuid.New().String()),
Weight: 10, Weight: 10,
CanSeal: true, CanSeal: true,
CanStore: true, CanStore: true,

View File

@ -2,6 +2,7 @@ package main
import ( import (
"encoding/json" "encoding/json"
"github.com/filecoin-project/lotus/storage/sealmgr/stores"
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
@ -12,7 +13,6 @@ import (
"gopkg.in/urfave/cli.v2" "gopkg.in/urfave/cli.v2"
lcli "github.com/filecoin-project/lotus/cli" lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/node/config"
) )
const metaFile = "sectorstore.json" const metaFile = "sectorstore.json"
@ -79,8 +79,8 @@ var storageAttachCmd = &cli.Command{
return err return err
} }
cfg := &config.StorageMeta{ cfg := &stores.StorageMeta{
ID: uuid.New().String(), ID: stores.ID(uuid.New().String()),
Weight: cctx.Uint64("weight"), Weight: cctx.Uint64("weight"),
CanSeal: cctx.Bool("seal"), CanSeal: cctx.Bool("seal"),
CanStore: cctx.Bool("store"), CanStore: cctx.Bool("store"),

View File

@ -18,15 +18,6 @@ type StorageConfig struct {
StoragePaths []LocalPath StoragePaths []LocalPath
} }
// [path]/metadata.json
type StorageMeta struct {
ID string
Weight uint64 // 0 = readonly
CanSeal bool
CanStore bool
}
func StorageFromFile(path string, def *StorageConfig) (*StorageConfig, error) { func StorageFromFile(path string, def *StorageConfig) (*StorageConfig, error) {
file, err := os.Open(path) file, err := os.Open(path)
switch { switch {

View File

@ -3,6 +3,7 @@ package impl
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"github.com/filecoin-project/lotus/storage/sealmgr/stores"
"net/http" "net/http"
"os" "os"
"strconv" "strconv"
@ -151,15 +152,15 @@ func (sm *StorageMinerAPI) WorkerConnect(ctx context.Context, url string) error
panic("todo register ") panic("todo register ")
} }
func (sm *StorageMinerAPI) WorkerAttachStorage(ctx context.Context, si api.StorageInfo) error { func (sm *StorageMinerAPI) StorageAttach(ctx context.Context, si stores.StorageInfo) error {
panic("implement me") panic("implement me")
} }
func (sm *StorageMinerAPI) WorkerDeclareSector(ctx context.Context, storageId string, s abi.SectorID) error { func (sm *StorageMinerAPI) StorageDeclareSector(ctx context.Context, storageId stores.ID, s abi.SectorID, ft sectorbuilder.SectorFileType) error {
panic("implement me") panic("implement me")
} }
func (sm *StorageMinerAPI) FindSector(ctx context.Context, si abi.SectorID, types sectorbuilder.SectorFileType) ([]api.StorageInfo, error) { func (sm *StorageMinerAPI) StorageFindSector(ctx context.Context, si abi.SectorID, types sectorbuilder.SectorFileType) ([]stores.StorageInfo, error) {
panic("implement me") panic("implement me")
} }

View File

@ -50,7 +50,7 @@ func defConfForType(t RepoType) interface{} {
case StorageMiner: case StorageMiner:
return config.DefaultStorageMiner() return config.DefaultStorageMiner()
case Worker: case Worker:
return &struct {}{} return &struct{}{}
default: default:
panic(fmt.Sprintf("unknown RepoType(%d)", int(t))) panic(fmt.Sprintf("unknown RepoType(%d)", int(t)))
} }

View File

@ -2,6 +2,7 @@ package repo
import ( import (
"encoding/json" "encoding/json"
"github.com/filecoin-project/lotus/storage/sealmgr/stores"
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
@ -81,8 +82,8 @@ func (lmem *lockedMemRepo) Path() string {
panic(err) panic(err)
} }
b, err := json.MarshalIndent(&config.StorageMeta{ b, err := json.MarshalIndent(&stores.StorageMeta{
ID: uuid.New().String(), ID: stores.ID(uuid.New().String()),
Weight: 10, Weight: 10,
CanSeal: true, CanSeal: true,
CanStore: true, CanStore: true,

View File

@ -12,7 +12,6 @@ import (
storage2 "github.com/filecoin-project/specs-storage/storage" storage2 "github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/storage/sealmgr/stores" "github.com/filecoin-project/lotus/storage/sealmgr/stores"
"github.com/filecoin-project/lotus/storage/sealmgr" "github.com/filecoin-project/lotus/storage/sealmgr"
@ -123,7 +122,7 @@ func (l *LocalWorker) TaskTypes(context.Context) (map[sealmgr.TaskType]struct{},
}, nil }, nil
} }
func (l *LocalWorker) Paths(context.Context) ([]api.StoragePath, error) { func (l *LocalWorker) Paths(context.Context) ([]stores.StoragePath, error) {
return l.localStore.Local(), nil return l.localStore.Local(), nil
} }

View File

@ -13,7 +13,6 @@ import (
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/storage/sealmgr/stores" "github.com/filecoin-project/lotus/storage/sealmgr/stores"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
@ -33,7 +32,7 @@ type Worker interface {
sectorbuilder.Sealer sectorbuilder.Sealer
TaskTypes(context.Context) (map[sealmgr.TaskType]struct{}, error) TaskTypes(context.Context) (map[sealmgr.TaskType]struct{}, error)
Paths(context.Context) ([]api.StoragePath, error) Paths(context.Context) ([]stores.StoragePath, error)
} }
type Manager struct { type Manager struct {
@ -116,9 +115,9 @@ func (m *Manager) ReadPieceFromSealedSector(context.Context, abi.SectorNumber, s
panic("implement me") panic("implement me")
} }
func (m *Manager) getWorkersByPaths(task sealmgr.TaskType, inPaths []config.StorageMeta) ([]Worker, map[int]config.StorageMeta) { func (m *Manager) getWorkersByPaths(task sealmgr.TaskType, inPaths []stores.StorageMeta) ([]Worker, map[int]stores.StorageMeta) {
var workers []Worker var workers []Worker
paths := map[int]config.StorageMeta{} paths := map[int]stores.StorageMeta{}
for i, worker := range m.workers { for i, worker := range m.workers {
tt, err := worker.TaskTypes(context.TODO()) tt, err := worker.TaskTypes(context.TODO())
@ -137,7 +136,7 @@ func (m *Manager) getWorkersByPaths(task sealmgr.TaskType, inPaths []config.Stor
} }
// check if the worker has access to the path we selected // check if the worker has access to the path we selected
var st *config.StorageMeta var st *stores.StorageMeta
for _, p := range phs { for _, p := range phs {
for _, meta := range inPaths { for _, meta := range inPaths {
if p.ID == meta.ID { if p.ID == meta.ID {
@ -164,7 +163,7 @@ func (m *Manager) getWorkersByPaths(task sealmgr.TaskType, inPaths []config.Stor
func (m *Manager) AddPiece(ctx context.Context, sn abi.SectorNumber, existingPieces []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) { func (m *Manager) AddPiece(ctx context.Context, sn abi.SectorNumber, existingPieces []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) {
// TODO: consider multiple paths vs workers when initially allocating // TODO: consider multiple paths vs workers when initially allocating
var best []config.StorageMeta var best []stores.StorageMeta
var err error var err error
if len(existingPieces) == 0 { // new if len(existingPieces) == 0 { // new
best, err = m.storage.FindBestAllocStorage(sectorbuilder.FTUnsealed, true) best, err = m.storage.FindBestAllocStorage(sectorbuilder.FTUnsealed, true)

View File

@ -17,7 +17,7 @@ type remote struct {
} }
func (r *remote) AddPiece(ctx context.Context, sector abi.SectorNumber, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage2.Data) (abi.PieceInfo, error) { func (r *remote) AddPiece(ctx context.Context, sector abi.SectorNumber, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage2.Data) (abi.PieceInfo, error) {
return abi.PieceInfo{},xerrors.New("unsupported") return abi.PieceInfo{}, xerrors.New("unsupported")
} }
func ConnectRemote(ctx context.Context, fa api.FullNode, url string) (*remote, error) { func ConnectRemote(ctx context.Context, fa api.FullNode, url string) (*remote, error) {

View File

@ -0,0 +1,128 @@
package stores
import (
"context"
"net/url"
gopath "path"
"sync"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/lotus/storage/sealmgr/sectorutil"
)
// ID identifies sector storage by UUID. One sector storage should map to one
// filesystem, local or networked / shared by multiple machines
type ID string
type StorageInfo struct {
ID ID
URLs []string // TODO: Support non-http transports
Cost int
CanSeal bool
CanStore bool
}
type SectorIndex interface { // part of storage-miner api
StorageAttach(context.Context, StorageInfo) error
StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft sectorbuilder.SectorFileType) error
StorageFindSector(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]StorageInfo, error)
}
type decl struct {
abi.SectorID
sectorbuilder.SectorFileType
}
type Index struct {
lk sync.Mutex
sectors map[decl][]ID
stores map[ID]*StorageInfo
}
func NewIndex() *Index {
return &Index{
sectors: map[decl][]ID{},
stores: map[ID]*StorageInfo{},
}
}
func (i *Index) StorageAttach(ctx context.Context, si StorageInfo) error {
i.lk.Lock()
defer i.lk.Unlock()
if _, ok := i.stores[si.ID]; ok {
for _, u := range si.URLs {
if _, err := url.Parse(u); err != nil {
return xerrors.Errorf("failed to parse url %s: %w", si.URLs, err)
}
}
i.stores[si.ID].URLs = append(i.stores[si.ID].URLs, si.URLs...)
return nil
}
i.stores[si.ID] = &si
return nil
}
func (i *Index) StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft sectorbuilder.SectorFileType) error {
i.lk.Lock()
defer i.lk.Unlock()
d := decl{s, ft}
for _, sid := range i.sectors[d] {
if sid == storageId {
log.Warnf("sector %v redeclared in %s", storageId)
return nil
}
}
i.sectors[d] = append(i.sectors[d], storageId)
return nil
}
func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft sectorbuilder.SectorFileType) ([]StorageInfo, error) {
i.lk.Lock()
defer i.lk.Unlock()
storageIDs := i.sectors[decl{s, ft}]
out := make([]StorageInfo, len(storageIDs))
for j, id := range storageIDs {
st, ok := i.stores[id]
if !ok {
log.Warnf("storage %s is not present in sector index (referenced by sector %v)", id, s)
continue
}
urls := make([]string, len(st.URLs))
for k, u := range st.URLs {
rl, err := url.Parse(u)
if err != nil {
return nil, xerrors.Errorf("failed to parse url: %w", err)
}
rl.Path = gopath.Join(rl.Path, ft.String(), sectorutil.SectorName(s))
urls[k] = rl.String()
}
out[j] = StorageInfo{
ID: id,
URLs: nil,
Cost: st.Cost,
CanSeal: st.CanSeal,
CanStore: st.CanStore,
}
}
return out, nil
}
var _ SectorIndex = &Index{}

View File

@ -12,11 +12,29 @@ import (
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/storage/sealmgr/sectorutil" "github.com/filecoin-project/lotus/storage/sealmgr/sectorutil"
) )
type StoragePath struct {
ID ID
Weight uint64
LocalPath string
CanSeal bool
CanStore bool
}
// [path]/sectorstore.json
type StorageMeta struct {
ID ID
Weight uint64 // 0 = readonly
CanSeal bool
CanStore bool
}
type LocalStorage interface { type LocalStorage interface {
GetStorage() (config.StorageConfig, error) GetStorage() (config.StorageConfig, error)
SetStorage(func(*config.StorageConfig)) error SetStorage(func(*config.StorageConfig)) error
@ -36,7 +54,7 @@ type Local struct {
type path struct { type path struct {
lk sync.Mutex lk sync.Mutex
meta config.StorageMeta meta StorageMeta
local string local string
sectors map[abi.SectorID]sectorbuilder.SectorFileType sectors map[abi.SectorID]sectorbuilder.SectorFileType
@ -55,7 +73,7 @@ func (st *Local) OpenPath(p string) error {
return xerrors.Errorf("reading storage metadata for %s: %w", p, err) return xerrors.Errorf("reading storage metadata for %s: %w", p, err)
} }
var meta config.StorageMeta var meta StorageMeta
if err := json.Unmarshal(mb, &meta); err != nil { if err := json.Unmarshal(mb, &meta); err != nil {
return xerrors.Errorf("unmarshalling storage metadata for %s: %w", p, err) return xerrors.Errorf("unmarshalling storage metadata for %s: %w", p, err)
} }
@ -151,7 +169,7 @@ func (st *Local) acquireSector(ctx context.Context, sid abi.SectorID, existing s
spath := filepath.Join(p.local, fileType.String(), sectorutil.SectorName(sid)) spath := filepath.Join(p.local, fileType.String(), sectorutil.SectorName(sid))
sectorutil.SetPathByType(&out, fileType, spath) sectorutil.SetPathByType(&out, fileType, spath)
sectorutil.SetPathByType(&storageIDs, fileType, p.meta.ID) sectorutil.SetPathByType(&storageIDs, fileType, string(p.meta.ID))
existing ^= fileType existing ^= fileType
} }
@ -162,7 +180,8 @@ func (st *Local) acquireSector(ctx context.Context, sid abi.SectorID, existing s
continue continue
} }
var best, bestID string var best string
var bestID ID
for _, p := range st.paths { for _, p := range st.paths {
if sealing && !p.meta.CanSeal { if sealing && !p.meta.CanSeal {
@ -190,15 +209,15 @@ func (st *Local) acquireSector(ctx context.Context, sid abi.SectorID, existing s
} }
sectorutil.SetPathByType(&out, fileType, best) sectorutil.SetPathByType(&out, fileType, best)
sectorutil.SetPathByType(&storageIDs, fileType, bestID) sectorutil.SetPathByType(&storageIDs, fileType, string(bestID))
allocate ^= fileType allocate ^= fileType
} }
return out, storageIDs, st.localLk.RUnlock, nil return out, storageIDs, st.localLk.RUnlock, nil
} }
func (st *Local) FindBestAllocStorage(allocate sectorbuilder.SectorFileType, sealing bool) ([]config.StorageMeta, error) { func (st *Local) FindBestAllocStorage(allocate sectorbuilder.SectorFileType, sealing bool) ([]StorageMeta, error) {
var out []config.StorageMeta var out []StorageMeta
for _, p := range st.paths { for _, p := range st.paths {
if sealing && !p.meta.CanSeal { if sealing && !p.meta.CanSeal {
@ -221,8 +240,8 @@ func (st *Local) FindBestAllocStorage(allocate sectorbuilder.SectorFileType, sea
return out, nil return out, nil
} }
func (st *Local) FindSector(mid abi.ActorID, sn abi.SectorNumber, typ sectorbuilder.SectorFileType) ([]config.StorageMeta, error) { func (st *Local) FindSector(mid abi.ActorID, sn abi.SectorNumber, typ sectorbuilder.SectorFileType) ([]StorageMeta, error) {
var out []config.StorageMeta var out []StorageMeta
for _, p := range st.paths { for _, p := range st.paths {
p.lk.Lock() p.lk.Lock()
t := p.sectors[abi.SectorID{ t := p.sectors[abi.SectorID{
@ -242,14 +261,14 @@ func (st *Local) FindSector(mid abi.ActorID, sn abi.SectorNumber, typ sectorbuil
return out, nil return out, nil
} }
func (st *Local) Local() []api.StoragePath { func (st *Local) Local() []StoragePath {
var out []api.StoragePath var out []StoragePath
for _, p := range st.paths { for _, p := range st.paths {
if p.local == "" { if p.local == "" {
continue continue
} }
out = append(out, api.StoragePath{ out = append(out, StoragePath{
ID: p.meta.ID, ID: p.meta.ID,
Weight: p.meta.Weight, Weight: p.meta.Weight,
LocalPath: p.local, LocalPath: p.local,

View File

@ -15,7 +15,6 @@ import (
"github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/lib/tarutil" "github.com/filecoin-project/lotus/lib/tarutil"
"github.com/filecoin-project/lotus/storage/sealmgr/sectorutil" "github.com/filecoin-project/lotus/storage/sealmgr/sectorutil"
) )
@ -38,11 +37,6 @@ func NewRemote(local *Local, remote SectorIndex, auth http.Header) *Remote {
} }
} }
type SectorIndex interface {
StorageDeclareSector(ctx context.Context, storageId string, s abi.SectorID) error
StorageFindSector(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]api.StorageInfo, error)
}
func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) { func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) {
if existing|allocate != existing^allocate { if existing|allocate != existing^allocate {
return sectorbuilder.SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector") return sectorbuilder.SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector")
@ -74,7 +68,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sec
done = mergeDone(done, rdone) done = mergeDone(done, rdone)
sectorutil.SetPathByType(&paths, fileType, ap) sectorutil.SetPathByType(&paths, fileType, ap)
if err := r.remote.StorageDeclareSector(ctx, storageID, s); err != nil { if err := r.remote.StorageDeclareSector(ctx, storageID, s, fileType); err != nil {
log.Warnf("declaring sector %v in %s failed: %+v", s, storageID, err) log.Warnf("declaring sector %v in %s failed: %+v", s, storageID, err)
} }
} }
@ -82,7 +76,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sec
return paths, done, nil return paths, done, nil
} }
func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType sectorbuilder.SectorFileType, sealing bool) (string, string, func(), error) { func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType sectorbuilder.SectorFileType, sealing bool) (string, ID, func(), error) {
si, err := r.remote.StorageFindSector(ctx, s, fileType) si, err := r.remote.StorageFindSector(ctx, s, fileType)
if err != nil { if err != nil {
return "", "", nil, err return "", "", nil, err
@ -111,7 +105,7 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType
if merr != nil { if merr != nil {
log.Warnw("acquireFromRemote encountered errors when fetching sector from remote", "errors", merr) log.Warnw("acquireFromRemote encountered errors when fetching sector from remote", "errors", merr)
} }
return dest, storageID, done, nil return dest, ID(storageID), done, nil
} }
} }
@ -119,7 +113,6 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType
return "", "", nil, xerrors.Errorf("failed to acquire sector %v from remote: %w", s, merr) return "", "", nil, xerrors.Errorf("failed to acquire sector %v from remote: %w", s, merr)
} }
func (r *Remote) fetch(url, outname string) error { func (r *Remote) fetch(url, outname string) error {
log.Infof("Fetch %s -> %s", url, outname) log.Infof("Fetch %s -> %s", url, outname)