workers: Report sectors paths after worker creates them

This commit is contained in:
Łukasz Magiera 2020-03-13 17:54:55 +01:00
parent 2c7f579838
commit 96730bae75
10 changed files with 62 additions and 42 deletions

View File

@ -167,7 +167,7 @@ var runCmd = &cli.Command{
remote := stores.NewRemote(localStore, nodeApi, sminfo.AuthHeader())
workerApi := &worker{
LocalWorker: advmgr.NewLocalWorker(act, spt, remote, localStore),
LocalWorker: advmgr.NewLocalWorker(act, spt, remote, localStore, stores.NewIndex()),
}
mux := mux.NewRouter()

View File

@ -258,6 +258,7 @@ func Online() Option {
// Storage miner
ApplyIf(func(s *Settings) bool { return s.nodeType == repo.StorageMiner },
Override(new(*stores.Index), stores.NewIndex()),
Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig),
Override(new(stores.LocalStorage), From(new(repo.LockedRepo))),
Override(new(advmgr.SectorIDCounter), modules.SectorIDCounter),

View File

@ -39,6 +39,7 @@ type StorageMinerAPI struct {
BlockMiner *miner.Miner
Full api.FullNode
StorageMgr *advmgr.Manager `optional:"true"`
*stores.Index
}
func (sm *StorageMinerAPI) ServeRemote(w http.ResponseWriter, r *http.Request) {
@ -152,18 +153,6 @@ func (sm *StorageMinerAPI) WorkerConnect(ctx context.Context, url string) error
panic("todo register ")
}
func (sm *StorageMinerAPI) StorageAttach(ctx context.Context, si stores.StorageInfo) error {
panic("implement me")
}
func (sm *StorageMinerAPI) StorageDeclareSector(ctx context.Context, storageId stores.ID, s abi.SectorID, ft sectorbuilder.SectorFileType) error {
panic("implement me")
}
func (sm *StorageMinerAPI) StorageFindSector(ctx context.Context, si abi.SectorID, types sectorbuilder.SectorFileType) ([]stores.StorageInfo, error) {
panic("implement me")
}
func (sm *StorageMinerAPI) MarketImportDealData(ctx context.Context, propCid cid.Cid, path string) error {
fi, err := os.Open(path)
if err != nil {

View File

@ -8,22 +8,25 @@ import (
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"
storage2 "github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/lotus/storage/sealmgr/stores"
"github.com/filecoin-project/lotus/storage/sealmgr"
"github.com/filecoin-project/lotus/storage/sealmgr/sectorutil"
"github.com/filecoin-project/lotus/storage/sealmgr/stores"
)
var pathTypes = []sectorbuilder.SectorFileType{sectorbuilder.FTUnsealed, sectorbuilder.FTSealed, sectorbuilder.FTCache}
type LocalWorker struct {
scfg *sectorbuilder.Config
storage stores.Store
localStore *stores.Local
sindex stores.SectorIndex
}
func NewLocalWorker(ma address.Address, spt abi.RegisteredProof, store stores.Store, local *stores.Local) *LocalWorker {
func NewLocalWorker(ma address.Address, spt abi.RegisteredProof, store stores.Store, local *stores.Local, sindex stores.SectorIndex) *LocalWorker {
ppt, err := spt.RegisteredPoStProof()
if err != nil {
panic(err)
@ -36,6 +39,7 @@ func NewLocalWorker(ma address.Address, spt abi.RegisteredProof, store stores.St
},
storage: store,
localStore: local,
sindex: sindex,
}
}
@ -49,10 +53,31 @@ func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, id abi.Sect
return sectorbuilder.SectorPaths{}, nil, xerrors.Errorf("get miner ID: %w", err)
}
return l.w.storage.AcquireSector(ctx, abi.SectorID{
sector := abi.SectorID{
Miner: abi.ActorID(mid),
Number: id,
}, existing, allocate, sealing)
}
paths, storageIDs, done, err := l.w.storage.AcquireSector(ctx, sector, existing, allocate, sealing)
if err != nil {
return sectorbuilder.SectorPaths{}, nil, err
}
return paths, func() {
done()
for _, fileType := range pathTypes {
if fileType&allocate == 0 {
continue
}
sid := sectorutil.PathByType(storageIDs, fileType)
if err := l.w.sindex.StorageDeclareSector(ctx, stores.ID(sid), sector, fileType); err != nil {
log.Errorf("declare sector error: %+v", err)
}
}
}, nil
}
func (l *LocalWorker) sb() (sectorbuilder.Basic, error) {

View File

@ -20,8 +20,10 @@ func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorNumbe
return sectorbuilder.SectorPaths{}, nil, xerrors.New("read-only storage")
}
return l.stor.AcquireSector(ctx, abi.SectorID{
p, _, done, err := l.stor.AcquireSector(ctx, abi.SectorID{
Miner: l.miner,
Number: id,
}, existing, allocate, sealing)
return p, done, err
}

View File

@ -44,7 +44,7 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ
if err != nil {
return
}
paths, done, err := handler.Store.AcquireSector(r.Context(), id, ft, 0, false)
paths, _, done, err := handler.Store.AcquireSector(r.Context(), id, ft, 0, false)
if err != nil {
return
}

View File

@ -75,16 +75,23 @@ func (i *Index) StorageDeclareSector(ctx context.Context, storageId ID, s abi.Se
i.lk.Lock()
defer i.lk.Unlock()
d := decl{s, ft}
for _, sid := range i.sectors[d] {
if sid == storageId {
log.Warnf("sector %v redeclared in %s", storageId)
return nil
for _, fileType := range pathTypes {
if fileType&ft == 0 {
continue
}
d := decl{s, fileType}
for _, sid := range i.sectors[d] {
if sid == storageId {
log.Warnf("sector %v redeclared in %s", storageId)
return nil
}
}
i.sectors[d] = append(i.sectors[d], storageId)
}
i.sectors[d] = append(i.sectors[d], storageId)
return nil
}

View File

@ -8,5 +8,5 @@ import (
)
type Store interface {
AcquireSector(ctx context.Context, s abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error)
AcquireSector(ctx context.Context, s abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (paths sectorbuilder.SectorPaths, stores sectorbuilder.SectorPaths, done func(), err error)
}

View File

@ -133,12 +133,7 @@ func (st *Local) open() error {
return nil
}
func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) {
out, _, done, err := st.acquireSector(ctx, sid, existing, allocate, sealing)
return out, done, err
}
func (st *Local) acquireSector(ctx context.Context, sid abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, sectorbuilder.SectorPaths, func(), error) {
func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, sectorbuilder.SectorPaths, func(), error) {
if existing|allocate != existing^allocate {
return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector")
}

View File

@ -37,17 +37,17 @@ func NewRemote(local *Local, remote SectorIndex, auth http.Header) *Remote {
}
}
func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) {
func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, sectorbuilder.SectorPaths, func(), error) {
if existing|allocate != existing^allocate {
return sectorbuilder.SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector")
return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector")
}
r.fetchLk.Lock()
defer r.fetchLk.Unlock()
paths, done, err := r.local.AcquireSector(ctx, s, existing, allocate, sealing)
paths, stores, done, err := r.local.AcquireSector(ctx, s, existing, allocate, sealing)
if err != nil {
return sectorbuilder.SectorPaths{}, nil, err
return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, err
}
for _, fileType := range pathTypes {
@ -62,18 +62,19 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sec
ap, storageID, rdone, err := r.acquireFromRemote(ctx, s, fileType, sealing)
if err != nil {
done()
return sectorbuilder.SectorPaths{}, nil, err
return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, err
}
done = mergeDone(done, rdone)
sectorutil.SetPathByType(&paths, fileType, ap)
sectorutil.SetPathByType(&stores, fileType, string(storageID))
if err := r.remote.StorageDeclareSector(ctx, storageID, s, fileType); err != nil {
log.Warnf("declaring sector %v in %s failed: %+v", s, storageID, err)
}
}
return paths, done, nil
return paths, stores, done, nil
}
func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType sectorbuilder.SectorFileType, sealing bool) (string, ID, func(), error) {
@ -86,7 +87,7 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType
return si[i].Cost < si[j].Cost
})
apaths, ids, done, err := r.local.acquireSector(ctx, s, 0, fileType, sealing)
apaths, ids, done, err := r.local.AcquireSector(ctx, s, 0, fileType, sealing)
if err != nil {
return "", "", nil, xerrors.Errorf("allocate local sector for fetching: %w", err)
}