Merge pull request #2976 from yaohcn/space-check
add space check in StorageFindSector
This commit is contained in:
commit
90b5e47fa5
@ -257,7 +257,7 @@ type StorageMinerStruct struct {
|
|||||||
StorageAttach func(context.Context, stores.StorageInfo, fsutil.FsStat) error `perm:"admin"`
|
StorageAttach func(context.Context, stores.StorageInfo, fsutil.FsStat) error `perm:"admin"`
|
||||||
StorageDeclareSector func(context.Context, stores.ID, abi.SectorID, stores.SectorFileType, bool) error `perm:"admin"`
|
StorageDeclareSector func(context.Context, stores.ID, abi.SectorID, stores.SectorFileType, bool) error `perm:"admin"`
|
||||||
StorageDropSector func(context.Context, stores.ID, abi.SectorID, stores.SectorFileType) error `perm:"admin"`
|
StorageDropSector func(context.Context, stores.ID, abi.SectorID, stores.SectorFileType) error `perm:"admin"`
|
||||||
StorageFindSector func(context.Context, abi.SectorID, stores.SectorFileType, bool) ([]stores.SectorStorageInfo, error) `perm:"admin"`
|
StorageFindSector func(context.Context, abi.SectorID, stores.SectorFileType, abi.RegisteredSealProof, bool) ([]stores.SectorStorageInfo, error) `perm:"admin"`
|
||||||
StorageInfo func(context.Context, stores.ID) (stores.StorageInfo, error) `perm:"admin"`
|
StorageInfo func(context.Context, stores.ID) (stores.StorageInfo, error) `perm:"admin"`
|
||||||
StorageBestAlloc func(ctx context.Context, allocate stores.SectorFileType, spt abi.RegisteredSealProof, sealing stores.PathType) ([]stores.StorageInfo, error) `perm:"admin"`
|
StorageBestAlloc func(ctx context.Context, allocate stores.SectorFileType, spt abi.RegisteredSealProof, sealing stores.PathType) ([]stores.StorageInfo, error) `perm:"admin"`
|
||||||
StorageReportHealth func(ctx context.Context, id stores.ID, report stores.HealthReport) error `perm:"admin"`
|
StorageReportHealth func(ctx context.Context, id stores.ID, report stores.HealthReport) error `perm:"admin"`
|
||||||
@ -980,8 +980,8 @@ func (c *StorageMinerStruct) StorageDropSector(ctx context.Context, storageId st
|
|||||||
return c.Internal.StorageDropSector(ctx, storageId, s, ft)
|
return c.Internal.StorageDropSector(ctx, storageId, s, ft)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *StorageMinerStruct) StorageFindSector(ctx context.Context, si abi.SectorID, types stores.SectorFileType, allowFetch bool) ([]stores.SectorStorageInfo, error) {
|
func (c *StorageMinerStruct) StorageFindSector(ctx context.Context, si abi.SectorID, types stores.SectorFileType, spt abi.RegisteredSealProof, allowFetch bool) ([]stores.SectorStorageInfo, error) {
|
||||||
return c.Internal.StorageFindSector(ctx, si, types, allowFetch)
|
return c.Internal.StorageFindSector(ctx, si, types, spt, allowFetch)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *StorageMinerStruct) StorageList(ctx context.Context) (map[stores.ID][]stores.Decl, error) {
|
func (c *StorageMinerStruct) StorageList(ctx context.Context) (map[stores.ID][]stores.Decl, error) {
|
||||||
|
@ -294,17 +294,17 @@ var storageFindCmd = &cli.Command{
|
|||||||
Number: abi.SectorNumber(snum),
|
Number: abi.SectorNumber(snum),
|
||||||
}
|
}
|
||||||
|
|
||||||
u, err := nodeApi.StorageFindSector(ctx, sid, stores.FTUnsealed, false)
|
u, err := nodeApi.StorageFindSector(ctx, sid, stores.FTUnsealed, 0, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("finding unsealed: %w", err)
|
return xerrors.Errorf("finding unsealed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
s, err := nodeApi.StorageFindSector(ctx, sid, stores.FTSealed, false)
|
s, err := nodeApi.StorageFindSector(ctx, sid, stores.FTSealed, 0, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("finding sealed: %w", err)
|
return xerrors.Errorf("finding sealed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
c, err := nodeApi.StorageFindSector(ctx, sid, stores.FTCache, false)
|
c, err := nodeApi.StorageFindSector(ctx, sid, stores.FTCache, 0, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("finding cache: %w", err)
|
return xerrors.Errorf("finding cache: %w", err)
|
||||||
}
|
}
|
||||||
|
7
extern/sector-storage/manager.go
vendored
7
extern/sector-storage/manager.go
vendored
@ -206,7 +206,8 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect
|
|||||||
return xerrors.Errorf("acquiring sector lock: %w", err)
|
return xerrors.Errorf("acquiring sector lock: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
best, err := m.index.StorageFindSector(ctx, sector, stores.FTUnsealed, false)
|
// passing 0 spt because we only need it when allowFetch is true
|
||||||
|
best, err := m.index.StorageFindSector(ctx, sector, stores.FTUnsealed, 0, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("read piece: checking for already existing unsealed sector: %w", err)
|
return xerrors.Errorf("read piece: checking for already existing unsealed sector: %w", err)
|
||||||
}
|
}
|
||||||
@ -403,7 +404,7 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID, keepU
|
|||||||
|
|
||||||
unsealed := stores.FTUnsealed
|
unsealed := stores.FTUnsealed
|
||||||
{
|
{
|
||||||
unsealedStores, err := m.index.StorageFindSector(ctx, sector, stores.FTUnsealed, false)
|
unsealedStores, err := m.index.StorageFindSector(ctx, sector, stores.FTUnsealed, 0, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("finding unsealed sector: %w", err)
|
return xerrors.Errorf("finding unsealed sector: %w", err)
|
||||||
}
|
}
|
||||||
@ -459,7 +460,7 @@ func (m *Manager) Remove(ctx context.Context, sector abi.SectorID) error {
|
|||||||
|
|
||||||
unsealed := stores.FTUnsealed
|
unsealed := stores.FTUnsealed
|
||||||
{
|
{
|
||||||
unsealedStores, err := m.index.StorageFindSector(ctx, sector, stores.FTUnsealed, false)
|
unsealedStores, err := m.index.StorageFindSector(ctx, sector, stores.FTUnsealed, 0, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("finding unsealed sector: %w", err)
|
return xerrors.Errorf("finding unsealed sector: %w", err)
|
||||||
}
|
}
|
||||||
|
2
extern/sector-storage/selector_existing.go
vendored
2
extern/sector-storage/selector_existing.go
vendored
@ -46,7 +46,7 @@ func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt
|
|||||||
have[path.ID] = struct{}{}
|
have[path.ID] = struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
best, err := s.index.StorageFindSector(ctx, s.sector, s.alloc, s.allowFetch)
|
best, err := s.index.StorageFindSector(ctx, s.sector, s.alloc, spt, s.allowFetch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, xerrors.Errorf("finding best storage: %w", err)
|
return false, xerrors.Errorf("finding best storage: %w", err)
|
||||||
}
|
}
|
||||||
|
28
extern/sector-storage/stores/index.go
vendored
28
extern/sector-storage/stores/index.go
vendored
@ -54,7 +54,7 @@ type SectorIndex interface { // part of storage-miner api
|
|||||||
|
|
||||||
StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType, primary bool) error
|
StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType, primary bool) error
|
||||||
StorageDropSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType) error
|
StorageDropSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType) error
|
||||||
StorageFindSector(ctx context.Context, sector abi.SectorID, ft SectorFileType, allowFetch bool) ([]SectorStorageInfo, error)
|
StorageFindSector(ctx context.Context, sector abi.SectorID, ft SectorFileType, spt abi.RegisteredSealProof, allowFetch bool) ([]SectorStorageInfo, error)
|
||||||
|
|
||||||
StorageBestAlloc(ctx context.Context, allocate SectorFileType, spt abi.RegisteredSealProof, pathType PathType) ([]StorageInfo, error)
|
StorageBestAlloc(ctx context.Context, allocate SectorFileType, spt abi.RegisteredSealProof, pathType PathType) ([]StorageInfo, error)
|
||||||
|
|
||||||
@ -245,7 +245,7 @@ func (i *Index) StorageDropSector(ctx context.Context, storageId ID, s abi.Secto
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft SectorFileType, allowFetch bool) ([]SectorStorageInfo, error) {
|
func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft SectorFileType, spt abi.RegisteredSealProof, allowFetch bool) ([]SectorStorageInfo, error) {
|
||||||
i.lk.RLock()
|
i.lk.RLock()
|
||||||
defer i.lk.RUnlock()
|
defer i.lk.RUnlock()
|
||||||
|
|
||||||
@ -296,7 +296,31 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft Sector
|
|||||||
}
|
}
|
||||||
|
|
||||||
if allowFetch {
|
if allowFetch {
|
||||||
|
spaceReq, err := ft.SealSpaceUse(spt)
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("estimating required space: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
for id, st := range i.stores {
|
for id, st := range i.stores {
|
||||||
|
if !st.info.CanSeal {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if spaceReq > uint64(st.fsi.Available) {
|
||||||
|
log.Debugf("not selecting on %s, out of space (available: %d, need: %d)", st.info.ID, st.fsi.Available, spaceReq)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if time.Since(st.lastHeartbeat) > SkippedHeartbeatThresh {
|
||||||
|
log.Debugf("not selecting on %s, didn't receive heartbeats for %s", st.info.ID, time.Since(st.lastHeartbeat))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if st.heartbeatErr != nil {
|
||||||
|
log.Debugf("not selecting on %s, heartbeat error: %s", st.info.ID, st.heartbeatErr)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
if _, ok := storageIDs[id]; ok {
|
if _, ok := storageIDs[id]; ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
6
extern/sector-storage/stores/local.go
vendored
6
extern/sector-storage/stores/local.go
vendored
@ -334,7 +334,7 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.Re
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
si, err := st.index.StorageFindSector(ctx, sid, fileType, false)
|
si, err := st.index.StorageFindSector(ctx, sid, fileType, spt, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnf("finding existing sector %d(t:%d) failed: %+v", sid, fileType, err)
|
log.Warnf("finding existing sector %d(t:%d) failed: %+v", sid, fileType, err)
|
||||||
continue
|
continue
|
||||||
@ -441,7 +441,7 @@ func (st *Local) Remove(ctx context.Context, sid abi.SectorID, typ SectorFileTyp
|
|||||||
return xerrors.New("delete expects one file type")
|
return xerrors.New("delete expects one file type")
|
||||||
}
|
}
|
||||||
|
|
||||||
si, err := st.index.StorageFindSector(ctx, sid, typ, false)
|
si, err := st.index.StorageFindSector(ctx, sid, typ, 0, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("finding existing sector %d(t:%d) failed: %w", sid, typ, err)
|
return xerrors.Errorf("finding existing sector %d(t:%d) failed: %w", sid, typ, err)
|
||||||
}
|
}
|
||||||
@ -464,7 +464,7 @@ func (st *Local) RemoveCopies(ctx context.Context, sid abi.SectorID, typ SectorF
|
|||||||
return xerrors.New("delete expects one file type")
|
return xerrors.New("delete expects one file type")
|
||||||
}
|
}
|
||||||
|
|
||||||
si, err := st.index.StorageFindSector(ctx, sid, typ, false)
|
si, err := st.index.StorageFindSector(ctx, sid, typ, 0, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("finding existing sector %d(t:%d) failed: %w", sid, typ, err)
|
return xerrors.Errorf("finding existing sector %d(t:%d) failed: %w", sid, typ, err)
|
||||||
}
|
}
|
||||||
|
4
extern/sector-storage/stores/remote.go
vendored
4
extern/sector-storage/stores/remote.go
vendored
@ -170,7 +170,7 @@ func tempFetchDest(spath string, create bool) (string, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType SectorFileType, dest string) (string, error) {
|
func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType SectorFileType, dest string) (string, error) {
|
||||||
si, err := r.index.StorageFindSector(ctx, s, fileType, false)
|
si, err := r.index.StorageFindSector(ctx, s, fileType, 0, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
@ -300,7 +300,7 @@ func (r *Remote) Remove(ctx context.Context, sid abi.SectorID, typ SectorFileTyp
|
|||||||
return xerrors.Errorf("remove from local: %w", err)
|
return xerrors.Errorf("remove from local: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
si, err := r.index.StorageFindSector(ctx, sid, typ, false)
|
si, err := r.index.StorageFindSector(ctx, sid, typ, 0, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("finding existing sector %d(t:%d) failed: %w", sid, typ, err)
|
return xerrors.Errorf("finding existing sector %d(t:%d) failed: %w", sid, typ, err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user