From 8099621cd0b2a73f96751431ddd861e27498fae1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 6 Jul 2020 18:36:44 +0200 Subject: [PATCH] stores: Allow reserving local storage --- localworker.go | 8 ++++ stores/filetype.go | 12 +++--- stores/http_handler.go | 2 + stores/index.go | 2 +- stores/interface.go | 11 +++--- stores/local.go | 83 ++++++++++++++++++++++++++++++++++++++++-- 6 files changed, 104 insertions(+), 14 deletions(-) diff --git a/localworker.go b/localworker.go index a1d82209a..d03ace359 100644 --- a/localworker.go +++ b/localworker.go @@ -61,14 +61,22 @@ type localWorkerPathProvider struct { } func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing stores.PathType) (stores.SectorPaths, func(), error) { + paths, storageIDs, err := l.w.storage.AcquireSector(ctx, sector, l.w.scfg.SealProofType, existing, allocate, sealing, l.op) if err != nil { return stores.SectorPaths{}, nil, err } + releaseStorage, err := l.w.localStore.Reserve(ctx, sector, l.w.scfg.SealProofType, allocate, storageIDs, stores.FSOverheadSeal) + if err != nil { + return stores.SectorPaths{}, nil, xerrors.Errorf("reserving storage space: %w", err) + } + log.Debugf("acquired sector %d (e:%d; a:%d): %v", sector, existing, allocate, paths) return paths, func() { + releaseStorage() + for _, fileType := range pathTypes { if fileType&allocate == 0 { continue diff --git a/stores/filetype.go b/stores/filetype.go index 60c47d1f7..650b92f71 100644 --- a/stores/filetype.go +++ b/stores/filetype.go @@ -19,15 +19,17 @@ const ( FTNone SectorFileType = 0 ) +const FSOverheadDen = 10 + var FSOverheadSeal = map[SectorFileType]int{ // 10x overheads - FTUnsealed: 10, - FTSealed: 10, + FTUnsealed: FSOverheadDen, + FTSealed: FSOverheadDen, FTCache: 141, // 11 layers + D(2x ssize) + C + R } var FsOverheadFinalized = map[SectorFileType]int{ - FTUnsealed: 10, - FTSealed: 10, + FTUnsealed: FSOverheadDen, + FTSealed: FSOverheadDen, FTCache: 2, } @@ -67,7 +69,7 @@ func (t SectorFileType) SealSpaceUse(spt abi.RegisteredSealProof) (uint64, error return 0, xerrors.Errorf("no seal overhead info for %s", pathType) } - need += uint64(oh) * uint64(ssize) / 10 + need += uint64(oh) * uint64(ssize) / FSOverheadDen } return need, nil diff --git a/stores/http_handler.go b/stores/http_handler.go index 93fb94637..4f0556138 100644 --- a/stores/http_handler.go +++ b/stores/http_handler.go @@ -79,6 +79,8 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ return } + // TODO: reserve local storage here + path := PathByType(paths, ft) if path == "" { log.Error("acquired path was empty") diff --git a/stores/index.go b/stores/index.go index 049e2dc20..e48ae02bb 100644 --- a/stores/index.go +++ b/stores/index.go @@ -361,7 +361,7 @@ func (i *Index) StorageBestAlloc(ctx context.Context, allocate SectorFileType, s continue } - if spaceReq > p.fsi.Available { + if spaceReq > uint64(p.fsi.Available) { log.Debugf("not allocating on %s, out of space (available: %d, need: %d)", p.info.ID, p.fsi.Available, spaceReq) continue } diff --git a/stores/interface.go b/stores/interface.go index b61980125..6fd4a7ad7 100644 --- a/stores/interface.go +++ b/stores/interface.go @@ -44,13 +44,14 @@ func Stat(path string) (FsStat, error) { } return FsStat{ - Capacity: stat.Blocks * uint64(stat.Bsize), - Available: stat.Bavail * uint64(stat.Bsize), + Capacity: int64(stat.Blocks) * stat.Bsize, + Available: int64(stat.Bavail) * stat.Bsize, }, nil } type FsStat struct { - Capacity uint64 - Available uint64 // Available to use for sector storage - Used uint64 + Capacity int64 + Available int64 // Available to use for sector storage + Used int64 + Reserved int64 } diff --git a/stores/local.go b/stores/local.go index ac63ae0dd..a21909d69 100644 --- a/stores/local.go +++ b/stores/local.go @@ -67,6 +67,25 @@ type Local struct { type path struct { local string // absolute local path + + reserved int64 + reservations map[abi.SectorID]SectorFileType +} + +type statFn func(path string) (FsStat, error) +func (p *path) stat(st statFn) (FsStat, error) { + stat, err := st(p.local) + if err != nil { + return FsStat{}, err + } + + stat.Reserved = p.reserved + stat.Available -= p.reserved + if stat.Available < 0 { + stat.Available = 0 + } + + return stat, err } func NewLocal(ctx context.Context, ls LocalStorage, index SectorIndex, urls []string) (*Local, error) { @@ -98,9 +117,12 @@ func (st *Local) OpenPath(ctx context.Context, p string) error { out := &path{ local: p, + + reserved: 0, + reservations: map[abi.SectorID]SectorFileType{}, } - fst, err := st.localStorage.Stat(p) + fst, err := out.stat(st.localStorage.Stat) if err != nil { return err } @@ -179,7 +201,7 @@ func (st *Local) reportHealth(ctx context.Context) { toReport := map[ID]HealthReport{} for id, p := range st.paths { - stat, err := st.localStorage.Stat(p.local) + stat, err := p.stat(st.localStorage.Stat) toReport[id] = HealthReport{ Stat: stat, @@ -197,6 +219,61 @@ func (st *Local) reportHealth(ctx context.Context) { } } +func (st *Local) Reserve(ctx context.Context, sid abi.SectorID, spt abi.RegisteredSealProof, ft SectorFileType, storageIDs SectorPaths, overheadTab map[SectorFileType]int) (func(), error) { + ssize, err := spt.SectorSize() + if err != nil { + return nil, xerrors.Errorf("getting sector size: %w", err) + } + + st.localLk.Lock() + + done := func(){} + deferredDone := func() { done() } + defer func() { + st.localLk.Unlock() + deferredDone() + }() + + for _, fileType := range PathTypes { + if fileType&ft == 0 { + continue + } + + id := ID(PathByType(storageIDs, fileType)) + + p, ok := st.paths[id] + if !ok { + return nil, errPathNotFound + } + + stat, err := p.stat(st.localStorage.Stat) + if err != nil { + return nil, err + } + + overhead := int64(overheadTab[fileType]) * int64(ssize) / FSOverheadDen + + if stat.Available < overhead { + return nil, xerrors.Errorf("can't reserve %d bytes in '%s' (id:%s), only %d available", overhead, p.local, id, stat.Available) + } + + p.reserved += overhead + + prevDone := done + done = func() { + prevDone() + + st.localLk.Lock() + defer st.localLk.Unlock() + + p.reserved -= overhead + } + } + + deferredDone = func() {} + return done, nil +} + func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.RegisteredSealProof, existing SectorFileType, allocate SectorFileType, pathType PathType, op AcquireMode) (SectorPaths, SectorPaths, error) { if existing|allocate != existing^allocate { return SectorPaths{}, SectorPaths{}, xerrors.New("can't both find and allocate a sector") @@ -463,7 +540,7 @@ func (st *Local) FsStat(ctx context.Context, id ID) (FsStat, error) { return FsStat{}, errPathNotFound } - return st.localStorage.Stat(p.local) + return p.stat(st.localStorage.Stat) } var _ Store = &Local{}