From 6b0f607f4b2f9f2b3f270d0ca7ed4d415e988793 Mon Sep 17 00:00:00 2001 From: yaohcn Date: Tue, 11 Aug 2020 15:27:03 +0800 Subject: [PATCH] add space check in StorageFindSector --- api/apistruct/struct.go | 6 ++--- cmd/lotus-storage-miner/storage.go | 6 ++--- extern/sector-storage/manager.go | 7 +++--- extern/sector-storage/selector_existing.go | 2 +- extern/sector-storage/stores/index.go | 28 ++++++++++++++++++++-- extern/sector-storage/stores/local.go | 6 ++--- extern/sector-storage/stores/remote.go | 4 ++-- 7 files changed, 42 insertions(+), 17 deletions(-) diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index bc3b72a17..27d136c78 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -257,7 +257,7 @@ type StorageMinerStruct struct { StorageAttach func(context.Context, stores.StorageInfo, fsutil.FsStat) error `perm:"admin"` StorageDeclareSector func(context.Context, stores.ID, abi.SectorID, stores.SectorFileType, bool) error `perm:"admin"` StorageDropSector func(context.Context, stores.ID, abi.SectorID, stores.SectorFileType) error `perm:"admin"` - StorageFindSector func(context.Context, abi.SectorID, stores.SectorFileType, bool) ([]stores.SectorStorageInfo, error) `perm:"admin"` + StorageFindSector func(context.Context, abi.SectorID, stores.SectorFileType, abi.RegisteredSealProof, bool) ([]stores.SectorStorageInfo, error) `perm:"admin"` StorageInfo func(context.Context, stores.ID) (stores.StorageInfo, error) `perm:"admin"` StorageBestAlloc func(ctx context.Context, allocate stores.SectorFileType, spt abi.RegisteredSealProof, sealing stores.PathType) ([]stores.StorageInfo, error) `perm:"admin"` StorageReportHealth func(ctx context.Context, id stores.ID, report stores.HealthReport) error `perm:"admin"` @@ -980,8 +980,8 @@ func (c *StorageMinerStruct) StorageDropSector(ctx context.Context, storageId st return c.Internal.StorageDropSector(ctx, storageId, s, ft) } -func (c *StorageMinerStruct) StorageFindSector(ctx context.Context, si abi.SectorID, types stores.SectorFileType, allowFetch bool) ([]stores.SectorStorageInfo, error) { - return c.Internal.StorageFindSector(ctx, si, types, allowFetch) +func (c *StorageMinerStruct) StorageFindSector(ctx context.Context, si abi.SectorID, types stores.SectorFileType, spt abi.RegisteredSealProof, allowFetch bool) ([]stores.SectorStorageInfo, error) { + return c.Internal.StorageFindSector(ctx, si, types, spt, allowFetch) } func (c *StorageMinerStruct) StorageList(ctx context.Context) (map[stores.ID][]stores.Decl, error) { diff --git a/cmd/lotus-storage-miner/storage.go b/cmd/lotus-storage-miner/storage.go index 26a36431e..712962c9b 100644 --- a/cmd/lotus-storage-miner/storage.go +++ b/cmd/lotus-storage-miner/storage.go @@ -294,17 +294,17 @@ var storageFindCmd = &cli.Command{ Number: abi.SectorNumber(snum), } - u, err := nodeApi.StorageFindSector(ctx, sid, stores.FTUnsealed, false) + u, err := nodeApi.StorageFindSector(ctx, sid, stores.FTUnsealed, 0, false) if err != nil { return xerrors.Errorf("finding unsealed: %w", err) } - s, err := nodeApi.StorageFindSector(ctx, sid, stores.FTSealed, false) + s, err := nodeApi.StorageFindSector(ctx, sid, stores.FTSealed, 0, false) if err != nil { return xerrors.Errorf("finding sealed: %w", err) } - c, err := nodeApi.StorageFindSector(ctx, sid, stores.FTCache, false) + c, err := nodeApi.StorageFindSector(ctx, sid, stores.FTCache, 0, false) if err != nil { return xerrors.Errorf("finding cache: %w", err) } diff --git a/extern/sector-storage/manager.go b/extern/sector-storage/manager.go index 303df2169..e9fa1ccd4 100644 --- a/extern/sector-storage/manager.go +++ b/extern/sector-storage/manager.go @@ -206,7 +206,8 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect return xerrors.Errorf("acquiring sector lock: %w", err) } - best, err := m.index.StorageFindSector(ctx, sector, stores.FTUnsealed, false) + // passing 0 spt because we only need it when allowFetch is true + best, err := m.index.StorageFindSector(ctx, sector, stores.FTUnsealed, 0, false) if err != nil { return xerrors.Errorf("read piece: checking for already existing unsealed sector: %w", err) } @@ -403,7 +404,7 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID, keepU unsealed := stores.FTUnsealed { - unsealedStores, err := m.index.StorageFindSector(ctx, sector, stores.FTUnsealed, false) + unsealedStores, err := m.index.StorageFindSector(ctx, sector, stores.FTUnsealed, 0, false) if err != nil { return xerrors.Errorf("finding unsealed sector: %w", err) } @@ -459,7 +460,7 @@ func (m *Manager) Remove(ctx context.Context, sector abi.SectorID) error { unsealed := stores.FTUnsealed { - unsealedStores, err := m.index.StorageFindSector(ctx, sector, stores.FTUnsealed, false) + unsealedStores, err := m.index.StorageFindSector(ctx, sector, stores.FTUnsealed, 0, false) if err != nil { return xerrors.Errorf("finding unsealed sector: %w", err) } diff --git a/extern/sector-storage/selector_existing.go b/extern/sector-storage/selector_existing.go index a11c39007..0a324ac5c 100644 --- a/extern/sector-storage/selector_existing.go +++ b/extern/sector-storage/selector_existing.go @@ -46,7 +46,7 @@ func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt have[path.ID] = struct{}{} } - best, err := s.index.StorageFindSector(ctx, s.sector, s.alloc, s.allowFetch) + best, err := s.index.StorageFindSector(ctx, s.sector, s.alloc, spt, s.allowFetch) if err != nil { return false, xerrors.Errorf("finding best storage: %w", err) } diff --git a/extern/sector-storage/stores/index.go b/extern/sector-storage/stores/index.go index acad2abaa..94858d3e8 100644 --- a/extern/sector-storage/stores/index.go +++ b/extern/sector-storage/stores/index.go @@ -54,7 +54,7 @@ type SectorIndex interface { // part of storage-miner api StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType, primary bool) error StorageDropSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType) error - StorageFindSector(ctx context.Context, sector abi.SectorID, ft SectorFileType, allowFetch bool) ([]SectorStorageInfo, error) + StorageFindSector(ctx context.Context, sector abi.SectorID, ft SectorFileType, spt abi.RegisteredSealProof, allowFetch bool) ([]SectorStorageInfo, error) StorageBestAlloc(ctx context.Context, allocate SectorFileType, spt abi.RegisteredSealProof, pathType PathType) ([]StorageInfo, error) @@ -245,7 +245,7 @@ func (i *Index) StorageDropSector(ctx context.Context, storageId ID, s abi.Secto return nil } -func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft SectorFileType, allowFetch bool) ([]SectorStorageInfo, error) { +func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft SectorFileType, spt abi.RegisteredSealProof, allowFetch bool) ([]SectorStorageInfo, error) { i.lk.RLock() defer i.lk.RUnlock() @@ -296,7 +296,31 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft Sector } if allowFetch { + spaceReq, err := ft.SealSpaceUse(spt) + if err != nil { + return nil, xerrors.Errorf("estimating required space: %w", err) + } + for id, st := range i.stores { + if !st.info.CanSeal { + continue + } + + if spaceReq > uint64(st.fsi.Available) { + log.Debugf("not selecting on %s, out of space (available: %d, need: %d)", st.info.ID, st.fsi.Available, spaceReq) + continue + } + + if time.Since(st.lastHeartbeat) > SkippedHeartbeatThresh { + log.Debugf("not selecting on %s, didn't receive heartbeats for %s", st.info.ID, time.Since(st.lastHeartbeat)) + continue + } + + if st.heartbeatErr != nil { + log.Debugf("not selecting on %s, heartbeat error: %s", st.info.ID, st.heartbeatErr) + continue + } + if _, ok := storageIDs[id]; ok { continue } diff --git a/extern/sector-storage/stores/local.go b/extern/sector-storage/stores/local.go index 9efab6480..4895e161b 100644 --- a/extern/sector-storage/stores/local.go +++ b/extern/sector-storage/stores/local.go @@ -334,7 +334,7 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.Re continue } - si, err := st.index.StorageFindSector(ctx, sid, fileType, false) + si, err := st.index.StorageFindSector(ctx, sid, fileType, spt, false) if err != nil { log.Warnf("finding existing sector %d(t:%d) failed: %+v", sid, fileType, err) continue @@ -441,7 +441,7 @@ func (st *Local) Remove(ctx context.Context, sid abi.SectorID, typ SectorFileTyp return xerrors.New("delete expects one file type") } - si, err := st.index.StorageFindSector(ctx, sid, typ, false) + si, err := st.index.StorageFindSector(ctx, sid, typ, 0, false) if err != nil { return xerrors.Errorf("finding existing sector %d(t:%d) failed: %w", sid, typ, err) } @@ -464,7 +464,7 @@ func (st *Local) RemoveCopies(ctx context.Context, sid abi.SectorID, typ SectorF return xerrors.New("delete expects one file type") } - si, err := st.index.StorageFindSector(ctx, sid, typ, false) + si, err := st.index.StorageFindSector(ctx, sid, typ, 0, false) if err != nil { return xerrors.Errorf("finding existing sector %d(t:%d) failed: %w", sid, typ, err) } diff --git a/extern/sector-storage/stores/remote.go b/extern/sector-storage/stores/remote.go index 93dc2ca58..12587a86e 100644 --- a/extern/sector-storage/stores/remote.go +++ b/extern/sector-storage/stores/remote.go @@ -170,7 +170,7 @@ func tempFetchDest(spath string, create bool) (string, error) { } func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType SectorFileType, dest string) (string, error) { - si, err := r.index.StorageFindSector(ctx, s, fileType, false) + si, err := r.index.StorageFindSector(ctx, s, fileType, 0, false) if err != nil { return "", err } @@ -300,7 +300,7 @@ func (r *Remote) Remove(ctx context.Context, sid abi.SectorID, typ SectorFileTyp return xerrors.Errorf("remove from local: %w", err) } - si, err := r.index.StorageFindSector(ctx, sid, typ, false) + si, err := r.index.StorageFindSector(ctx, sid, typ, 0, false) if err != nil { return xerrors.Errorf("finding existing sector %d(t:%d) failed: %w", sid, typ, err) }