diff --git a/localworker.go b/localworker.go index a92f01a89..969007d93 100644 --- a/localworker.go +++ b/localworker.go @@ -58,7 +58,7 @@ type localWorkerPathProvider struct { } func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing bool) (stores.SectorPaths, func(), error) { - paths, storageIDs, done, err := l.w.storage.AcquireSector(ctx, sector, existing, allocate, sealing) + paths, storageIDs, done, err := l.w.storage.AcquireSector(ctx, sector, l.w.scfg.SealProofType, existing, allocate, sealing) if err != nil { return stores.SectorPaths{}, nil, err } @@ -163,7 +163,7 @@ func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID) e return xerrors.Errorf("removing unsealed data: %w", err) } - if err := l.storage.MoveStorage(ctx, sector, stores.FTSealed|stores.FTCache); err != nil { + if err := l.storage.MoveStorage(ctx, sector, l.scfg.SealProofType, stores.FTSealed|stores.FTCache); err != nil { return xerrors.Errorf("moving sealed data to storage: %w", err) } diff --git a/resources.go b/resources.go index 87058e80a..23dcc2085 100644 --- a/resources.go +++ b/resources.go @@ -4,21 +4,8 @@ import ( "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/sector-storage/sealtasks" - "github.com/filecoin-project/sector-storage/stores" ) -var FSOverheadSeal = map[stores.SectorFileType]int{ // 10x overheads - stores.FTUnsealed: 10, - stores.FTSealed: 10, - stores.FTCache: 70, // TODO: confirm for 32G -} - -var FsOverheadFinalized = map[stores.SectorFileType]int{ - stores.FTUnsealed: 10, - stores.FTSealed: 10, - stores.FTCache: 2, -} - type Resources struct { MinMemory uint64 // What Must be in RAM for decent perf MaxMemory uint64 // Memory required (swap + ram) diff --git a/roprov.go b/roprov.go index e6ec1e8f2..172cf7cf8 100644 --- a/roprov.go +++ b/roprov.go @@ -12,6 +12,7 @@ import ( type readonlyProvider struct { stor *stores.Local + spt abi.RegisteredProof } func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing bool) (stores.SectorPaths, func(), error) { @@ -19,7 +20,7 @@ func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorID, e return stores.SectorPaths{}, nil, xerrors.New("read-only storage") } - p, _, done, err := l.stor.AcquireSector(ctx, id, existing, allocate, sealing) + p, _, done, err := l.stor.AcquireSector(ctx, id, l.spt, existing, allocate, sealing) return p, done, err } diff --git a/sched.go b/sched.go index 019febda4..c48440757 100644 --- a/sched.go +++ b/sched.go @@ -20,7 +20,7 @@ const mib = 1 << 20 type WorkerAction func(ctx context.Context, w Worker) error type WorkerSelector interface { - Ok(ctx context.Context, task sealtasks.TaskType, a *workerHandle) (bool, error) // true if worker is acceptable for performing a task + Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredProof, a *workerHandle) (bool, error) // true if worker is acceptable for performing a task Cmp(ctx context.Context, task sealtasks.TaskType, a, b *workerHandle) (bool, error) // true if a is preferred over b } @@ -178,7 +178,7 @@ func (sh *scheduler) onWorkerFreed(wid WorkerID) { for i := 0; i < sh.schedQueue.Len(); i++ { req := (*sh.schedQueue)[i] - ok, err := req.sel.Ok(req.ctx, req.taskType, w) + ok, err := req.sel.Ok(req.ctx, req.taskType, sh.spt, w) if err != nil { log.Errorf("onWorkerFreed req.sel.Ok error: %+v", err) continue @@ -212,7 +212,7 @@ func (sh *scheduler) maybeSchedRequest(req *workerRequest) (bool, error) { needRes := ResourceTable[req.taskType][sh.spt] for wid, worker := range sh.workers { - ok, err := req.sel.Ok(req.ctx, req.taskType, worker) + ok, err := req.sel.Ok(req.ctx, req.taskType, sh.spt, worker) if err != nil { return false, err } diff --git a/selector_alloc.go b/selector_alloc.go index c7d06a7bc..0a7850424 100644 --- a/selector_alloc.go +++ b/selector_alloc.go @@ -5,26 +5,25 @@ import ( "golang.org/x/xerrors" + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/sector-storage/sealtasks" "github.com/filecoin-project/sector-storage/stores" ) type allocSelector struct { - best []stores.StorageInfo + index stores.SectorIndex + alloc stores.SectorFileType } func newAllocSelector(ctx context.Context, index stores.SectorIndex, alloc stores.SectorFileType) (*allocSelector, error) { - best, err := index.StorageBestAlloc(ctx, alloc, true) - if err != nil { - return nil, err - } - return &allocSelector{ - best: best, + index: index, + alloc: alloc, }, nil } -func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, whnd *workerHandle) (bool, error) { +func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredProof, whnd *workerHandle) (bool, error) { tasks, err := whnd.w.TaskTypes(ctx) if err != nil { return false, xerrors.Errorf("getting supported worker task types: %w", err) @@ -43,7 +42,12 @@ func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, whnd *w have[path.ID] = struct{}{} } - for _, info := range s.best { + best, err := s.index.StorageBestAlloc(ctx, s.alloc, spt, true) + if err != nil { + return false, xerrors.Errorf("finding best alloc storage: %w", err) + } + + for _, info := range best { if _, ok := have[info.ID]; ok { return true, nil } diff --git a/selector_existing.go b/selector_existing.go index 46dd3278e..14e6dbefd 100644 --- a/selector_existing.go +++ b/selector_existing.go @@ -5,9 +5,10 @@ import ( "golang.org/x/xerrors" + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/sector-storage/sealtasks" "github.com/filecoin-project/sector-storage/stores" - "github.com/filecoin-project/specs-actors/actors/abi" ) type existingSelector struct { @@ -25,7 +26,7 @@ func newExistingSelector(ctx context.Context, index stores.SectorIndex, sector a }, nil } -func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, whnd *workerHandle) (bool, error) { +func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredProof, whnd *workerHandle) (bool, error) { tasks, err := whnd.w.TaskTypes(ctx) if err != nil { return false, xerrors.Errorf("getting supported worker task types: %w", err) diff --git a/selector_task.go b/selector_task.go index 3298c9e5d..d2cf73476 100644 --- a/selector_task.go +++ b/selector_task.go @@ -5,6 +5,8 @@ import ( "golang.org/x/xerrors" + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/sector-storage/sealtasks" "github.com/filecoin-project/sector-storage/stores" ) @@ -17,7 +19,7 @@ func newTaskSelector() *taskSelector { return &taskSelector{} } -func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, whnd *workerHandle) (bool, error) { +func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredProof, whnd *workerHandle) (bool, error) { tasks, err := whnd.w.TaskTypes(ctx) if err != nil { return false, xerrors.Errorf("getting supported worker task types: %w", err) diff --git a/stores/filetype.go b/stores/filetype.go index 784b5b71e..1810054d8 100644 --- a/stores/filetype.go +++ b/stores/filetype.go @@ -17,6 +17,18 @@ const ( FTNone SectorFileType = 0 ) +var FSOverheadSeal = map[SectorFileType]int{ // 10x overheads + FTUnsealed: 10, + FTSealed: 10, + FTCache: 70, // TODO: confirm for 32G +} + +var FsOverheadFinalized = map[SectorFileType]int{ + FTUnsealed: 10, + FTSealed: 10, + FTCache: 2, +} + type SectorFileType int func (t SectorFileType) String() string { @@ -36,6 +48,29 @@ func (t SectorFileType) Has(singleType SectorFileType) bool { return t&singleType == singleType } +func (t SectorFileType) SealSpaceUse(spt abi.RegisteredProof) (uint64, error) { + ssize, err := spt.SectorSize() + if err != nil { + return 0, xerrors.Errorf("getting sector size: %w", err) + } + + var need uint64 + for _, pathType := range PathTypes { + if !t.Has(pathType) { + continue + } + + oh, ok := FSOverheadSeal[pathType] + if !ok { + return 0, xerrors.Errorf("no seal overhead info for %s", pathType) + } + + need += uint64(oh) * uint64(ssize) / 10 + } + + return need, nil +} + type SectorPaths struct { Id abi.SectorID diff --git a/stores/http_handler.go b/stores/http_handler.go index b14dac54f..c39ca4510 100644 --- a/stores/http_handler.go +++ b/stores/http_handler.go @@ -68,7 +68,9 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ w.WriteHeader(500) return } - paths, _, done, err := handler.Local.AcquireSector(r.Context(), id, ft, FTNone, false) + + // passing 0 spt because we don't allocate anything + paths, _, done, err := handler.Local.AcquireSector(r.Context(), id, 0, ft, FTNone, false) if err != nil { log.Error("%+v", err) w.WriteHeader(500) diff --git a/stores/index.go b/stores/index.go index f0ff22a01..6659a4422 100644 --- a/stores/index.go +++ b/stores/index.go @@ -14,8 +14,8 @@ import ( "github.com/filecoin-project/specs-actors/actors/abi/big" ) -var HeartBeatInterval = 10 * time.Second -var SkippedHeartbeatThresh = HeartBeatInterval * 5 +var HeartbeatInterval = 10 * time.Second +var SkippedHeartbeatThresh = HeartbeatInterval * 5 // ID identifies sector storage by UUID. One sector storage should map to one // filesystem, local or networked / shared by multiple machines @@ -47,7 +47,7 @@ type SectorIndex interface { // part of storage-miner api StorageDropSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType) error StorageFindSector(ctx context.Context, sector abi.SectorID, ft SectorFileType, allowFetch bool) ([]StorageInfo, error) - StorageBestAlloc(ctx context.Context, allocate SectorFileType, sealing bool) ([]StorageInfo, error) + StorageBestAlloc(ctx context.Context, allocate SectorFileType, spt abi.RegisteredProof, sealing bool) ([]StorageInfo, error) } type Decl struct { @@ -302,12 +302,17 @@ func (i *Index) StorageInfo(ctx context.Context, id ID) (StorageInfo, error) { return *si.info, nil } -func (i *Index) StorageBestAlloc(ctx context.Context, allocate SectorFileType, sealing bool) ([]StorageInfo, error) { +func (i *Index) StorageBestAlloc(ctx context.Context, allocate SectorFileType, spt abi.RegisteredProof, sealing bool) ([]StorageInfo, error) { i.lk.RLock() defer i.lk.RUnlock() var candidates []storageEntry + spaceReq, err := allocate.SealSpaceUse(spt) + if err != nil { + return nil, xerrors.Errorf("estimating required space: %w", err) + } + for _, p := range i.stores { if sealing && !p.info.CanSeal { continue @@ -316,7 +321,20 @@ func (i *Index) StorageBestAlloc(ctx context.Context, allocate SectorFileType, s continue } - // TODO: filter out of space + if spaceReq > p.fsi.Available { + log.Debugf("not allocating on %s, out of space (available: %d, need: %d)", p.info.ID, p.fsi.Available, spaceReq) + continue + } + + if time.Since(p.lastHeartbeat) > SkippedHeartbeatThresh { + log.Debugf("not allocating on %s, didn't receive heartbeats for %s", p.info.ID, time.Since(p.lastHeartbeat)) + continue + } + + if p.heartbeatErr != nil { + log.Debugf("not allocating on %s, heartbeat error: %s", p.info.ID, p.heartbeatErr) + continue + } candidates = append(candidates, *p) } diff --git a/stores/interface.go b/stores/interface.go index 556cd4dbf..4a1361904 100644 --- a/stores/interface.go +++ b/stores/interface.go @@ -10,11 +10,11 @@ import ( ) type Store interface { - AcquireSector(ctx context.Context, s abi.SectorID, existing SectorFileType, allocate SectorFileType, sealing bool) (paths SectorPaths, stores SectorPaths, done func(), err error) + AcquireSector(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, sealing bool) (paths SectorPaths, stores SectorPaths, done func(), err error) Remove(ctx context.Context, s abi.SectorID, types SectorFileType) error // move sectors into storage - MoveStorage(ctx context.Context, s abi.SectorID, types SectorFileType) error + MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, types SectorFileType) error FsStat(ctx context.Context, id ID) (FsStat, error) } diff --git a/stores/local.go b/stores/local.go index 673583c6a..52ba0afbc 100644 --- a/stores/local.go +++ b/stores/local.go @@ -164,7 +164,7 @@ func (st *Local) open(ctx context.Context) error { func (st *Local) reportHealth(ctx context.Context) { // randomize interval by ~10% - interval := (HeartBeatInterval*100_000 + time.Duration(rand.Int63n(10_000))) / 100_000 + interval := (HeartbeatInterval*100_000 + time.Duration(rand.Int63n(10_000))) / 100_000 for { select { @@ -195,7 +195,7 @@ func (st *Local) reportHealth(ctx context.Context) { } } -func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing SectorFileType, allocate SectorFileType, sealing bool) (SectorPaths, SectorPaths, func(), error) { +func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, sealing bool) (SectorPaths, SectorPaths, func(), error) { if existing|allocate != existing^allocate { return SectorPaths{}, SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector") } @@ -240,7 +240,7 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing S continue } - sis, err := st.index.StorageBestAlloc(ctx, fileType, sealing) + sis, err := st.index.StorageBestAlloc(ctx, fileType, spt, sealing) if err != nil { st.localLk.RUnlock() return SectorPaths{}, SectorPaths{}, nil, xerrors.Errorf("finding best storage for allocating : %w", err) @@ -352,14 +352,14 @@ func (st *Local) Remove(ctx context.Context, sid abi.SectorID, typ SectorFileTyp return nil } -func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, types SectorFileType) error { - dest, destIds, sdone, err := st.AcquireSector(ctx, s, FTNone, types, false) +func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, types SectorFileType) error { + dest, destIds, sdone, err := st.AcquireSector(ctx, s, spt, FTNone, types, false) if err != nil { return xerrors.Errorf("acquire dest storage: %w", err) } defer sdone() - src, srcIds, ddone, err := st.AcquireSector(ctx, s, types, FTNone, false) + src, srcIds, ddone, err := st.AcquireSector(ctx, s, spt, types, FTNone, false) if err != nil { return xerrors.Errorf("acquire src storage: %w", err) } diff --git a/stores/remote.go b/stores/remote.go index a750d4841..c5d570ffa 100644 --- a/stores/remote.go +++ b/stores/remote.go @@ -41,7 +41,7 @@ func NewRemote(local *Local, index SectorIndex, auth http.Header) *Remote { } } -func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing SectorFileType, allocate SectorFileType, sealing bool) (SectorPaths, SectorPaths, func(), error) { +func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, sealing bool) (SectorPaths, SectorPaths, func(), error) { if existing|allocate != existing^allocate { return SectorPaths{}, SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector") } @@ -73,7 +73,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing Sec r.fetchLk.Unlock() }() - paths, stores, done, err := r.local.AcquireSector(ctx, s, existing, allocate, sealing) + paths, stores, done, err := r.local.AcquireSector(ctx, s, spt, existing, allocate, sealing) if err != nil { return SectorPaths{}, SectorPaths{}, nil, xerrors.Errorf("local acquire error: %w", err) } @@ -87,7 +87,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing Sec continue } - ap, storageID, url, rdone, err := r.acquireFromRemote(ctx, s, fileType, sealing) + ap, storageID, url, rdone, err := r.acquireFromRemote(ctx, s, spt, fileType, sealing) if err != nil { done() return SectorPaths{}, SectorPaths{}, nil, err @@ -111,7 +111,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing Sec return paths, stores, done, nil } -func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType SectorFileType, sealing bool) (string, ID, string, func(), error) { +func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, fileType SectorFileType, sealing bool) (string, ID, string, func(), error) { si, err := r.index.StorageFindSector(ctx, s, fileType, false) if err != nil { return "", "", "", nil, err @@ -125,7 +125,7 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType return si[i].Weight < si[j].Weight }) - apaths, ids, done, err := r.local.AcquireSector(ctx, s, FTNone, fileType, sealing) + apaths, ids, done, err := r.local.AcquireSector(ctx, s, spt, FTNone, fileType, sealing) if err != nil { return "", "", "", nil, xerrors.Errorf("allocate local sector for fetching: %w", err) } @@ -203,15 +203,15 @@ func (r *Remote) fetch(ctx context.Context, url, outname string) error { } } -func (r *Remote) MoveStorage(ctx context.Context, s abi.SectorID, types SectorFileType) error { +func (r *Remote) MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, types SectorFileType) error { // Make sure we have the data local - _, _, ddone, err := r.AcquireSector(ctx, s, types, FTNone, false) + _, _, ddone, err := r.AcquireSector(ctx, s, spt, types, FTNone, false) if err != nil { return xerrors.Errorf("acquire src storage (remote): %w", err) } ddone() - return r.local.MoveStorage(ctx, s, types) + return r.local.MoveStorage(ctx, s, spt, types) } func (r *Remote) Remove(ctx context.Context, sid abi.SectorID, typ SectorFileType) error {