Merge pull request #29 from filecoin-project/feat/heartbeat

Improve storage selection logic
Łukasz Magiera 2020-05-08 21:24:01 +02:00 committed by GitHub
commit c395e40963
13 changed files with 171 additions and 51 deletions


@@ -58,7 +58,7 @@ type localWorkerPathProvider struct {
 }
 func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing bool) (stores.SectorPaths, func(), error) {
-	paths, storageIDs, done, err := l.w.storage.AcquireSector(ctx, sector, existing, allocate, sealing)
+	paths, storageIDs, done, err := l.w.storage.AcquireSector(ctx, sector, l.w.scfg.SealProofType, existing, allocate, sealing)
 	if err != nil {
 		return stores.SectorPaths{}, nil, err
 	}
@@ -163,7 +163,7 @@ func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID) e
 		return xerrors.Errorf("removing unsealed data: %w", err)
 	}
-	if err := l.storage.MoveStorage(ctx, sector, stores.FTSealed|stores.FTCache); err != nil {
+	if err := l.storage.MoveStorage(ctx, sector, l.scfg.SealProofType, stores.FTSealed|stores.FTCache); err != nil {
 		return xerrors.Errorf("moving sealed data to storage: %w", err)
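Every storage call above now carries a seal proof type, because the proof type is what determines the sector size, and the index later uses that size to estimate how much space an allocation needs. A minimal standalone sketch of the conversion these call sites rely on, assuming specs-actors of this vintage (the concrete proof type is only an example):

package main

import (
	"fmt"

	"github.com/filecoin-project/specs-actors/actors/abi"
)

func main() {
	spt := abi.RegisteredProof_StackedDRG32GiBSeal // example proof type
	ssize, err := spt.SectorSize()
	if err != nil {
		panic(err)
	}
	fmt.Println(ssize) // 34359738368 (32 GiB)
}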
}


@@ -4,21 +4,8 @@ import (
 	"github.com/filecoin-project/specs-actors/actors/abi"
 	"github.com/filecoin-project/sector-storage/sealtasks"
-	"github.com/filecoin-project/sector-storage/stores"
 )
-var FSOverheadSeal = map[stores.SectorFileType]int{ // 10x overheads
-	stores.FTUnsealed: 10,
-	stores.FTSealed:   10,
-	stores.FTCache:    70, // TODO: confirm for 32G
-}
-var FsOverheadFinalized = map[stores.SectorFileType]int{
-	stores.FTUnsealed: 10,
-	stores.FTSealed:   10,
-	stores.FTCache:    2,
-}
 type Resources struct {
 	MinMemory uint64 // What Must be in RAM for decent perf
 	MaxMemory uint64 // Memory required (swap + ram)


@@ -12,6 +12,7 @@ import (
 type readonlyProvider struct {
 	stor *stores.Local
+	spt  abi.RegisteredProof
 }
 func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing bool) (stores.SectorPaths, func(), error) {
@@ -19,7 +20,7 @@ func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorID, e
 		return stores.SectorPaths{}, nil, xerrors.New("read-only storage")
 	}
-	p, _, done, err := l.stor.AcquireSector(ctx, id, existing, allocate, sealing)
+	p, _, done, err := l.stor.AcquireSector(ctx, id, l.spt, existing, allocate, sealing)
 	return p, done, err
 }


@@ -20,7 +20,7 @@ const mib = 1 << 20
 type WorkerAction func(ctx context.Context, w Worker) error
 type WorkerSelector interface {
-	Ok(ctx context.Context, task sealtasks.TaskType, a *workerHandle) (bool, error) // true if worker is acceptable for performing a task
+	Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredProof, a *workerHandle) (bool, error) // true if worker is acceptable for performing a task
 	Cmp(ctx context.Context, task sealtasks.TaskType, a, b *workerHandle) (bool, error) // true if a is preferred over b
 }
@@ -178,7 +178,7 @@ func (sh *scheduler) onWorkerFreed(wid WorkerID) {
 	for i := 0; i < sh.schedQueue.Len(); i++ {
 		req := (*sh.schedQueue)[i]
-		ok, err := req.sel.Ok(req.ctx, req.taskType, w)
+		ok, err := req.sel.Ok(req.ctx, req.taskType, sh.spt, w)
 		if err != nil {
 			log.Errorf("onWorkerFreed req.sel.Ok error: %+v", err)
 			continue
@@ -212,7 +212,7 @@ func (sh *scheduler) maybeSchedRequest(req *workerRequest) (bool, error) {
 	needRes := ResourceTable[req.taskType][sh.spt]
 	for wid, worker := range sh.workers {
-		ok, err := req.sel.Ok(req.ctx, req.taskType, worker)
+		ok, err := req.sel.Ok(req.ctx, req.taskType, sh.spt, worker)
 		if err != nil {
 			return false, err
 		}
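Every WorkerSelector implementation has to pick up the extra spt parameter, even if it ignores it. A hedged sketch of the minimal shape, written as if it lived in the same package as the scheduler (this selector is hypothetical, not part of the change):

type acceptAllSelector struct{}

func (s *acceptAllSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredProof, a *workerHandle) (bool, error) {
	// spt is available for size-aware checks (as the alloc selector below does); unused here
	return true, nil
}

func (s *acceptAllSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *workerHandle) (bool, error) {
	return true, nil // no preference between workers
}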


@@ -5,26 +5,25 @@ import (
 	"golang.org/x/xerrors"
+	"github.com/filecoin-project/specs-actors/actors/abi"
 	"github.com/filecoin-project/sector-storage/sealtasks"
 	"github.com/filecoin-project/sector-storage/stores"
 )
 type allocSelector struct {
-	best []stores.StorageInfo
+	index stores.SectorIndex
+	alloc stores.SectorFileType
 }
 func newAllocSelector(ctx context.Context, index stores.SectorIndex, alloc stores.SectorFileType) (*allocSelector, error) {
-	best, err := index.StorageBestAlloc(ctx, alloc, true)
-	if err != nil {
-		return nil, err
-	}
 	return &allocSelector{
-		best: best,
+		index: index,
+		alloc: alloc,
 	}, nil
 }
-func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, whnd *workerHandle) (bool, error) {
+func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredProof, whnd *workerHandle) (bool, error) {
 	tasks, err := whnd.w.TaskTypes(ctx)
 	if err != nil {
 		return false, xerrors.Errorf("getting supported worker task types: %w", err)
@@ -43,7 +42,12 @@ func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, whnd *w
 		have[path.ID] = struct{}{}
 	}
-	for _, info := range s.best {
+	best, err := s.index.StorageBestAlloc(ctx, s.alloc, spt, true)
+	if err != nil {
+		return false, xerrors.Errorf("finding best alloc storage: %w", err)
+	}
+	for _, info := range best {
 		if _, ok := have[info.ID]; ok {
 			return true, nil
 		}


@@ -5,9 +5,10 @@ import (
 	"golang.org/x/xerrors"
+	"github.com/filecoin-project/specs-actors/actors/abi"
 	"github.com/filecoin-project/sector-storage/sealtasks"
 	"github.com/filecoin-project/sector-storage/stores"
-	"github.com/filecoin-project/specs-actors/actors/abi"
 )
 type existingSelector struct {
@@ -25,7 +26,7 @@ func newExistingSelector(ctx context.Context, index stores.SectorIndex, sector a
 	}, nil
 }
-func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, whnd *workerHandle) (bool, error) {
+func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredProof, whnd *workerHandle) (bool, error) {
 	tasks, err := whnd.w.TaskTypes(ctx)
 	if err != nil {
 		return false, xerrors.Errorf("getting supported worker task types: %w", err)


@@ -5,6 +5,8 @@ import (
 	"golang.org/x/xerrors"
+	"github.com/filecoin-project/specs-actors/actors/abi"
 	"github.com/filecoin-project/sector-storage/sealtasks"
 	"github.com/filecoin-project/sector-storage/stores"
 )
@@ -17,7 +19,7 @@ func newTaskSelector() *taskSelector {
 	return &taskSelector{}
 }
-func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, whnd *workerHandle) (bool, error) {
+func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredProof, whnd *workerHandle) (bool, error) {
 	tasks, err := whnd.w.TaskTypes(ctx)
 	if err != nil {
 		return false, xerrors.Errorf("getting supported worker task types: %w", err)


@@ -17,6 +17,18 @@ const (
 	FTNone SectorFileType = 0
 )
+var FSOverheadSeal = map[SectorFileType]int{ // 10x overheads
+	FTUnsealed: 10,
+	FTSealed:   10,
+	FTCache:    70, // TODO: confirm for 32G
+}
+var FsOverheadFinalized = map[SectorFileType]int{
+	FTUnsealed: 10,
+	FTSealed:   10,
+	FTCache:    2,
+}
 type SectorFileType int
 func (t SectorFileType) String() string {
@@ -36,6 +48,29 @@ func (t SectorFileType) Has(singleType SectorFileType) bool {
 	return t&singleType == singleType
 }
+func (t SectorFileType) SealSpaceUse(spt abi.RegisteredProof) (uint64, error) {
+	ssize, err := spt.SectorSize()
+	if err != nil {
+		return 0, xerrors.Errorf("getting sector size: %w", err)
+	}
+	var need uint64
+	for _, pathType := range PathTypes {
+		if !t.Has(pathType) {
+			continue
+		}
+		oh, ok := FSOverheadSeal[pathType]
+		if !ok {
+			return 0, xerrors.Errorf("no seal overhead info for %s", pathType)
+		}
+		need += uint64(oh) * uint64(ssize) / 10
+	}
+	return need, nil
+}
 type SectorPaths struct {
 	Id abi.SectorID
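As a rough sanity check on FSOverheadSeal: for a 32 GiB proof type the estimate is 10/10 · 32 GiB for the unsealed file, 10/10 · 32 GiB for the sealed replica, and 70/10 · 32 GiB for the cache. A hedged usage fragment (the proof type and surrounding program are illustrative):

need, err := (stores.FTSealed | stores.FTCache).SealSpaceUse(abi.RegisteredProof_StackedDRG32GiBSeal)
if err != nil {
	return err
}
// 32 GiB (sealed) + 224 GiB (cache) = 256 GiB
fmt.Println(need) // 274877906944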


@@ -68,7 +68,9 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ
 		w.WriteHeader(500)
 		return
 	}
-	paths, _, done, err := handler.Local.AcquireSector(r.Context(), id, ft, FTNone, false)
+	// passing 0 spt because we don't allocate anything
+	paths, _, done, err := handler.Local.AcquireSector(r.Context(), id, 0, ft, FTNone, false)
 	if err != nil {
 		log.Error("%+v", err)
 		w.WriteHeader(500)


@@ -6,6 +6,7 @@ import (
 	gopath "path"
 	"sort"
 	"sync"
+	"time"
 	"golang.org/x/xerrors"
@@ -13,6 +14,9 @@ import (
 	"github.com/filecoin-project/specs-actors/actors/abi/big"
 )
+var HeartbeatInterval = 10 * time.Second
+var SkippedHeartbeatThresh = HeartbeatInterval * 5
 // ID identifies sector storage by UUID. One sector storage should map to one
 // filesystem, local or networked / shared by multiple machines
 type ID string
@@ -24,18 +28,26 @@ type StorageInfo struct {
 	CanSeal  bool
 	CanStore bool
+	LastHeartbeat time.Time
+	HeartbeatErr  error
 }
+type HealthReport struct {
+	Stat FsStat
+	Err  error
+}
 type SectorIndex interface { // part of storage-miner api
 	StorageAttach(context.Context, StorageInfo, FsStat) error
 	StorageInfo(context.Context, ID) (StorageInfo, error)
 	// TODO: StorageUpdateStats(FsStat)
+	StorageReportHealth(context.Context, ID, HealthReport) error
 	StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType) error
 	StorageDropSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType) error
 	StorageFindSector(ctx context.Context, sector abi.SectorID, ft SectorFileType, allowFetch bool) ([]StorageInfo, error)
-	StorageBestAlloc(ctx context.Context, allocate SectorFileType, sealing bool) ([]StorageInfo, error)
+	StorageBestAlloc(ctx context.Context, allocate SectorFileType, spt abi.RegisteredProof, sealing bool) ([]StorageInfo, error)
 }
 type Decl struct {
@@ -46,6 +58,9 @@ type Decl struct {
 type storageEntry struct {
 	info *StorageInfo
 	fsi  FsStat
+	lastHeartbeat time.Time
+	heartbeatErr  error
 }
 type Index struct {
@@ -120,10 +135,28 @@ func (i *Index) StorageAttach(ctx context.Context, si StorageInfo, st FsStat) er
 	i.stores[si.ID] = &storageEntry{
 		info: &si,
 		fsi:  st,
+		lastHeartbeat: time.Now(),
 	}
 	return nil
 }
+func (i *Index) StorageReportHealth(ctx context.Context, id ID, report HealthReport) error {
+	i.lk.Lock()
+	defer i.lk.Unlock()
+	ent, ok := i.stores[id]
+	if !ok {
+		return xerrors.Errorf("health report for unknown storage: %s", id)
+	}
+	ent.fsi = report.Stat
+	ent.heartbeatErr = report.Err
+	ent.lastHeartbeat = time.Now()
+	return nil
+}
 func (i *Index) StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType) error {
 	i.lk.Lock()
 	defer i.lk.Unlock()
@@ -269,12 +302,17 @@ func (i *Index) StorageInfo(ctx context.Context, id ID) (StorageInfo, error) {
 	return *si.info, nil
 }
-func (i *Index) StorageBestAlloc(ctx context.Context, allocate SectorFileType, sealing bool) ([]StorageInfo, error) {
+func (i *Index) StorageBestAlloc(ctx context.Context, allocate SectorFileType, spt abi.RegisteredProof, sealing bool) ([]StorageInfo, error) {
 	i.lk.RLock()
 	defer i.lk.RUnlock()
 	var candidates []storageEntry
+	spaceReq, err := allocate.SealSpaceUse(spt)
+	if err != nil {
+		return nil, xerrors.Errorf("estimating required space: %w", err)
+	}
 	for _, p := range i.stores {
 		if sealing && !p.info.CanSeal {
 			continue
@@ -283,7 +321,20 @@ func (i *Index) StorageBestAlloc(ctx context.Context, allocate SectorFileType, s
 			continue
 		}
-		// TODO: filter out of space
+		if spaceReq > p.fsi.Available {
+			log.Debugf("not allocating on %s, out of space (available: %d, need: %d)", p.info.ID, p.fsi.Available, spaceReq)
+			continue
+		}
+		if time.Since(p.lastHeartbeat) > SkippedHeartbeatThresh {
+			log.Debugf("not allocating on %s, didn't receive heartbeats for %s", p.info.ID, time.Since(p.lastHeartbeat))
+			continue
+		}
+		if p.heartbeatErr != nil {
+			log.Debugf("not allocating on %s, heartbeat error: %s", p.info.ID, p.heartbeatErr)
+			continue
+		}
 		candidates = append(candidates, *p)
 	}
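Taken together, the index now expects each storage path to attach once and then report health every HeartbeatInterval; StorageBestAlloc skips candidates whose heartbeat is stale or errored, or whose reported free space is below the estimate. A hedged usage sketch, assuming the package's NewIndex constructor (the ID, weight, and stats are made up, and the FsStat literals are abbreviated to the field this change relies on):

idx := stores.NewIndex()

if err := idx.StorageAttach(ctx, stores.StorageInfo{
	ID:      "example-storage", // hypothetical ID
	Weight:  10,
	CanSeal: true,
}, stores.FsStat{Available: 1 << 40}); err != nil {
	return err
}

// called every HeartbeatInterval, normally by Local.reportHealth below
if err := idx.StorageReportHealth(ctx, "example-storage", stores.HealthReport{
	Stat: stores.FsStat{Available: 300 << 30},
}); err != nil {
	return err
}

// stale, errored, or too-small paths are filtered out here
best, err := idx.StorageBestAlloc(ctx, stores.FTSealed|stores.FTCache, abi.RegisteredProof_StackedDRG32GiBSeal, true)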


@@ -10,11 +10,11 @@ import (
 )
 type Store interface {
-	AcquireSector(ctx context.Context, s abi.SectorID, existing SectorFileType, allocate SectorFileType, sealing bool) (paths SectorPaths, stores SectorPaths, done func(), err error)
+	AcquireSector(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, sealing bool) (paths SectorPaths, stores SectorPaths, done func(), err error)
 	Remove(ctx context.Context, s abi.SectorID, types SectorFileType) error
 	// move sectors into storage
-	MoveStorage(ctx context.Context, s abi.SectorID, types SectorFileType) error
+	MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, types SectorFileType) error
 	FsStat(ctx context.Context, id ID) (FsStat, error)
 }
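A hedged sketch of how a caller drives the updated Store interface once sealing is done; FinalizeSector in the first file of this diff does essentially this (the store value, sector, and proof type here are illustrative):

var store stores.Store // e.g. a *stores.Remote wrapping a *stores.Local

sector := abi.SectorID{Miner: 1000, Number: 1}
spt := abi.RegisteredProof_StackedDRG32GiBSeal

// move the sealed replica and cache to long-term (CanStore) storage
if err := store.MoveStorage(ctx, sector, spt, stores.FTSealed|stores.FTCache); err != nil {
	return xerrors.Errorf("moving sealed data to storage: %w", err)
}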


@@ -5,9 +5,11 @@ import (
 	"encoding/json"
 	"io/ioutil"
 	"math/bits"
+	"math/rand"
 	"os"
 	"path/filepath"
 	"sync"
+	"time"
 	"golang.org/x/xerrors"
@@ -155,10 +157,45 @@ func (st *Local) open(ctx context.Context) error {
 		}
 	}
+	go st.reportHealth(ctx)
 	return nil
 }
-func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing SectorFileType, allocate SectorFileType, sealing bool) (SectorPaths, SectorPaths, func(), error) {
+func (st *Local) reportHealth(ctx context.Context) {
+	// randomize interval by ~10%
+	interval := HeartbeatInterval * time.Duration(100_000+rand.Int63n(10_000)) / 100_000
+	for {
+		select {
+		case <-time.After(interval):
+		case <-ctx.Done():
+			return
+		}
+		st.localLk.RLock()
+		toReport := map[ID]HealthReport{}
+		for id, p := range st.paths {
+			stat, err := Stat(p.local)
+			toReport[id] = HealthReport{
+				Stat: stat,
+				Err:  err,
+			}
+		}
+		st.localLk.RUnlock()
+		for id, report := range toReport {
+			if err := st.index.StorageReportHealth(ctx, id, report); err != nil {
+				log.Warnf("error reporting storage health for %s: %+v", id, err)
+			}
+		}
+	}
+}
+func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, sealing bool) (SectorPaths, SectorPaths, func(), error) {
 	if existing|allocate != existing^allocate {
 		return SectorPaths{}, SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector")
 	}
@@ -203,7 +240,7 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing S
 			continue
 		}
-		sis, err := st.index.StorageBestAlloc(ctx, fileType, sealing)
+		sis, err := st.index.StorageBestAlloc(ctx, fileType, spt, sealing)
 		if err != nil {
 			st.localLk.RUnlock()
 			return SectorPaths{}, SectorPaths{}, nil, xerrors.Errorf("finding best storage for allocating : %w", err)
@@ -315,14 +352,14 @@ func (st *Local) Remove(ctx context.Context, sid abi.SectorID, typ SectorFileTyp
 	return nil
 }
-func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, types SectorFileType) error {
-	dest, destIds, sdone, err := st.AcquireSector(ctx, s, FTNone, types, false)
+func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, types SectorFileType) error {
+	dest, destIds, sdone, err := st.AcquireSector(ctx, s, spt, FTNone, types, false)
 	if err != nil {
 		return xerrors.Errorf("acquire dest storage: %w", err)
 	}
 	defer sdone()
-	src, srcIds, ddone, err := st.AcquireSector(ctx, s, types, FTNone, false)
+	src, srcIds, ddone, err := st.AcquireSector(ctx, s, spt, types, FTNone, false)
 	if err != nil {
 		return xerrors.Errorf("acquire src storage: %w", err)
 	}
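Since open now starts the heartbeat goroutine, anything that constructs a Local and keeps its context alive gets health reporting for free, and cancelling that context stops the loop. A hedged wiring sketch, assuming the package's NewLocal constructor (which calls open internally) and a pre-built localStorage and idx:

ctx, cancel := context.WithCancel(context.Background())
defer cancel() // stops the reportHealth loop

local, err := stores.NewLocal(ctx, localStorage, idx, nil)
if err != nil {
	return err
}
_ = local // heartbeats for every attached path now reach idx every ~10s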


@@ -41,7 +41,7 @@ func NewRemote(local *Local, index SectorIndex, auth http.Header) *Remote {
 	}
 }
-func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing SectorFileType, allocate SectorFileType, sealing bool) (SectorPaths, SectorPaths, func(), error) {
+func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, sealing bool) (SectorPaths, SectorPaths, func(), error) {
 	if existing|allocate != existing^allocate {
 		return SectorPaths{}, SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector")
 	}
@@ -73,7 +73,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing Sec
 		r.fetchLk.Unlock()
 	}()
-	paths, stores, done, err := r.local.AcquireSector(ctx, s, existing, allocate, sealing)
+	paths, stores, done, err := r.local.AcquireSector(ctx, s, spt, existing, allocate, sealing)
 	if err != nil {
 		return SectorPaths{}, SectorPaths{}, nil, xerrors.Errorf("local acquire error: %w", err)
 	}
@@ -87,7 +87,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing Sec
 			continue
 		}
-		ap, storageID, url, rdone, err := r.acquireFromRemote(ctx, s, fileType, sealing)
+		ap, storageID, url, rdone, err := r.acquireFromRemote(ctx, s, spt, fileType, sealing)
 		if err != nil {
 			done()
 			return SectorPaths{}, SectorPaths{}, nil, err
@@ -111,7 +111,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing Sec
 	return paths, stores, done, nil
 }
-func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType SectorFileType, sealing bool) (string, ID, string, func(), error) {
+func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, fileType SectorFileType, sealing bool) (string, ID, string, func(), error) {
 	si, err := r.index.StorageFindSector(ctx, s, fileType, false)
 	if err != nil {
 		return "", "", "", nil, err
@@ -125,7 +125,7 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType
 		return si[i].Weight < si[j].Weight
 	})
-	apaths, ids, done, err := r.local.AcquireSector(ctx, s, FTNone, fileType, sealing)
+	apaths, ids, done, err := r.local.AcquireSector(ctx, s, spt, FTNone, fileType, sealing)
 	if err != nil {
 		return "", "", "", nil, xerrors.Errorf("allocate local sector for fetching: %w", err)
 	}
@@ -203,15 +203,15 @@ func (r *Remote) fetch(ctx context.Context, url, outname string) error {
 	}
 }
-func (r *Remote) MoveStorage(ctx context.Context, s abi.SectorID, types SectorFileType) error {
+func (r *Remote) MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, types SectorFileType) error {
 	// Make sure we have the data local
-	_, _, ddone, err := r.AcquireSector(ctx, s, types, FTNone, false)
+	_, _, ddone, err := r.AcquireSector(ctx, s, spt, types, FTNone, false)
 	if err != nil {
 		return xerrors.Errorf("acquire src storage (remote): %w", err)
 	}
 	ddone()
-	return r.local.MoveStorage(ctx, s, types)
+	return r.local.MoveStorage(ctx, s, spt, types)
 }
 func (r *Remote) Remove(ctx context.Context, sid abi.SectorID, typ SectorFileType) error {