fix: local storage reservations fixes (#11866)
* paths: Debugging local storage reservations * paths: Log when individual reservation is less than on-disk space * paths: fix debug reservations print * paths: More reserve logs * paths: More more reserve logs * paths: add stacks to duplicate done call log * curio: task storage: Release storage at most once * curio: cleanup before restarting sdr * address review * paths: Simplify reservation release logic
This commit is contained in:
parent
50ed73de29
commit
bc43bd6d69
@ -148,6 +148,14 @@ func (sb *SealCalls) GenerateSDR(ctx context.Context, taskID harmonytask.TaskID,
|
||||
return xerrors.Errorf("computing replica id: %w", err)
|
||||
}
|
||||
|
||||
// make sure the cache dir is empty
|
||||
if err := os.RemoveAll(paths.Cache); err != nil {
|
||||
return xerrors.Errorf("removing cache dir: %w", err)
|
||||
}
|
||||
if err := os.MkdirAll(paths.Cache, 0755); err != nil {
|
||||
return xerrors.Errorf("mkdir cache dir: %w", err)
|
||||
}
|
||||
|
||||
// generate new sector key
|
||||
err = ffi.GenerateSDR(
|
||||
sector.ProofType,
|
||||
|
@ -2,6 +2,7 @@ package ffi
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
@ -170,9 +171,14 @@ func (t *TaskStorage) Claim(taskID int) error {
|
||||
return err
|
||||
}
|
||||
|
||||
var releaseOnce sync.Once
|
||||
releaseFunc := func() {
|
||||
releaseOnce.Do(release)
|
||||
}
|
||||
|
||||
sres := &StorageReservation{
|
||||
SectorRef: sectorRef,
|
||||
Release: release,
|
||||
Release: releaseFunc,
|
||||
Paths: pathsFs,
|
||||
PathIDs: pathIDs,
|
||||
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
"math/rand"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -45,12 +46,17 @@ type Local struct {
|
||||
localLk sync.RWMutex
|
||||
}
|
||||
|
||||
type sectorFile struct {
|
||||
sid abi.SectorID
|
||||
ft storiface.SectorFileType
|
||||
}
|
||||
|
||||
type path struct {
|
||||
local string // absolute local path
|
||||
maxStorage uint64
|
||||
|
||||
reserved int64
|
||||
reservations map[abi.SectorID]storiface.SectorFileType
|
||||
reservations map[sectorFile]int64
|
||||
}
|
||||
|
||||
// statExistingSectorForReservation is optional parameter for stat method
|
||||
@ -59,6 +65,7 @@ type path struct {
|
||||
type statExistingSectorForReservation struct {
|
||||
id abi.SectorID
|
||||
ft storiface.SectorFileType
|
||||
overhead int64
|
||||
}
|
||||
|
||||
func (p *path) stat(ls LocalStorage, newReserve ...statExistingSectorForReservation) (stat fsutil.FsStat, newResvOnDisk int64, err error) {
|
||||
@ -72,7 +79,7 @@ func (p *path) stat(ls LocalStorage, newReserve ...statExistingSectorForReservat
|
||||
stat.Reserved = p.reserved
|
||||
var newReserveOnDisk int64
|
||||
|
||||
accountExistingFiles := func(id abi.SectorID, fileType storiface.SectorFileType) (int64, error) {
|
||||
accountExistingFiles := func(id abi.SectorID, fileType storiface.SectorFileType, overhead int64) (int64, error) {
|
||||
sp := p.sectorPath(id, fileType)
|
||||
|
||||
used, err := ls.DiskUsage(sp)
|
||||
@ -94,35 +101,58 @@ func (p *path) stat(ls LocalStorage, newReserve ...statExistingSectorForReservat
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
log.Debugw("accounting existing files", "id", id, "fileType", fileType, "path", sp, "used", used, "overhead", overhead)
|
||||
return used, nil
|
||||
}
|
||||
|
||||
for id, ft := range p.reservations {
|
||||
for _, fileType := range ft.AllSet() {
|
||||
onDisk, err := accountExistingFiles(id, fileType)
|
||||
for id, oh := range p.reservations {
|
||||
onDisk, err := accountExistingFiles(id.sid, id.ft, oh)
|
||||
if err != nil {
|
||||
return fsutil.FsStat{}, 0, err
|
||||
}
|
||||
stat.Reserved -= onDisk
|
||||
if onDisk > oh {
|
||||
log.Warnw("reserved space on disk is greater than expected", "id", id.sid, "fileType", id.ft, "onDisk", onDisk, "oh", oh)
|
||||
onDisk = oh
|
||||
}
|
||||
|
||||
stat.Reserved -= onDisk
|
||||
}
|
||||
for _, reservation := range newReserve {
|
||||
for _, fileType := range reservation.ft.AllSet() {
|
||||
if p.reservations[reservation.id]&fileType != 0 {
|
||||
log.Debugw("accounting existing files for new reservation", "id", reservation.id, "fileType", fileType, "overhead", reservation.overhead)
|
||||
|
||||
resID := sectorFile{reservation.id, fileType}
|
||||
|
||||
if _, has := p.reservations[resID]; has {
|
||||
// already accounted for
|
||||
continue
|
||||
}
|
||||
|
||||
onDisk, err := accountExistingFiles(reservation.id, fileType)
|
||||
onDisk, err := accountExistingFiles(reservation.id, fileType, reservation.overhead)
|
||||
if err != nil {
|
||||
return fsutil.FsStat{}, 0, err
|
||||
}
|
||||
if onDisk > reservation.overhead {
|
||||
log.Warnw("reserved space on disk is greater than expected (new resv)", "id", reservation.id, "fileType", fileType, "onDisk", onDisk, "oh", reservation.overhead)
|
||||
onDisk = reservation.overhead
|
||||
}
|
||||
|
||||
newReserveOnDisk += onDisk
|
||||
}
|
||||
}
|
||||
|
||||
if stat.Reserved < 0 {
|
||||
log.Warnf("negative reserved storage: p.reserved=%d, reserved: %d", p.reserved, stat.Reserved)
|
||||
//log.Warnf("negative reserved storage: p.reserved=%d, reserved: %d", p.reserved, stat.Reserved)
|
||||
var jsonReservations []map[string]interface{}
|
||||
for id, res := range p.reservations {
|
||||
jsonReservations = append(jsonReservations, map[string]interface{}{
|
||||
"id": id.sid,
|
||||
"ft": id.ft,
|
||||
"res": res,
|
||||
})
|
||||
}
|
||||
|
||||
log.Warnw("negative reserved storage", "reserved", stat.Reserved, "origResv", p.reserved, "reservations", len(p.reservations), "newReserveOnDisk", newReserveOnDisk, "reservations", jsonReservations)
|
||||
stat.Reserved = 0
|
||||
}
|
||||
|
||||
@ -199,7 +229,7 @@ func (st *Local) OpenPath(ctx context.Context, p string) error {
|
||||
|
||||
maxStorage: meta.MaxStorage,
|
||||
reserved: 0,
|
||||
reservations: map[abi.SectorID]storiface.SectorFileType{},
|
||||
reservations: map[sectorFile]int64{},
|
||||
}
|
||||
|
||||
fst, _, err := out.stat(st.localStorage)
|
||||
@ -430,7 +460,7 @@ func (st *Local) reportStorage(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
func (st *Local) Reserve(ctx context.Context, sid storiface.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int) (func(), error) {
|
||||
func (st *Local) Reserve(ctx context.Context, sid storiface.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int) (release func(), err error) {
|
||||
ssize, err := sid.ProofType.SectorSize()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -438,11 +468,35 @@ func (st *Local) Reserve(ctx context.Context, sid storiface.SectorRef, ft storif
|
||||
|
||||
st.localLk.Lock()
|
||||
|
||||
done := func() {}
|
||||
deferredDone := func() { done() }
|
||||
var releaseCalled bool
|
||||
|
||||
// double release debug guard
|
||||
var firstDonebuf []byte
|
||||
var releaseFuncs []func()
|
||||
|
||||
release = func() {
|
||||
for _, releaseFunc := range releaseFuncs {
|
||||
releaseFunc()
|
||||
}
|
||||
|
||||
// debug guard against double release call
|
||||
if releaseCalled {
|
||||
curStack := make([]byte, 20480)
|
||||
curStack = curStack[:runtime.Stack(curStack, false)]
|
||||
|
||||
log.Errorw("double release call", "sector", sid, "fileType", ft, "prevStack", string(firstDonebuf), "curStack", string(curStack))
|
||||
}
|
||||
|
||||
firstDonebuf = make([]byte, 20480)
|
||||
firstDonebuf = firstDonebuf[:runtime.Stack(firstDonebuf, false)]
|
||||
|
||||
releaseCalled = true
|
||||
}
|
||||
|
||||
cleanupOnError := func() { release() }
|
||||
defer func() {
|
||||
st.localLk.Unlock()
|
||||
deferredDone()
|
||||
cleanupOnError()
|
||||
}()
|
||||
|
||||
for _, fileType := range ft.AllSet() {
|
||||
@ -453,13 +507,13 @@ func (st *Local) Reserve(ctx context.Context, sid storiface.SectorRef, ft storif
|
||||
return nil, errPathNotFound
|
||||
}
|
||||
|
||||
stat, resvOnDisk, err := p.stat(st.localStorage, statExistingSectorForReservation{sid.ID, fileType})
|
||||
overhead := int64(overheadTab[fileType]) * int64(ssize) / storiface.FSOverheadDen
|
||||
|
||||
stat, resvOnDisk, err := p.stat(st.localStorage, statExistingSectorForReservation{sid.ID, fileType, overhead})
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("getting local storage stat: %w", err)
|
||||
}
|
||||
|
||||
overhead := int64(overheadTab[fileType]) * int64(ssize) / storiface.FSOverheadDen
|
||||
|
||||
if overhead-resvOnDisk < 0 {
|
||||
log.Errorw("negative overhead vs on-disk data", "overhead", overhead, "on-disk", resvOnDisk, "id", id, "sector", sid, "fileType", fileType)
|
||||
resvOnDisk = overhead
|
||||
@ -469,27 +523,28 @@ func (st *Local) Reserve(ctx context.Context, sid storiface.SectorRef, ft storif
|
||||
return nil, storiface.Err(storiface.ErrTempAllocateSpace, xerrors.Errorf("can't reserve %d bytes in '%s' (id:%s), only %d available", overhead, p.local, id, stat.Available))
|
||||
}
|
||||
|
||||
resID := sectorFile{sid.ID, fileType}
|
||||
|
||||
log.Debugw("reserve add", "id", id, "sector", sid, "fileType", fileType, "overhead", overhead, "reserved-before", p.reserved, "reserved-after", p.reserved+overhead)
|
||||
|
||||
p.reserved += overhead
|
||||
p.reservations[sid.ID] |= fileType
|
||||
|
||||
prevDone := done
|
||||
saveFileType := fileType
|
||||
done = func() {
|
||||
prevDone()
|
||||
p.reservations[resID] = overhead
|
||||
|
||||
releaseFuncs = append(releaseFuncs, func() {
|
||||
st.localLk.Lock()
|
||||
defer st.localLk.Unlock()
|
||||
|
||||
log.Debugw("reserve release", "id", id, "sector", sid, "fileType", fileType, "overhead", overhead, "reserved-before", p.reserved, "reserved-after", p.reserved-overhead)
|
||||
|
||||
p.reserved -= overhead
|
||||
p.reservations[sid.ID] ^= saveFileType
|
||||
if p.reservations[sid.ID] == storiface.FTNone {
|
||||
delete(p.reservations, sid.ID)
|
||||
}
|
||||
}
|
||||
delete(p.reservations, resID)
|
||||
})
|
||||
}
|
||||
|
||||
deferredDone = func() {}
|
||||
return done, nil
|
||||
// no errors, don't cleanup, caller will call release
|
||||
cleanupOnError = func() {}
|
||||
|
||||
return release, nil
|
||||
}
|
||||
|
||||
func (st *Local) AcquireSector(ctx context.Context, sid storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, pathType storiface.PathType, op storiface.AcquireMode, opts ...storiface.AcquireOption) (storiface.SectorPaths, storiface.SectorPaths, error) {
|
||||
|
Loading…
Reference in New Issue
Block a user