workers: Fix moving storage around in later steps

This commit is contained in:
Łukasz Magiera 2020-03-20 01:55:49 +01:00
parent e0c15b24fc
commit 076cc428af
6 changed files with 41 additions and 14 deletions

View File

@ -186,7 +186,7 @@ type StorageMinerStruct struct {
StorageLocal func(context.Context) (map[stores.ID]string, error) `perm:"admin"`
StorageAttach func(context.Context, stores.StorageInfo, stores.FsStat) error `perm:"admin"`
StorageDeclareSector func(context.Context, stores.ID, abi.SectorID, sectorbuilder.SectorFileType) error `perm:"admin"`
StorageFindSector func(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]stores.StorageInfo, error) `perm:"admin"`
StorageFindSector func(context.Context, abi.SectorID, sectorbuilder.SectorFileType, bool) ([]stores.StorageInfo, error) `perm:"admin"`
StorageInfo func(context.Context, stores.ID) (stores.StorageInfo, error) `perm:"admin"`
StorageBestAlloc func(ctx context.Context, allocate sectorbuilder.SectorFileType, sealing bool) ([]stores.StorageInfo, error) `perm:"admin"`
@ -664,8 +664,8 @@ func (c *StorageMinerStruct) StorageDeclareSector(ctx context.Context, storageId
return c.Internal.StorageDeclareSector(ctx, storageId, s, ft)
}
func (c *StorageMinerStruct) StorageFindSector(ctx context.Context, si abi.SectorID, types sectorbuilder.SectorFileType) ([]stores.StorageInfo, error) {
return c.Internal.StorageFindSector(ctx, si, types)
func (c *StorageMinerStruct) StorageFindSector(ctx context.Context, si abi.SectorID, types sectorbuilder.SectorFileType, allowFetch bool) ([]stores.StorageInfo, error) {
return c.Internal.StorageFindSector(ctx, si, types, allowFetch)
}
func (c *StorageMinerStruct) StorageList(ctx context.Context) (map[stores.ID][]stores.Decl, error) {

View File

@ -208,17 +208,17 @@ var storageFindCmd = &cli.Command{
Number: abi.SectorNumber(snum),
}
u, err := nodeApi.StorageFindSector(ctx, sid, sectorbuilder.FTUnsealed)
u, err := nodeApi.StorageFindSector(ctx, sid, sectorbuilder.FTUnsealed, false)
if err != nil {
return xerrors.Errorf("finding unsealed: %w", err)
}
s, err := nodeApi.StorageFindSector(ctx, sid, sectorbuilder.FTSealed)
s, err := nodeApi.StorageFindSector(ctx, sid, sectorbuilder.FTSealed, false)
if err != nil {
return xerrors.Errorf("finding sealed: %w", err)
}
c, err := nodeApi.StorageFindSector(ctx, sid, sectorbuilder.FTCache)
c, err := nodeApi.StorageFindSector(ctx, sid, sectorbuilder.FTCache, false)
if err != nil {
return xerrors.Errorf("finding cache: %w", err)
}

View File

@ -188,7 +188,7 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
if len(existingPieces) == 0 { // new
best, err = m.index.StorageBestAlloc(ctx, sectorbuilder.FTUnsealed, true)
} else { // append to existing
best, err = m.index.StorageFindSector(ctx, sector, sectorbuilder.FTUnsealed)
best, err = m.index.StorageFindSector(ctx, sector, sectorbuilder.FTUnsealed, false)
}
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("finding sector path: %w", err)
@ -227,7 +227,7 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke
func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage2.PreCommit1Out) (cids storage2.SectorCids, err error) {
// TODO: allow workers to fetch the sectors
best, err := m.index.StorageFindSector(ctx, sector, sectorbuilder.FTCache|sectorbuilder.FTSealed)
best, err := m.index.StorageFindSector(ctx, sector, sectorbuilder.FTCache|sectorbuilder.FTSealed, true)
if err != nil {
return storage2.SectorCids{}, xerrors.Errorf("finding path for sector sealing: %w", err)
}
@ -243,7 +243,7 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase
}
func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage2.SectorCids) (output storage2.Commit1Out, err error) {
best, err := m.index.StorageFindSector(ctx, sector, sectorbuilder.FTCache|sectorbuilder.FTSealed)
best, err := m.index.StorageFindSector(ctx, sector, sectorbuilder.FTCache|sectorbuilder.FTSealed, true)
if err != nil {
return nil, xerrors.Errorf("finding path for sector sealing: %w", err)
}
@ -276,7 +276,7 @@ func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Ou
}
func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID) error {
best, err := m.index.StorageFindSector(ctx, sector, sectorbuilder.FTCache|sectorbuilder.FTSealed|sectorbuilder.FTUnsealed)
best, err := m.index.StorageFindSector(ctx, sector, sectorbuilder.FTCache|sectorbuilder.FTSealed|sectorbuilder.FTUnsealed, true)
if err != nil {
return xerrors.Errorf("finding sealed sector: %w", err)
}

View File

@ -35,7 +35,7 @@ type SectorIndex interface { // part of storage-miner api
// TODO: StorageUpdateStats(FsStat)
StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft sectorbuilder.SectorFileType) error
StorageFindSector(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]StorageInfo, error)
StorageFindSector(ctx context.Context, s abi.SectorID, ft sectorbuilder.SectorFileType, allowFetch bool) ([]StorageInfo, error)
StorageBestAlloc(ctx context.Context, allocate sectorbuilder.SectorFileType, sealing bool) ([]StorageInfo, error)
}
@ -137,7 +137,7 @@ func (i *Index) StorageDeclareSector(ctx context.Context, storageId ID, s abi.Se
return nil
}
func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft sectorbuilder.SectorFileType) ([]StorageInfo, error) {
func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft sectorbuilder.SectorFileType, allowFetch bool) ([]StorageInfo, error) {
i.lk.RLock()
defer i.lk.RUnlock()
@ -182,6 +182,33 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft sector
})
}
if allowFetch {
for id, st := range i.stores {
if _, ok := storageIDs[id]; ok {
continue
}
urls := make([]string, len(st.info.URLs))
for k, u := range st.info.URLs {
rl, err := url.Parse(u)
if err != nil {
return nil, xerrors.Errorf("failed to parse url: %w", err)
}
rl.Path = gopath.Join(rl.Path, ft.String(), sectorutil.SectorName(s))
urls[k] = rl.String()
}
out = append(out, StorageInfo{
ID: id,
URLs: urls,
Weight: st.info.Weight * 0, // TODO: something better than just '0'
CanSeal: st.info.CanSeal,
CanStore: st.info.CanStore,
})
}
}
return out, nil
}

View File

@ -166,7 +166,7 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing s
continue
}
si, err := st.index.StorageFindSector(ctx, sid, fileType)
si, err := st.index.StorageFindSector(ctx, sid, fileType, false)
if err != nil {
log.Warnf("finding existing sector %d(t:%d) failed: %+v", sid, fileType, err)
continue

View File

@ -78,7 +78,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sec
}
func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType sectorbuilder.SectorFileType, sealing bool) (string, ID, func(), error) {
si, err := r.index.StorageFindSector(ctx, s, fileType)
si, err := r.index.StorageFindSector(ctx, s, fileType, false)
if err != nil {
return "", "", nil, err
}