diff --git a/curiosrc/ffi/sdr_funcs.go b/curiosrc/ffi/sdr_funcs.go index 5987c7856..e9ce62831 100644 --- a/curiosrc/ffi/sdr_funcs.go +++ b/curiosrc/ffi/sdr_funcs.go @@ -148,6 +148,14 @@ func (sb *SealCalls) GenerateSDR(ctx context.Context, taskID harmonytask.TaskID, return xerrors.Errorf("computing replica id: %w", err) } + // make sure the cache dir is empty + if err := os.RemoveAll(paths.Cache); err != nil { + return xerrors.Errorf("removing cache dir: %w", err) + } + if err := os.MkdirAll(paths.Cache, 0755); err != nil { + return xerrors.Errorf("mkdir cache dir: %w", err) + } + // generate new sector key err = ffi.GenerateSDR( sector.ProofType, diff --git a/curiosrc/ffi/task_storage.go b/curiosrc/ffi/task_storage.go index 4d30eaff9..f01a472fa 100644 --- a/curiosrc/ffi/task_storage.go +++ b/curiosrc/ffi/task_storage.go @@ -2,6 +2,7 @@ package ffi import ( "context" + "sync" "time" "golang.org/x/xerrors" @@ -170,9 +171,14 @@ func (t *TaskStorage) Claim(taskID int) error { return err } + var releaseOnce sync.Once + releaseFunc := func() { + releaseOnce.Do(release) + } + sres := &StorageReservation{ SectorRef: sectorRef, - Release: release, + Release: releaseFunc, Paths: pathsFs, PathIDs: pathIDs, diff --git a/storage/paths/local.go b/storage/paths/local.go index 74fde977d..006854bbf 100644 --- a/storage/paths/local.go +++ b/storage/paths/local.go @@ -7,6 +7,7 @@ import ( "math/rand" "os" "path/filepath" + "runtime" "sync" "time" @@ -45,20 +46,26 @@ type Local struct { localLk sync.RWMutex } +type sectorFile struct { + sid abi.SectorID + ft storiface.SectorFileType +} + type path struct { local string // absolute local path maxStorage uint64 reserved int64 - reservations map[abi.SectorID]storiface.SectorFileType + reservations map[sectorFile]int64 } // statExistingSectorForReservation is optional parameter for stat method // which will make it take into account existing sectors when calculating // available space for new reservations type statExistingSectorForReservation struct { - id abi.SectorID - ft storiface.SectorFileType + id abi.SectorID + ft storiface.SectorFileType + overhead int64 } func (p *path) stat(ls LocalStorage, newReserve ...statExistingSectorForReservation) (stat fsutil.FsStat, newResvOnDisk int64, err error) { @@ -72,7 +79,7 @@ func (p *path) stat(ls LocalStorage, newReserve ...statExistingSectorForReservat stat.Reserved = p.reserved var newReserveOnDisk int64 - accountExistingFiles := func(id abi.SectorID, fileType storiface.SectorFileType) (int64, error) { + accountExistingFiles := func(id abi.SectorID, fileType storiface.SectorFileType, overhead int64) (int64, error) { sp := p.sectorPath(id, fileType) used, err := ls.DiskUsage(sp) @@ -94,35 +101,58 @@ func (p *path) stat(ls LocalStorage, newReserve ...statExistingSectorForReservat return 0, nil } + log.Debugw("accounting existing files", "id", id, "fileType", fileType, "path", sp, "used", used, "overhead", overhead) return used, nil } - for id, ft := range p.reservations { - for _, fileType := range ft.AllSet() { - onDisk, err := accountExistingFiles(id, fileType) - if err != nil { - return fsutil.FsStat{}, 0, err - } - stat.Reserved -= onDisk + for id, oh := range p.reservations { + onDisk, err := accountExistingFiles(id.sid, id.ft, oh) + if err != nil { + return fsutil.FsStat{}, 0, err } + if onDisk > oh { + log.Warnw("reserved space on disk is greater than expected", "id", id.sid, "fileType", id.ft, "onDisk", onDisk, "oh", oh) + onDisk = oh + } + + stat.Reserved -= onDisk } for _, reservation := range newReserve { for _, fileType := range reservation.ft.AllSet() { - if p.reservations[reservation.id]&fileType != 0 { + log.Debugw("accounting existing files for new reservation", "id", reservation.id, "fileType", fileType, "overhead", reservation.overhead) + + resID := sectorFile{reservation.id, fileType} + + if _, has := p.reservations[resID]; has { // already accounted for continue } - onDisk, err := accountExistingFiles(reservation.id, fileType) + onDisk, err := accountExistingFiles(reservation.id, fileType, reservation.overhead) if err != nil { return fsutil.FsStat{}, 0, err } + if onDisk > reservation.overhead { + log.Warnw("reserved space on disk is greater than expected (new resv)", "id", reservation.id, "fileType", fileType, "onDisk", onDisk, "oh", reservation.overhead) + onDisk = reservation.overhead + } + newReserveOnDisk += onDisk } } if stat.Reserved < 0 { - log.Warnf("negative reserved storage: p.reserved=%d, reserved: %d", p.reserved, stat.Reserved) + //log.Warnf("negative reserved storage: p.reserved=%d, reserved: %d", p.reserved, stat.Reserved) + var jsonReservations []map[string]interface{} + for id, res := range p.reservations { + jsonReservations = append(jsonReservations, map[string]interface{}{ + "id": id.sid, + "ft": id.ft, + "res": res, + }) + } + + log.Warnw("negative reserved storage", "reserved", stat.Reserved, "origResv", p.reserved, "reservations", len(p.reservations), "newReserveOnDisk", newReserveOnDisk, "reservations", jsonReservations) stat.Reserved = 0 } @@ -199,7 +229,7 @@ func (st *Local) OpenPath(ctx context.Context, p string) error { maxStorage: meta.MaxStorage, reserved: 0, - reservations: map[abi.SectorID]storiface.SectorFileType{}, + reservations: map[sectorFile]int64{}, } fst, _, err := out.stat(st.localStorage) @@ -430,7 +460,7 @@ func (st *Local) reportStorage(ctx context.Context) { } } -func (st *Local) Reserve(ctx context.Context, sid storiface.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int) (func(), error) { +func (st *Local) Reserve(ctx context.Context, sid storiface.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int) (release func(), err error) { ssize, err := sid.ProofType.SectorSize() if err != nil { return nil, err @@ -438,11 +468,35 @@ func (st *Local) Reserve(ctx context.Context, sid storiface.SectorRef, ft storif st.localLk.Lock() - done := func() {} - deferredDone := func() { done() } + var releaseCalled bool + + // double release debug guard + var firstDonebuf []byte + var releaseFuncs []func() + + release = func() { + for _, releaseFunc := range releaseFuncs { + releaseFunc() + } + + // debug guard against double release call + if releaseCalled { + curStack := make([]byte, 20480) + curStack = curStack[:runtime.Stack(curStack, false)] + + log.Errorw("double release call", "sector", sid, "fileType", ft, "prevStack", string(firstDonebuf), "curStack", string(curStack)) + } + + firstDonebuf = make([]byte, 20480) + firstDonebuf = firstDonebuf[:runtime.Stack(firstDonebuf, false)] + + releaseCalled = true + } + + cleanupOnError := func() { release() } defer func() { st.localLk.Unlock() - deferredDone() + cleanupOnError() }() for _, fileType := range ft.AllSet() { @@ -453,13 +507,13 @@ func (st *Local) Reserve(ctx context.Context, sid storiface.SectorRef, ft storif return nil, errPathNotFound } - stat, resvOnDisk, err := p.stat(st.localStorage, statExistingSectorForReservation{sid.ID, fileType}) + overhead := int64(overheadTab[fileType]) * int64(ssize) / storiface.FSOverheadDen + + stat, resvOnDisk, err := p.stat(st.localStorage, statExistingSectorForReservation{sid.ID, fileType, overhead}) if err != nil { return nil, xerrors.Errorf("getting local storage stat: %w", err) } - overhead := int64(overheadTab[fileType]) * int64(ssize) / storiface.FSOverheadDen - if overhead-resvOnDisk < 0 { log.Errorw("negative overhead vs on-disk data", "overhead", overhead, "on-disk", resvOnDisk, "id", id, "sector", sid, "fileType", fileType) resvOnDisk = overhead @@ -469,27 +523,28 @@ func (st *Local) Reserve(ctx context.Context, sid storiface.SectorRef, ft storif return nil, storiface.Err(storiface.ErrTempAllocateSpace, xerrors.Errorf("can't reserve %d bytes in '%s' (id:%s), only %d available", overhead, p.local, id, stat.Available)) } + resID := sectorFile{sid.ID, fileType} + + log.Debugw("reserve add", "id", id, "sector", sid, "fileType", fileType, "overhead", overhead, "reserved-before", p.reserved, "reserved-after", p.reserved+overhead) + p.reserved += overhead - p.reservations[sid.ID] |= fileType - - prevDone := done - saveFileType := fileType - done = func() { - prevDone() + p.reservations[resID] = overhead + releaseFuncs = append(releaseFuncs, func() { st.localLk.Lock() defer st.localLk.Unlock() + log.Debugw("reserve release", "id", id, "sector", sid, "fileType", fileType, "overhead", overhead, "reserved-before", p.reserved, "reserved-after", p.reserved-overhead) + p.reserved -= overhead - p.reservations[sid.ID] ^= saveFileType - if p.reservations[sid.ID] == storiface.FTNone { - delete(p.reservations, sid.ID) - } - } + delete(p.reservations, resID) + }) } - deferredDone = func() {} - return done, nil + // no errors, don't cleanup, caller will call release + cleanupOnError = func() {} + + return release, nil } func (st *Local) AcquireSector(ctx context.Context, sid storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, pathType storiface.PathType, op storiface.AcquireMode, opts ...storiface.AcquireOption) (storiface.SectorPaths, storiface.SectorPaths, error) {