Merge pull request #90 from filecoin-project/feat/fetch-reserve-space
remote: Fetch storage reservation
This commit is contained in:
commit
b7dca7fb4e
@ -1,6 +1,7 @@
|
||||
package fsutil
|
||||
|
||||
import (
|
||||
"os"
|
||||
"syscall"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
@ -14,12 +15,15 @@ type SizeInfo struct {
|
||||
func FileSize(path string) (SizeInfo, error) {
|
||||
var stat syscall.Stat_t
|
||||
if err := syscall.Stat(path, &stat); err != nil {
|
||||
if err == syscall.ENOENT {
|
||||
return SizeInfo{}, os.ErrNotExist
|
||||
}
|
||||
return SizeInfo{}, xerrors.Errorf("stat: %w", err)
|
||||
}
|
||||
|
||||
// NOTE: stat.Blocks is in 512B blocks, NOT in stat.Blksize
|
||||
// See https://www.gnu.org/software/libc/manual/html_node/Attribute-Meanings.html
|
||||
return SizeInfo{
|
||||
int64(stat.Blocks) * 512,
|
||||
int64(stat.Blocks) * 512, // NOTE: int64 cast is needed on osx
|
||||
}, nil
|
||||
}
|
||||
|
@ -12,17 +12,17 @@ import (
|
||||
)
|
||||
|
||||
type existingSelector struct {
|
||||
index stores.SectorIndex
|
||||
sector abi.SectorID
|
||||
alloc stores.SectorFileType
|
||||
index stores.SectorIndex
|
||||
sector abi.SectorID
|
||||
alloc stores.SectorFileType
|
||||
allowFetch bool
|
||||
}
|
||||
|
||||
func newExistingSelector(index stores.SectorIndex, sector abi.SectorID, alloc stores.SectorFileType, allowFetch bool) *existingSelector {
|
||||
return &existingSelector{
|
||||
index: index,
|
||||
sector: sector,
|
||||
alloc: alloc,
|
||||
index: index,
|
||||
sector: sector,
|
||||
alloc: alloc,
|
||||
allowFetch: allowFetch,
|
||||
}
|
||||
}
|
||||
|
@ -9,15 +9,15 @@ import (
|
||||
type PathType string
|
||||
|
||||
const (
|
||||
PathStorage = "storage"
|
||||
PathSealing = "sealing"
|
||||
PathStorage PathType = "storage"
|
||||
PathSealing PathType = "sealing"
|
||||
)
|
||||
|
||||
type AcquireMode string
|
||||
|
||||
const (
|
||||
AcquireMove = "move"
|
||||
AcquireCopy = "copy"
|
||||
AcquireMove AcquireMode = "move"
|
||||
AcquireCopy AcquireMode = "copy"
|
||||
)
|
||||
|
||||
type Store interface {
|
||||
|
@ -50,7 +50,10 @@ type LocalStorage interface {
|
||||
SetStorage(func(*StorageConfig)) error
|
||||
|
||||
Stat(path string) (fsutil.FsStat, error)
|
||||
DiskUsage(path string) (int64, error) // returns real disk usage for a file/directory
|
||||
|
||||
// returns real disk usage for a file/directory
|
||||
// os.ErrNotExit when file doesn't exist
|
||||
DiskUsage(path string) (int64, error)
|
||||
}
|
||||
|
||||
const MetaFile = "sectorstore.json"
|
||||
@ -77,7 +80,7 @@ type path struct {
|
||||
func (p *path) stat(ls LocalStorage) (fsutil.FsStat, error) {
|
||||
stat, err := ls.Stat(p.local)
|
||||
if err != nil {
|
||||
return fsutil.FsStat{}, err
|
||||
return fsutil.FsStat{}, xerrors.Errorf("stat %s: %w", p.local, err)
|
||||
}
|
||||
|
||||
stat.Reserved = p.reserved
|
||||
@ -88,7 +91,17 @@ func (p *path) stat(ls LocalStorage) (fsutil.FsStat, error) {
|
||||
continue
|
||||
}
|
||||
|
||||
used, err := ls.DiskUsage(p.sectorPath(id, fileType))
|
||||
sp := p.sectorPath(id, fileType)
|
||||
|
||||
used, err := ls.DiskUsage(sp)
|
||||
if err == os.ErrNotExist {
|
||||
p, ferr := tempFetchDest(sp, false)
|
||||
if ferr != nil {
|
||||
return fsutil.FsStat{}, ferr
|
||||
}
|
||||
|
||||
used, err = ls.DiskUsage(p)
|
||||
}
|
||||
if err != nil {
|
||||
log.Errorf("getting disk usage of '%s': %+v", p.sectorPath(id, fileType), err)
|
||||
continue
|
||||
@ -279,7 +292,7 @@ func (st *Local) Reserve(ctx context.Context, sid abi.SectorID, spt abi.Register
|
||||
|
||||
stat, err := p.stat(st.localStorage)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, xerrors.Errorf("getting local storage stat: %w", err)
|
||||
}
|
||||
|
||||
overhead := int64(overheadTab[fileType]) * int64(ssize) / FSOverheadDen
|
||||
|
@ -95,6 +95,33 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.Regi
|
||||
return SectorPaths{}, SectorPaths{}, xerrors.Errorf("local acquire error: %w", err)
|
||||
}
|
||||
|
||||
var toFetch SectorFileType
|
||||
for _, fileType := range PathTypes {
|
||||
if fileType&existing == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
if PathByType(paths, fileType) == "" {
|
||||
toFetch |= fileType
|
||||
}
|
||||
}
|
||||
|
||||
apaths, ids, err := r.local.AcquireSector(ctx, s, spt, FTNone, toFetch, pathType, op)
|
||||
if err != nil {
|
||||
return SectorPaths{}, SectorPaths{}, xerrors.Errorf("allocate local sector for fetching: %w", err)
|
||||
}
|
||||
|
||||
odt := FSOverheadSeal
|
||||
if pathType == PathStorage {
|
||||
odt = FsOverheadFinalized
|
||||
}
|
||||
|
||||
releaseStorage, err := r.local.Reserve(ctx, s, spt, toFetch, ids, odt)
|
||||
if err != nil {
|
||||
return SectorPaths{}, SectorPaths{}, xerrors.Errorf("reserving storage space: %w", err)
|
||||
}
|
||||
defer releaseStorage()
|
||||
|
||||
for _, fileType := range PathTypes {
|
||||
if fileType&existing == 0 {
|
||||
continue
|
||||
@ -104,15 +131,18 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.Regi
|
||||
continue
|
||||
}
|
||||
|
||||
ap, storageID, url, err := r.acquireFromRemote(ctx, s, spt, fileType, pathType, op)
|
||||
dest := PathByType(apaths, fileType)
|
||||
storageID := PathByType(ids, fileType)
|
||||
|
||||
url, err := r.acquireFromRemote(ctx, s, fileType, dest)
|
||||
if err != nil {
|
||||
return SectorPaths{}, SectorPaths{}, err
|
||||
}
|
||||
|
||||
SetPathByType(&paths, fileType, ap)
|
||||
SetPathByType(&stores, fileType, string(storageID))
|
||||
SetPathByType(&paths, fileType, dest)
|
||||
SetPathByType(&stores, fileType, storageID)
|
||||
|
||||
if err := r.index.StorageDeclareSector(ctx, storageID, s, fileType, op == AcquireMove); err != nil {
|
||||
if err := r.index.StorageDeclareSector(ctx, ID(storageID), s, fileType, op == AcquireMove); err != nil {
|
||||
log.Warnf("declaring sector %v in %s failed: %+v", s, storageID, err)
|
||||
continue
|
||||
}
|
||||
@ -127,49 +157,44 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.Regi
|
||||
return paths, stores, nil
|
||||
}
|
||||
|
||||
func tempDest(spath string) (string, error) {
|
||||
func tempFetchDest(spath string, create bool) (string, error) {
|
||||
st, b := filepath.Split(spath)
|
||||
tempdir := filepath.Join(st, FetchTempSubdir)
|
||||
if err := os.MkdirAll(tempdir, 755); err != nil {
|
||||
return "", xerrors.Errorf("creating temp fetch dir: %w", err)
|
||||
if create {
|
||||
if err := os.MkdirAll(tempdir, 0755); err != nil {
|
||||
return "", xerrors.Errorf("creating temp fetch dir: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return filepath.Join(tempdir, b), nil
|
||||
}
|
||||
|
||||
func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, spt abi.RegisteredSealProof, fileType SectorFileType, pathType PathType, op AcquireMode) (string, ID, string, error) {
|
||||
func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType SectorFileType, dest string) (string, error) {
|
||||
si, err := r.index.StorageFindSector(ctx, s, fileType, false)
|
||||
if err != nil {
|
||||
return "", "", "", err
|
||||
return "", err
|
||||
}
|
||||
|
||||
if len(si) == 0 {
|
||||
return "", "", "", xerrors.Errorf("failed to acquire sector %v from remote(%d): %w", s, fileType, storiface.ErrSectorNotFound)
|
||||
return "", xerrors.Errorf("failed to acquire sector %v from remote(%d): %w", s, fileType, storiface.ErrSectorNotFound)
|
||||
}
|
||||
|
||||
sort.Slice(si, func(i, j int) bool {
|
||||
return si[i].Weight < si[j].Weight
|
||||
})
|
||||
|
||||
apaths, ids, err := r.local.AcquireSector(ctx, s, spt, FTNone, fileType, pathType, op)
|
||||
if err != nil {
|
||||
return "", "", "", xerrors.Errorf("allocate local sector for fetching: %w", err)
|
||||
}
|
||||
dest := PathByType(apaths, fileType)
|
||||
storageID := PathByType(ids, fileType)
|
||||
|
||||
var merr error
|
||||
for _, info := range si {
|
||||
// TODO: see what we have local, prefer that
|
||||
|
||||
for _, url := range info.URLs {
|
||||
tempDest, err := tempDest(dest)
|
||||
tempDest, err := tempFetchDest(dest, true)
|
||||
if err != nil {
|
||||
return "", "", "", err
|
||||
return "", err
|
||||
}
|
||||
|
||||
if err := os.RemoveAll(dest); err != nil {
|
||||
return "", "", "", xerrors.Errorf("removing dest: %w", err)
|
||||
return "", xerrors.Errorf("removing dest: %w", err)
|
||||
}
|
||||
|
||||
err = r.fetch(ctx, url, tempDest)
|
||||
@ -179,17 +204,17 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, spt abi.
|
||||
}
|
||||
|
||||
if err := move(tempDest, dest); err != nil {
|
||||
return "", "", "", xerrors.Errorf("fetch move error (storage %s) %s -> %s: %w", info.ID, tempDest, dest, err)
|
||||
return "", xerrors.Errorf("fetch move error (storage %s) %s -> %s: %w", info.ID, tempDest, dest, err)
|
||||
}
|
||||
|
||||
if merr != nil {
|
||||
log.Warnw("acquireFromRemote encountered errors when fetching sector from remote", "errors", merr)
|
||||
}
|
||||
return dest, ID(storageID), url, nil
|
||||
return url, nil
|
||||
}
|
||||
}
|
||||
|
||||
return "", "", "", xerrors.Errorf("failed to acquire sector %v from remote (tried %v): %w", s, si, merr)
|
||||
return "", xerrors.Errorf("failed to acquire sector %v from remote (tried %v): %w", s, si, merr)
|
||||
}
|
||||
|
||||
func (r *Remote) fetch(ctx context.Context, url, outname string) error {
|
||||
|
Loading…
Reference in New Issue
Block a user