stores: Allow reserving local storage

This commit is contained in:
Łukasz Magiera 2020-07-06 18:36:44 +02:00
parent c5a96fdd08
commit 8099621cd0
6 changed files with 104 additions and 14 deletions

View File

@ -61,14 +61,22 @@ type localWorkerPathProvider struct {
} }
func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing stores.PathType) (stores.SectorPaths, func(), error) { func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing stores.PathType) (stores.SectorPaths, func(), error) {
paths, storageIDs, err := l.w.storage.AcquireSector(ctx, sector, l.w.scfg.SealProofType, existing, allocate, sealing, l.op) paths, storageIDs, err := l.w.storage.AcquireSector(ctx, sector, l.w.scfg.SealProofType, existing, allocate, sealing, l.op)
if err != nil { if err != nil {
return stores.SectorPaths{}, nil, err return stores.SectorPaths{}, nil, err
} }
releaseStorage, err := l.w.localStore.Reserve(ctx, sector, l.w.scfg.SealProofType, allocate, storageIDs, stores.FSOverheadSeal)
if err != nil {
return stores.SectorPaths{}, nil, xerrors.Errorf("reserving storage space: %w", err)
}
log.Debugf("acquired sector %d (e:%d; a:%d): %v", sector, existing, allocate, paths) log.Debugf("acquired sector %d (e:%d; a:%d): %v", sector, existing, allocate, paths)
return paths, func() { return paths, func() {
releaseStorage()
for _, fileType := range pathTypes { for _, fileType := range pathTypes {
if fileType&allocate == 0 { if fileType&allocate == 0 {
continue continue

View File

@ -19,15 +19,17 @@ const (
FTNone SectorFileType = 0 FTNone SectorFileType = 0
) )
const FSOverheadDen = 10
var FSOverheadSeal = map[SectorFileType]int{ // 10x overheads var FSOverheadSeal = map[SectorFileType]int{ // 10x overheads
FTUnsealed: 10, FTUnsealed: FSOverheadDen,
FTSealed: 10, FTSealed: FSOverheadDen,
FTCache: 141, // 11 layers + D(2x ssize) + C + R FTCache: 141, // 11 layers + D(2x ssize) + C + R
} }
var FsOverheadFinalized = map[SectorFileType]int{ var FsOverheadFinalized = map[SectorFileType]int{
FTUnsealed: 10, FTUnsealed: FSOverheadDen,
FTSealed: 10, FTSealed: FSOverheadDen,
FTCache: 2, FTCache: 2,
} }
@ -67,7 +69,7 @@ func (t SectorFileType) SealSpaceUse(spt abi.RegisteredSealProof) (uint64, error
return 0, xerrors.Errorf("no seal overhead info for %s", pathType) return 0, xerrors.Errorf("no seal overhead info for %s", pathType)
} }
need += uint64(oh) * uint64(ssize) / 10 need += uint64(oh) * uint64(ssize) / FSOverheadDen
} }
return need, nil return need, nil

View File

@ -79,6 +79,8 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ
return return
} }
// TODO: reserve local storage here
path := PathByType(paths, ft) path := PathByType(paths, ft)
if path == "" { if path == "" {
log.Error("acquired path was empty") log.Error("acquired path was empty")

View File

@ -361,7 +361,7 @@ func (i *Index) StorageBestAlloc(ctx context.Context, allocate SectorFileType, s
continue continue
} }
if spaceReq > p.fsi.Available { if spaceReq > uint64(p.fsi.Available) {
log.Debugf("not allocating on %s, out of space (available: %d, need: %d)", p.info.ID, p.fsi.Available, spaceReq) log.Debugf("not allocating on %s, out of space (available: %d, need: %d)", p.info.ID, p.fsi.Available, spaceReq)
continue continue
} }

View File

@ -44,13 +44,14 @@ func Stat(path string) (FsStat, error) {
} }
return FsStat{ return FsStat{
Capacity: stat.Blocks * uint64(stat.Bsize), Capacity: int64(stat.Blocks) * stat.Bsize,
Available: stat.Bavail * uint64(stat.Bsize), Available: int64(stat.Bavail) * stat.Bsize,
}, nil }, nil
} }
type FsStat struct { type FsStat struct {
Capacity uint64 Capacity int64
Available uint64 // Available to use for sector storage Available int64 // Available to use for sector storage
Used uint64 Used int64
Reserved int64
} }

