From b58eba0d999d33c6596ea84ed390aa5fc34f814d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 4 Aug 2020 16:20:59 +0200 Subject: [PATCH 1/2] remote: Fetch storage reservation --- fsutil/filesize_unix.go | 6 +++- stores/interface.go | 8 ++--- stores/local.go | 21 +++++++++--- stores/remote.go | 71 ++++++++++++++++++++++++++++------------- 4 files changed, 74 insertions(+), 32 deletions(-) diff --git a/fsutil/filesize_unix.go b/fsutil/filesize_unix.go index 41b62daf6..dacdcd515 100644 --- a/fsutil/filesize_unix.go +++ b/fsutil/filesize_unix.go @@ -1,6 +1,7 @@ package fsutil import ( + "os" "syscall" "golang.org/x/xerrors" @@ -14,12 +15,15 @@ type SizeInfo struct { func FileSize(path string) (SizeInfo, error) { var stat syscall.Stat_t if err := syscall.Stat(path, &stat); err != nil { + if err == syscall.ENOENT { + return SizeInfo{}, os.ErrNotExist + } return SizeInfo{}, xerrors.Errorf("stat: %w", err) } // NOTE: stat.Blocks is in 512B blocks, NOT in stat.Blksize // See https://www.gnu.org/software/libc/manual/html_node/Attribute-Meanings.html return SizeInfo{ - int64(stat.Blocks) * 512, + int64(stat.Blocks) * 512, // NOTE: int64 cast is needed on osx }, nil } diff --git a/stores/interface.go b/stores/interface.go index 836705f40..142769b1b 100644 --- a/stores/interface.go +++ b/stores/interface.go @@ -9,15 +9,15 @@ import ( type PathType string const ( - PathStorage = "storage" - PathSealing = "sealing" + PathStorage PathType = "storage" + PathSealing PathType = "sealing" ) type AcquireMode string const ( - AcquireMove = "move" - AcquireCopy = "copy" + AcquireMove AcquireMode = "move" + AcquireCopy AcquireMode = "copy" ) type Store interface { diff --git a/stores/local.go b/stores/local.go index 9a7ec6108..9efab6480 100644 --- a/stores/local.go +++ b/stores/local.go @@ -50,7 +50,10 @@ type LocalStorage interface { SetStorage(func(*StorageConfig)) error Stat(path string) (fsutil.FsStat, error) - DiskUsage(path string) (int64, error) // returns real disk usage for a file/directory + + // returns real disk usage for a file/directory + // os.ErrNotExit when file doesn't exist + DiskUsage(path string) (int64, error) } const MetaFile = "sectorstore.json" @@ -77,7 +80,7 @@ type path struct { func (p *path) stat(ls LocalStorage) (fsutil.FsStat, error) { stat, err := ls.Stat(p.local) if err != nil { - return fsutil.FsStat{}, err + return fsutil.FsStat{}, xerrors.Errorf("stat %s: %w", p.local, err) } stat.Reserved = p.reserved @@ -88,7 +91,17 @@ func (p *path) stat(ls LocalStorage) (fsutil.FsStat, error) { continue } - used, err := ls.DiskUsage(p.sectorPath(id, fileType)) + sp := p.sectorPath(id, fileType) + + used, err := ls.DiskUsage(sp) + if err == os.ErrNotExist { + p, ferr := tempFetchDest(sp, false) + if ferr != nil { + return fsutil.FsStat{}, ferr + } + + used, err = ls.DiskUsage(p) + } if err != nil { log.Errorf("getting disk usage of '%s': %+v", p.sectorPath(id, fileType), err) continue @@ -279,7 +292,7 @@ func (st *Local) Reserve(ctx context.Context, sid abi.SectorID, spt abi.Register stat, err := p.stat(st.localStorage) if err != nil { - return nil, err + return nil, xerrors.Errorf("getting local storage stat: %w", err) } overhead := int64(overheadTab[fileType]) * int64(ssize) / FSOverheadDen diff --git a/stores/remote.go b/stores/remote.go index 42b730b40..93dc2ca58 100644 --- a/stores/remote.go +++ b/stores/remote.go @@ -95,6 +95,33 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.Regi return SectorPaths{}, SectorPaths{}, xerrors.Errorf("local acquire error: %w", err) } + var toFetch SectorFileType + for _, fileType := range PathTypes { + if fileType&existing == 0 { + continue + } + + if PathByType(paths, fileType) == "" { + toFetch |= fileType + } + } + + apaths, ids, err := r.local.AcquireSector(ctx, s, spt, FTNone, toFetch, pathType, op) + if err != nil { + return SectorPaths{}, SectorPaths{}, xerrors.Errorf("allocate local sector for fetching: %w", err) + } + + odt := FSOverheadSeal + if pathType == PathStorage { + odt = FsOverheadFinalized + } + + releaseStorage, err := r.local.Reserve(ctx, s, spt, toFetch, ids, odt) + if err != nil { + return SectorPaths{}, SectorPaths{}, xerrors.Errorf("reserving storage space: %w", err) + } + defer releaseStorage() + for _, fileType := range PathTypes { if fileType&existing == 0 { continue @@ -104,15 +131,18 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.Regi continue } - ap, storageID, url, err := r.acquireFromRemote(ctx, s, spt, fileType, pathType, op) + dest := PathByType(apaths, fileType) + storageID := PathByType(ids, fileType) + + url, err := r.acquireFromRemote(ctx, s, fileType, dest) if err != nil { return SectorPaths{}, SectorPaths{}, err } - SetPathByType(&paths, fileType, ap) - SetPathByType(&stores, fileType, string(storageID)) + SetPathByType(&paths, fileType, dest) + SetPathByType(&stores, fileType, storageID) - if err := r.index.StorageDeclareSector(ctx, storageID, s, fileType, op == AcquireMove); err != nil { + if err := r.index.StorageDeclareSector(ctx, ID(storageID), s, fileType, op == AcquireMove); err != nil { log.Warnf("declaring sector %v in %s failed: %+v", s, storageID, err) continue } @@ -127,49 +157,44 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.Regi return paths, stores, nil } -func tempDest(spath string) (string, error) { +func tempFetchDest(spath string, create bool) (string, error) { st, b := filepath.Split(spath) tempdir := filepath.Join(st, FetchTempSubdir) - if err := os.MkdirAll(tempdir, 755); err != nil { - return "", xerrors.Errorf("creating temp fetch dir: %w", err) + if create { + if err := os.MkdirAll(tempdir, 0755); err != nil { + return "", xerrors.Errorf("creating temp fetch dir: %w", err) + } } return filepath.Join(tempdir, b), nil } -func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, spt abi.RegisteredSealProof, fileType SectorFileType, pathType PathType, op AcquireMode) (string, ID, string, error) { +func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType SectorFileType, dest string) (string, error) { si, err := r.index.StorageFindSector(ctx, s, fileType, false) if err != nil { - return "", "", "", err + return "", err } if len(si) == 0 { - return "", "", "", xerrors.Errorf("failed to acquire sector %v from remote(%d): %w", s, fileType, storiface.ErrSectorNotFound) + return "", xerrors.Errorf("failed to acquire sector %v from remote(%d): %w", s, fileType, storiface.ErrSectorNotFound) } sort.Slice(si, func(i, j int) bool { return si[i].Weight < si[j].Weight }) - apaths, ids, err := r.local.AcquireSector(ctx, s, spt, FTNone, fileType, pathType, op) - if err != nil { - return "", "", "", xerrors.Errorf("allocate local sector for fetching: %w", err) - } - dest := PathByType(apaths, fileType) - storageID := PathByType(ids, fileType) - var merr error for _, info := range si { // TODO: see what we have local, prefer that for _, url := range info.URLs { - tempDest, err := tempDest(dest) + tempDest, err := tempFetchDest(dest, true) if err != nil { - return "", "", "", err + return "", err } if err := os.RemoveAll(dest); err != nil { - return "", "", "", xerrors.Errorf("removing dest: %w", err) + return "", xerrors.Errorf("removing dest: %w", err) } err = r.fetch(ctx, url, tempDest) @@ -179,17 +204,17 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, spt abi. } if err := move(tempDest, dest); err != nil { - return "", "", "", xerrors.Errorf("fetch move error (storage %s) %s -> %s: %w", info.ID, tempDest, dest, err) + return "", xerrors.Errorf("fetch move error (storage %s) %s -> %s: %w", info.ID, tempDest, dest, err) } if merr != nil { log.Warnw("acquireFromRemote encountered errors when fetching sector from remote", "errors", merr) } - return dest, ID(storageID), url, nil + return url, nil } } - return "", "", "", xerrors.Errorf("failed to acquire sector %v from remote (tried %v): %w", s, si, merr) + return "", xerrors.Errorf("failed to acquire sector %v from remote (tried %v): %w", s, si, merr) } func (r *Remote) fetch(ctx context.Context, url, outname string) error { From 77e4adb5567e31b8ad164a1df6de36bc2eb96466 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 5 Aug 2020 22:11:53 +0200 Subject: [PATCH 2/2] gofmt --- selector_existing.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/selector_existing.go b/selector_existing.go index 20cb1b209..a11c39007 100644 --- a/selector_existing.go +++ b/selector_existing.go @@ -12,17 +12,17 @@ import ( ) type existingSelector struct { - index stores.SectorIndex - sector abi.SectorID - alloc stores.SectorFileType + index stores.SectorIndex + sector abi.SectorID + alloc stores.SectorFileType allowFetch bool } func newExistingSelector(index stores.SectorIndex, sector abi.SectorID, alloc stores.SectorFileType, allowFetch bool) *existingSelector { return &existingSelector{ - index: index, - sector: sector, - alloc: alloc, + index: index, + sector: sector, + alloc: alloc, allowFetch: allowFetch, } }