stores: Use index in local store

This commit is contained in:
Łukasz Magiera 2020-03-19 16:10:19 +01:00
parent 2fa3f2578f
commit d87b7c264d
14 changed files with 243 additions and 206 deletions

View File

@ -110,8 +110,6 @@ type StorageMiner interface {
StorageList(ctx context.Context) (map[stores.ID][]stores.Decl, error)
StorageInfo(context.Context, stores.ID) (stores.StorageInfo, error)
// WorkerConnect tells the node to connect to workers RPC
WorkerConnect(context.Context, string) error
stores.SectorIndex

View File

@ -181,12 +181,13 @@ type StorageMinerStruct struct {
SectorsRefs func(context.Context) (map[string][]api.SealedRef, error) `perm:"read"`
SectorsUpdate func(context.Context, abi.SectorNumber, api.SectorState) error `perm:"write"`
WorkerConnect func(context.Context, string) error `perm:"admin"` // TODO: worker perm
StorageAttach func(context.Context, stores.StorageInfo) error `perm:"admin"`
StorageDeclareSector func(context.Context, stores.ID, abi.SectorID, sectorbuilder.SectorFileType) error `perm:"admin"`
StorageFindSector func(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]stores.StorageInfo, error) `perm:"admin"`
StorageList func(ctx context.Context) (map[stores.ID][]stores.Decl, error) `perm:"admin"`
StorageInfo func(context.Context, stores.ID) (stores.StorageInfo, error) `perm:"admin"`
WorkerConnect func(context.Context, string) error `perm:"admin"` // TODO: worker perm
StorageAttach func(context.Context, stores.StorageInfo, stores.FsStat) error `perm:"admin"`
StorageDeclareSector func(context.Context, stores.ID, abi.SectorID, sectorbuilder.SectorFileType) error `perm:"admin"`
StorageFindSector func(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]stores.StorageInfo, error) `perm:"admin"`
StorageList func(context.Context) (map[stores.ID][]stores.Decl, error) `perm:"admin"`
StorageInfo func(context.Context, stores.ID) (stores.StorageInfo, error) `perm:"admin"`
StorageBestAlloc func(ctx context.Context, allocate sectorbuilder.SectorFileType, sealing bool) ([]stores.StorageInfo, error) `perm:"admin"`
DealsImportData func(ctx context.Context, dealPropCid cid.Cid, file string) error `perm:"write"`
DealsList func(ctx context.Context) ([]storagemarket.StorageDeal, error) `perm:"read"`
@ -654,8 +655,8 @@ func (c *StorageMinerStruct) WorkerConnect(ctx context.Context, url string) erro
return c.Internal.WorkerConnect(ctx, url)
}
func (c *StorageMinerStruct) StorageAttach(ctx context.Context, si stores.StorageInfo) error {
return c.Internal.StorageAttach(ctx, si)
func (c *StorageMinerStruct) StorageAttach(ctx context.Context, si stores.StorageInfo, st stores.FsStat) error {
return c.Internal.StorageAttach(ctx, si, st)
}
func (c *StorageMinerStruct) StorageDeclareSector(ctx context.Context, storageId stores.ID, s abi.SectorID, ft sectorbuilder.SectorFileType) error {
@ -674,6 +675,10 @@ func (c *StorageMinerStruct) StorageInfo(ctx context.Context, id stores.ID) (sto
return c.Internal.StorageInfo(ctx, id)
}
func (c *StorageMinerStruct) StorageBestAlloc(ctx context.Context, allocate sectorbuilder.SectorFileType, sealing bool) ([]stores.StorageInfo, error) {
return c.Internal.StorageBestAlloc(ctx, allocate, sealing)
}
func (c *StorageMinerStruct) MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error {
return c.Internal.MarketImportDealData(ctx, propcid, path)
}

View File

@ -157,7 +157,7 @@ var runCmd = &cli.Command{
var localPaths []config.LocalPath
if !cctx.Bool("no-local-storage") {
b, err := json.MarshalIndent(&stores.StorageMeta{
b, err := json.MarshalIndent(&stores.LocalStorageMeta{
ID: stores.ID(uuid.New().String()),
Weight: 10,
CanSeal: true,
@ -199,21 +199,13 @@ var runCmd = &cli.Command{
return err
}
localStore, err := stores.NewLocal(lr)
log.Info("Opening local storage; connecting to master")
localStore, err := stores.NewLocal(ctx, lr, nodeApi, []string{"http://" + cctx.String("address") + "/remote"})
if err != nil {
return err
}
log.Info("Connecting local storage to master")
if err := stores.DeclareLocalStorage(
ctx,
nodeApi,
localStore,
[]string{"http://" + cctx.String("address") + "/remote"}, // TODO: Less hardcoded
1); err != nil {
return err
}
// Setup remote sector store
_, spt, err := api.ProofTypeFromSectorSize(ssize)
if err != nil {
@ -230,12 +222,12 @@ var runCmd = &cli.Command{
// Create / expose the worker
workerApi := &worker{
LocalWorker: advmgr.NewLocalWorker(act, spt, remote, localStore, stores.NewIndex()),
LocalWorker: advmgr.NewLocalWorker(act, spt, remote, localStore, nodeApi),
}
mux := mux.NewRouter()
log.Info("Setting up control endpoint at " + "http://" + cctx.String("address"))
log.Info("Setting up control endpoint at " + cctx.String("address"))
rpcServer := jsonrpc.NewServer()
rpcServer.Register("Filecoin", apistruct.PermissionedWorkerAPI(workerApi))

View File

@ -137,7 +137,7 @@ func PreSeal(maddr address.Address, pt abi.RegisteredProof, offset abi.SectorNum
}
{
b, err := json.MarshalIndent(&stores.StorageMeta{
b, err := json.MarshalIndent(&stores.LocalStorageMeta{
ID: stores.ID(uuid.New().String()),
Weight: 0, // read-only
CanSeal: false,

View File

@ -188,7 +188,7 @@ var initCmd = &cli.Command{
}
if !cctx.Bool("no-local-storage") {
b, err := json.MarshalIndent(&stores.StorageMeta{
b, err := json.MarshalIndent(&stores.LocalStorageMeta{
ID: stores.ID(uuid.New().String()),
Weight: 10,
CanSeal: true,
@ -400,7 +400,7 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api lapi.FullNode,
smgr, err := advmgr.New(lr, stores.NewIndex(), &sectorbuilder.Config{
SealProofType: spt,
PoStProofType: ppt,
}, nil)
}, nil, api)
if err != nil {
return err
}

View File

@ -83,7 +83,7 @@ var storageAttachCmd = &cli.Command{
return err
}
cfg := &stores.StorageMeta{
cfg := &stores.LocalStorageMeta{
ID: stores.ID(uuid.New().String()),
Weight: cctx.Uint64("weight"),
CanSeal: cctx.Bool("seal"),
@ -144,7 +144,7 @@ var storageListCmd = &cli.Command{
if err != nil {
return err
}
fmt.Printf("\tSeal: %t; Store: %t; Cost: %d\n", si.CanSeal, si.CanStore, si.Cost)
fmt.Printf("\tSeal: %t; Store: %t; Weight: %d\n", si.CanSeal, si.CanStore, si.Weight)
for _, l := range si.URLs {
fmt.Printf("\tReachable %s\n", l) // TODO; try pinging maybe?? print latency?
}

View File

@ -195,7 +195,7 @@ func (sm *StorageMinerAPI) StorageAddLocal(ctx context.Context, path string) err
return xerrors.Errorf("no storage manager")
}
return sm.StorageMgr.AddLocalStorage(path)
return sm.StorageMgr.AddLocalStorage(ctx, path)
}
var _ api.StorageMiner = &StorageMinerAPI{}

View File

@ -82,7 +82,7 @@ func (lmem *lockedMemRepo) Path() string {
panic(err)
}
b, err := json.MarshalIndent(&stores.StorageMeta{
b, err := json.MarshalIndent(&stores.LocalStorageMeta{
ID: stores.ID(uuid.New().String()),
Weight: 10,
CanSeal: true,

View File

@ -17,7 +17,6 @@ import (
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/impl/common"
"github.com/filecoin-project/lotus/storage/sealmgr"
"github.com/filecoin-project/lotus/storage/sealmgr/stores"
)
@ -43,22 +42,21 @@ type Manager struct {
storage *stores.Remote
localStore *stores.Local
remoteHnd *stores.FetchHandler
index stores.SectorIndex
storage2.Prover
lk sync.Mutex
}
func New(ls stores.LocalStorage, si stores.SectorIndex, cfg *sectorbuilder.Config, urls URLs, ca common.CommonAPI) (*Manager, error) {
lstor, err := stores.NewLocal(ls)
func New(ls stores.LocalStorage, si stores.SectorIndex, cfg *sectorbuilder.Config, urls URLs, ca api.Common) (*Manager, error) {
ctx := context.TODO()
lstor, err := stores.NewLocal(ctx, ls, si, urls)
if err != nil {
return nil, err
}
if err := stores.DeclareLocalStorage(context.TODO(), si, lstor, urls, 10); err != nil {
log.Errorf("Declaring local storage failed: %+v")
}
prover, err := sectorbuilder.New(&readonlyProvider{stor: lstor}, cfg)
if err != nil {
return nil, xerrors.Errorf("creating prover instance: %w", err)
@ -79,6 +77,7 @@ func New(ls stores.LocalStorage, si stores.SectorIndex, cfg *sectorbuilder.Confi
storage: stor,
localStore: lstor,
remoteHnd: &stores.FetchHandler{Store: lstor},
index: si,
Prover: prover,
}
@ -86,13 +85,13 @@ func New(ls stores.LocalStorage, si stores.SectorIndex, cfg *sectorbuilder.Confi
return m, nil
}
func (m *Manager) AddLocalStorage(path string) error {
func (m *Manager) AddLocalStorage(ctx context.Context, path string) error {
path, err := homedir.Expand(path)
if err != nil {
return xerrors.Errorf("expanding local path: %w", err)
}
if err := m.localStore.OpenPath(path); err != nil {
if err := m.localStore.OpenPath(ctx, path); err != nil {
return xerrors.Errorf("opening local path: %w", err)
}
@ -125,9 +124,9 @@ func (m *Manager) ReadPieceFromSealedSector(context.Context, abi.SectorID, secto
panic("implement me")
}
func (m *Manager) getWorkersByPaths(task sealmgr.TaskType, inPaths []stores.StorageMeta) ([]Worker, map[int]stores.StorageMeta) {
func (m *Manager) getWorkersByPaths(task sealmgr.TaskType, inPaths []stores.StorageInfo) ([]Worker, map[int]stores.StorageInfo) {
var workers []Worker
paths := map[int]stores.StorageMeta{}
paths := map[int]stores.StorageInfo{}
for i, worker := range m.workers {
tt, err := worker.TaskTypes(context.TODO())
@ -147,7 +146,7 @@ func (m *Manager) getWorkersByPaths(task sealmgr.TaskType, inPaths []stores.Stor
}
// check if the worker has access to the path we selected
var st *stores.StorageMeta
var st *stores.StorageInfo
for _, p := range phs {
for _, meta := range inPaths {
if p.ID == meta.ID {
@ -181,12 +180,12 @@ func (m *Manager) NewSector(ctx context.Context, sector abi.SectorID) error {
func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPieces []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) {
// TODO: consider multiple paths vs workers when initially allocating
var best []stores.StorageMeta
var best []stores.StorageInfo
var err error
if len(existingPieces) == 0 { // new
best, err = m.storage.FindBestAllocStorage(sectorbuilder.FTUnsealed, true)
best, err = m.index.StorageBestAlloc(ctx, sectorbuilder.FTUnsealed, true)
} else { // append to existing
best, err = m.storage.FindSector(sector, sectorbuilder.FTUnsealed)
best, err = m.index.StorageFindSector(ctx, sector, sectorbuilder.FTUnsealed)
}
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("finding sector path: %w", err)
@ -207,7 +206,7 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage2.PreCommit1Out, err error) {
// TODO: also consider where the unsealed data sits
best, err := m.storage.FindBestAllocStorage(sectorbuilder.FTCache|sectorbuilder.FTSealed, true)
best, err := m.index.StorageBestAlloc(ctx, sectorbuilder.FTCache|sectorbuilder.FTSealed, true)
if err != nil {
return nil, xerrors.Errorf("finding path for sector sealing: %w", err)
}
@ -222,7 +221,7 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke
func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage2.PreCommit1Out) (cids storage2.SectorCids, err error) {
// TODO: allow workers to fetch the sectors
best, err := m.storage.FindSector(sector, sectorbuilder.FTCache|sectorbuilder.FTSealed)
best, err := m.index.StorageFindSector(ctx, sector, sectorbuilder.FTCache|sectorbuilder.FTSealed)
if err != nil {
return storage2.SectorCids{}, xerrors.Errorf("finding path for sector sealing: %w", err)
}
@ -235,7 +234,7 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase
}
func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage2.SectorCids) (output storage2.Commit1Out, err error) {
best, err := m.storage.FindSector(sector, sectorbuilder.FTCache|sectorbuilder.FTSealed)
best, err := m.index.StorageFindSector(ctx, sector, sectorbuilder.FTCache|sectorbuilder.FTSealed)
if err != nil {
return nil, xerrors.Errorf("finding path for sector sealing: %w", err)
}
@ -265,7 +264,7 @@ func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Ou
}
func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID) error {
best, err := m.storage.FindSector(sector, sectorbuilder.FTCache|sectorbuilder.FTSealed|sectorbuilder.FTUnsealed)
best, err := m.index.StorageFindSector(ctx, sector, sectorbuilder.FTCache|sectorbuilder.FTSealed|sectorbuilder.FTUnsealed)
if err != nil {
return xerrors.Errorf("finding sealed sector: %w", err)
}

View File

@ -142,8 +142,8 @@ func (l *LocalWorker) TaskTypes(context.Context) (map[sealmgr.TaskType]struct{},
}, nil
}
func (l *LocalWorker) Paths(context.Context) ([]stores.StoragePath, error) {
return l.localStore.Local(), nil
func (l *LocalWorker) Paths(ctx context.Context) ([]stores.StoragePath, error) {
return l.localStore.Local(ctx)
}
var _ Worker = &LocalWorker{}

View File

@ -2,14 +2,17 @@ package stores
import (
"context"
"math/bits"
"net/url"
gopath "path"
"sort"
"sync"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/lotus/storage/sealmgr/sectorutil"
)
@ -19,19 +22,23 @@ import (
type ID string
type StorageInfo struct {
ID ID
URLs []string // TODO: Support non-http transports
Cost int
ID ID
URLs []string // TODO: Support non-http transports
Weight uint64
CanSeal bool
CanStore bool
}
type SectorIndex interface { // part of storage-miner api
StorageAttach(context.Context, StorageInfo) error
StorageAttach(context.Context, StorageInfo, FsStat) error
StorageInfo(context.Context, ID) (StorageInfo, error)
// TODO: StorageUpdateStats(FsStat)
StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft sectorbuilder.SectorFileType) error
StorageFindSector(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]StorageInfo, error)
StorageBestAlloc(ctx context.Context, allocate sectorbuilder.SectorFileType, sealing bool) ([]StorageInfo, error)
}
type Decl struct {
@ -39,17 +46,22 @@ type Decl struct {
sectorbuilder.SectorFileType
}
type storageEntry struct {
info *StorageInfo
fsi FsStat
}
type Index struct {
lk sync.Mutex
lk sync.RWMutex
sectors map[Decl][]ID
stores map[ID]*StorageInfo
stores map[ID]*storageEntry
}
func NewIndex() *Index {
return &Index{
sectors: map[Decl][]ID{},
stores: map[ID]*StorageInfo{},
stores: map[ID]*storageEntry{},
}
}
@ -79,7 +91,7 @@ func (i *Index) StorageList(ctx context.Context) (map[ID][]Decl, error) {
return out, nil
}
func (i *Index) StorageAttach(ctx context.Context, si StorageInfo) error {
func (i *Index) StorageAttach(ctx context.Context, si StorageInfo, st FsStat) error {
i.lk.Lock()
defer i.lk.Unlock()
@ -92,10 +104,13 @@ func (i *Index) StorageAttach(ctx context.Context, si StorageInfo) error {
}
}
i.stores[si.ID].URLs = append(i.stores[si.ID].URLs, si.URLs...)
i.stores[si.ID].info.URLs = append(i.stores[si.ID].info.URLs, si.URLs...)
return nil
}
i.stores[si.ID] = &si
i.stores[si.ID] = &storageEntry{
info: &si,
fsi: st,
}
return nil
}
@ -124,8 +139,12 @@ func (i *Index) StorageDeclareSector(ctx context.Context, storageId ID, s abi.Se
}
func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft sectorbuilder.SectorFileType) ([]StorageInfo, error) {
i.lk.Lock()
defer i.lk.Unlock()
if bits.OnesCount(uint(ft)) != 1 {
return nil, xerrors.Errorf("findSector only works for a single file type")
}
i.lk.RLock()
defer i.lk.RUnlock()
storageIDs := i.sectors[Decl{s, ft}]
out := make([]StorageInfo, len(storageIDs))
@ -137,8 +156,8 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft sector
continue
}
urls := make([]string, len(st.URLs))
for k, u := range st.URLs {
urls := make([]string, len(st.info.URLs))
for k, u := range st.info.URLs {
rl, err := url.Parse(u)
if err != nil {
return nil, xerrors.Errorf("failed to parse url: %w", err)
@ -151,9 +170,9 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft sector
out[j] = StorageInfo{
ID: id,
URLs: nil,
Cost: st.Cost,
CanSeal: st.CanSeal,
CanStore: st.CanStore,
Weight: st.info.Weight,
CanSeal: st.info.CanSeal,
CanStore: st.info.CanStore,
}
}
@ -161,36 +180,65 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft sector
}
func (i *Index) StorageInfo(ctx context.Context, id ID) (StorageInfo, error) {
i.lk.RLock()
defer i.lk.RUnlock()
si, found := i.stores[id]
if !found {
return StorageInfo{}, xerrors.Errorf("sector store not found")
}
return *si, nil
return *si.info, nil
}
func DeclareLocalStorage(ctx context.Context, idx SectorIndex, localStore *Local, urls []string, cost int) error {
for _, path := range localStore.Local() {
err := idx.StorageAttach(ctx, StorageInfo{
ID: path.ID,
URLs: urls,
Cost: cost,
CanSeal: path.CanSeal,
CanStore: path.CanStore,
})
if err != nil {
log.Errorf("attaching local storage to remote: %+v", err)
func (i *Index) StorageBestAlloc(ctx context.Context, allocate sectorbuilder.SectorFileType, sealing bool) ([]StorageInfo, error) {
i.lk.RLock()
defer i.lk.RUnlock()
var candidates []storageEntry
for _, p := range i.stores {
if sealing && !p.info.CanSeal {
log.Debugf("alloc: not considering %s; can't seal", p.info.ID)
continue
}
if !sealing && !p.info.CanStore {
log.Debugf("alloc: not considering %s; can't store", p.info.ID)
continue
}
for id, fileType := range localStore.List(path.ID) {
if err := idx.StorageDeclareSector(ctx, path.ID, id, fileType); err != nil {
log.Errorf("declaring sector: %+v")
}
}
// TODO: filter out of space
candidates = append(candidates, *p)
}
return nil
if len(candidates) == 0 {
return nil, xerrors.New("no good path found")
}
sort.Slice(candidates, func(i, j int) bool {
iw := big.Mul(big.NewInt(int64(candidates[i].fsi.Free)), big.NewInt(int64(candidates[i].info.Weight)))
jw := big.Mul(big.NewInt(int64(candidates[j].fsi.Free)), big.NewInt(int64(candidates[j].info.Weight)))
return iw.GreaterThan(jw)
})
out := make([]StorageInfo, len(candidates))
for i, candidate := range candidates {
out[i] = *candidate.info
}
return out, nil
}
func (i *Index) FindSector(id abi.SectorID, typ sectorbuilder.SectorFileType) ([]ID, error) {
i.lk.RLock()
defer i.lk.RUnlock()
return i.sectors[Decl{
SectorID: id,
SectorFileType: typ,
}], nil
}
var _ SectorIndex = &Index{}

View File

@ -2,6 +2,9 @@ package stores
import (
"context"
"syscall"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"
@ -10,3 +13,20 @@ import (
type Store interface {
AcquireSector(ctx context.Context, s abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (paths sectorbuilder.SectorPaths, stores sectorbuilder.SectorPaths, done func(), err error)
}
type FsStat struct {
Capacity uint64
Free uint64 // Free to use for sector storage
}
func Stat(path string) (FsStat, error) {
var stat syscall.Statfs_t
if err := syscall.Statfs(path, &stat); err != nil {
return FsStat{}, xerrors.Errorf("statfs: %w", err)
}
return FsStat{
Capacity: stat.Blocks * uint64(stat.Bsize),
Free: stat.Bavail * uint64(stat.Bsize),
}, nil
}

View File

@ -27,7 +27,7 @@ type StoragePath struct {
}
// [path]/sectorstore.json
type StorageMeta struct {
type LocalStorageMeta struct {
ID ID
Weight uint64 // 0 = readonly
@ -45,35 +45,40 @@ const MetaFile = "sectorstore.json"
var pathTypes = []sectorbuilder.SectorFileType{sectorbuilder.FTUnsealed, sectorbuilder.FTSealed, sectorbuilder.FTCache}
type Local struct {
localLk sync.RWMutex
localStorage LocalStorage
index SectorIndex
urls []string
paths []*path
paths map[ID]*path
localLk sync.RWMutex
}
type path struct {
lk sync.Mutex
meta StorageMeta
local string
sectors map[abi.SectorID]sectorbuilder.SectorFileType
local string // absolute local path
}
func NewLocal(ls LocalStorage) (*Local, error) {
func NewLocal(ctx context.Context, ls LocalStorage, index SectorIndex, urls []string) (*Local, error) {
l := &Local{
localStorage: ls,
index: index,
urls: urls,
paths: map[ID]*path{},
}
return l, l.open()
return l, l.open(ctx)
}
func (st *Local) OpenPath(p string) error {
func (st *Local) OpenPath(ctx context.Context, p string) error {
st.localLk.Lock()
defer st.localLk.Unlock()
mb, err := ioutil.ReadFile(filepath.Join(p, MetaFile))
if err != nil {
return xerrors.Errorf("reading storage metadata for %s: %w", p, err)
}
var meta StorageMeta
var meta LocalStorageMeta
if err := json.Unmarshal(mb, &meta); err != nil {
return xerrors.Errorf("unmarshalling storage metadata for %s: %w", p, err)
}
@ -81,9 +86,23 @@ func (st *Local) OpenPath(p string) error {
// TODO: Check existing / dedupe
out := &path{
meta: meta,
local: p,
sectors: map[abi.SectorID]sectorbuilder.SectorFileType{},
local: p,
}
fst, err := Stat(p)
if err != nil {
return err
}
err = st.index.StorageAttach(ctx, StorageInfo{
ID: meta.ID,
URLs: st.urls,
Weight: meta.Weight,
CanSeal: meta.CanSeal,
CanStore: meta.CanStore,
}, fst)
if err != nil {
return xerrors.Errorf("declaring storage in index: %w", err)
}
for _, t := range pathTypes {
@ -105,26 +124,25 @@ func (st *Local) OpenPath(p string) error {
return xerrors.Errorf("parse sector id %s: %w", ent.Name(), err)
}
out.sectors[sid] |= t
if err := st.index.StorageDeclareSector(ctx, meta.ID, sid, t); err != nil {
return xerrors.Errorf("declare sector %d(t:%d) -> %s: %w", sid, t, meta.ID, err)
}
}
}
st.paths = append(st.paths, out)
st.paths[meta.ID] = out
return nil
}
func (st *Local) open() error {
st.localLk.Lock()
defer st.localLk.Unlock()
func (st *Local) open(ctx context.Context) error {
cfg, err := st.localStorage.GetStorage()
if err != nil {
return xerrors.Errorf("getting local storage config: %w", err)
}
for _, path := range cfg.StoragePaths {
err := st.OpenPath(path.Path)
err := st.OpenPath(ctx, path.Path)
if err != nil {
return xerrors.Errorf("opening path %s: %w", path.Path, err)
}
@ -148,23 +166,25 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing s
continue
}
for _, p := range st.paths {
p.lk.Lock()
s, ok := p.sectors[sid]
p.lk.Unlock()
si, err := st.index.StorageFindSector(ctx, sid, fileType)
if err != nil {
log.Warnf("finding existing sector %d(t:%d) failed: %+v", sid, fileType, err)
continue
}
for _, info := range si {
p, ok := st.paths[info.ID]
if !ok {
continue
}
if s&fileType == 0 {
continue
}
if p.local == "" {
if p.local == "" { // TODO: can that even be the case?
continue
}
spath := filepath.Join(p.local, fileType.String(), sectorutil.SectorName(sid))
sectorutil.SetPathByType(&out, fileType, spath)
sectorutil.SetPathByType(&storageIDs, fileType, string(p.meta.ID))
sectorutil.SetPathByType(&storageIDs, fileType, string(info.ID))
existing ^= fileType
}
@ -175,27 +195,37 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing s
continue
}
sis, err := st.index.StorageBestAlloc(ctx, fileType, sealing)
if err != nil {
st.localLk.RUnlock()
return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, xerrors.Errorf("finding best storage for allocating : %w", err)
}
var best string
var bestID ID
for _, p := range st.paths {
if sealing && !p.meta.CanSeal {
continue
}
if !sealing && !p.meta.CanStore {
for _, si := range sis {
p, ok := st.paths[si.ID]
if !ok {
continue
}
p.lk.Lock()
p.sectors[sid] |= fileType
p.lk.Unlock()
if p.local == "" { // TODO: can that even be the case?
continue
}
if sealing && !si.CanSeal {
continue
}
if !sealing && !si.CanStore {
continue
}
// TODO: Check free space
// TODO: Calc weights
best = filepath.Join(p.local, fileType.String(), sectorutil.SectorName(sid))
bestID = p.meta.ID
break // todo: the first path won't always be the best
bestID = si.ID
}
if best == "" {
@ -211,88 +241,41 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing s
return out, storageIDs, st.localLk.RUnlock, nil
}
func (st *Local) FindBestAllocStorage(allocate sectorbuilder.SectorFileType, sealing bool) ([]StorageMeta, error) {
st.localLk.RLock()
defer st.localLk.RUnlock()
var out []StorageMeta
for _, p := range st.paths {
if sealing && !p.meta.CanSeal {
log.Debugf("alloc: not considering %s; can't seal", p.meta.ID)
continue
}
if !sealing && !p.meta.CanStore {
log.Debugf("alloc: not considering %s; can't store", p.meta.ID)
continue
}
// TODO: filter out of space
out = append(out, p.meta)
}
if len(out) == 0 {
return nil, xerrors.New("no good path found")
}
// todo: sort by some kind of preference
return out, nil
}
func (st *Local) FindSector(id abi.SectorID, typ sectorbuilder.SectorFileType) ([]StorageMeta, error) {
st.localLk.RLock()
defer st.localLk.RUnlock()
var out []StorageMeta
for _, p := range st.paths {
p.lk.Lock()
t := p.sectors[id]
if t|typ == 0 {
continue
}
p.lk.Unlock()
out = append(out, p.meta)
}
if len(out) == 0 {
return nil, xerrors.Errorf("sector %s/s-t0%d-%d not found", typ, id.Miner, id.Number)
}
return out, nil
}
func (st *Local) Local() []StoragePath {
func (st *Local) Local(ctx context.Context) ([]StoragePath, error) {
st.localLk.RLock()
defer st.localLk.RUnlock()
var out []StoragePath
for _, p := range st.paths {
for id, p := range st.paths {
if p.local == "" {
continue
}
si, err := st.index.StorageInfo(ctx, id)
if err != nil {
return nil, xerrors.Errorf("get storage info for %s: %w", id, err)
}
out = append(out, StoragePath{
ID: p.meta.ID,
Weight: p.meta.Weight,
ID: id,
Weight: si.Weight,
LocalPath: p.local,
CanSeal: p.meta.CanSeal,
CanStore: p.meta.CanStore,
CanSeal: si.CanSeal,
CanStore: si.CanStore,
})
}
return out
return out, nil
}
func (st *Local) List(id ID) map[abi.SectorID]sectorbuilder.SectorFileType {
out := map[abi.SectorID]sectorbuilder.SectorFileType{}
for _, p := range st.paths {
if p.meta.ID != id { // TODO: not very efficient
continue
}
func (st *Local) FsStat(id ID) (FsStat, error) {
st.localLk.RLock()
defer st.localLk.RUnlock()
for id, fileType := range p.sectors {
out[id] |= fileType
}
p, ok := st.paths[id]
if !ok {
return FsStat{}, xerrors.Errorf("fsstat: path not found")
}
return out
return Stat(p.local)
}

View File

@ -84,7 +84,7 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType
}
sort.Slice(si, func(i, j int) bool {
return si[i].Cost < si[j].Cost
return si[i].Weight < si[j].Weight
})
apaths, ids, done, err := r.local.AcquireSector(ctx, s, 0, fileType, sealing)
@ -163,14 +163,6 @@ func (r *Remote) fetch(url, outname string) error {
}
func (r *Remote) FindBestAllocStorage(allocate sectorbuilder.SectorFileType, sealing bool) ([]StorageMeta, error) {
panic("todo")
}
func (r *Remote) FindSector(id abi.SectorID, typ sectorbuilder.SectorFileType) ([]StorageMeta, error) {
panic("todo")
}
func mergeDone(a func(), b func()) func() {
return func() {
a()