View File

@ -67,6 +67,25 @@ type Local struct {
type path struct { type path struct {
local string // absolute local path local string // absolute local path
reserved int64
reservations map[abi.SectorID]SectorFileType
}
type statFn func(path string) (FsStat, error)
func (p *path) stat(st statFn) (FsStat, error) {
stat, err := st(p.local)
if err != nil {
return FsStat{}, err
}
stat.Reserved = p.reserved
stat.Available -= p.reserved
if stat.Available < 0 {
stat.Available = 0
}
return stat, err
} }
func NewLocal(ctx context.Context, ls LocalStorage, index SectorIndex, urls []string) (*Local, error) { func NewLocal(ctx context.Context, ls LocalStorage, index SectorIndex, urls []string) (*Local, error) {
@ -98,9 +117,12 @@ func (st *Local) OpenPath(ctx context.Context, p string) error {
out := &path{ out := &path{
local: p, local: p,
reserved: 0,
reservations: map[abi.SectorID]SectorFileType{},
} }
fst, err := st.localStorage.Stat(p) fst, err := out.stat(st.localStorage.Stat)
if err != nil { if err != nil {
return err return err
} }
@ -179,7 +201,7 @@ func (st *Local) reportHealth(ctx context.Context) {
toReport := map[ID]HealthReport{} toReport := map[ID]HealthReport{}
for id, p := range st.paths { for id, p := range st.paths {
stat, err := st.localStorage.Stat(p.local) stat, err := p.stat(st.localStorage.Stat)
toReport[id] = HealthReport{ toReport[id] = HealthReport{
Stat: stat, Stat: stat,
@ -197,6 +219,61 @@ func (st *Local) reportHealth(ctx context.Context) {
} }
} }
func (st *Local) Reserve(ctx context.Context, sid abi.SectorID, spt abi.RegisteredSealProof, ft SectorFileType, storageIDs SectorPaths, overheadTab map[SectorFileType]int) (func(), error) {
ssize, err := spt.SectorSize()
if err != nil {
return nil, xerrors.Errorf("getting sector size: %w", err)
}
st.localLk.Lock()
done := func(){}
deferredDone := func() { done() }
defer func() {
st.localLk.Unlock()
deferredDone()
}()
for _, fileType := range PathTypes {
if fileType&ft == 0 {
continue
}
id := ID(PathByType(storageIDs, fileType))
p, ok := st.paths[id]
if !ok {
return nil, errPathNotFound
}
stat, err := p.stat(st.localStorage.Stat)
if err != nil {
return nil, err
}
overhead := int64(overheadTab[fileType]) * int64(ssize) / FSOverheadDen
if stat.Available < overhead {
return nil, xerrors.Errorf("can't reserve %d bytes in '%s' (id:%s), only %d available", overhead, p.local, id, stat.Available)
}
p.reserved += overhead
prevDone := done
done = func() {
prevDone()
st.localLk.Lock()
defer st.localLk.Unlock()
p.reserved -= overhead
}
}
deferredDone = func() {}
return done, nil
}
func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.RegisteredSealProof, existing SectorFileType, allocate SectorFileType, pathType PathType, op AcquireMode) (SectorPaths, SectorPaths, error) { func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.RegisteredSealProof, existing SectorFileType, allocate SectorFileType, pathType PathType, op AcquireMode) (SectorPaths, SectorPaths, error) {
if existing|allocate != existing^allocate { if existing|allocate != existing^allocate {
return SectorPaths{}, SectorPaths{}, xerrors.New("can't both find and allocate a sector") return SectorPaths{}, SectorPaths{}, xerrors.New("can't both find and allocate a sector")
@ -463,7 +540,7 @@ func (st *Local) FsStat(ctx context.Context, id ID) (FsStat, error) {
return FsStat{}, errPathNotFound return FsStat{}, errPathNotFound
} }
return st.localStorage.Stat(p.local) return p.stat(st.localStorage.Stat)
} }
var _ Store = &Local{} var _ Store = &Local{}