Merge pull request #66 from filecoin-project/fix/es

Fixes / changes for fast-retrieval support
This commit is contained in:
Łukasz Magiera 2020-07-08 21:51:34 +02:00 committed by GitHub
commit e3b9ba01c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 119 additions and 56 deletions

View File

@ -254,7 +254,10 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset s
defer sealed.Close()
var at, nextat abi.PaddedPieceSize
for {
first := true
for first || toUnseal.HasNext() {
first = false
piece, err := toUnseal.NextRun()
if err != nil {
return xerrors.Errorf("getting next range to unseal: %w", err)
@ -521,8 +524,7 @@ func (sb *Sealer) FinalizeSector(ctx context.Context, sector abi.SectorID, keepU
}
}
paths, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, 0, false)
paths, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, 0, stores.PathStorage)
if err != nil {
return xerrors.Errorf("acquiring sector cache path: %w", err)
}

View File

@ -10,7 +10,6 @@ import (
var log = logging.Logger("fsutil")
func Deallocate(file *os.File, offset int64, length int64) error {
log.Warnf("deallocating space not supported")

25
fsutil/filesize_unix.go Normal file
View File

@ -0,0 +1,25 @@
package fsutil
import (
"syscall"
"golang.org/x/xerrors"
)
type SizeInfo struct {
OnDisk int64
}
// FileSize returns bytes used by a file on disk
func FileSize(path string) (SizeInfo, error) {
var stat syscall.Stat_t
if err := syscall.Stat(path, &stat); err != nil {
return SizeInfo{}, xerrors.Errorf("stat: %w", err)
}
// NOTE: stat.Blocks is in 512B blocks, NOT in stat.Blksize
// See https://www.gnu.org/software/libc/manual/html_node/Attribute-Meanings.html
return SizeInfo{
stat.Blocks * 512,
}, nil
}

7
fsutil/statfs.go Normal file
View File

@ -0,0 +1,7 @@
package fsutil
type FsStat struct {
Capacity int64
Available int64 // Available to use for sector storage
Reserved int64
}

19
fsutil/statfs_unix.go Normal file
View File

@ -0,0 +1,19 @@
package fsutil
import (
"syscall"
"golang.org/x/xerrors"
)
func Statfs(path string) (FsStat, error) {
var stat syscall.Statfs_t
if err := syscall.Statfs(path, &stat); err != nil {
return FsStat{}, xerrors.Errorf("statfs: %w", err)
}
return FsStat{
Capacity: int64(stat.Blocks) * stat.Bsize,
Available: int64(stat.Bavail) * stat.Bsize,
}, nil
}

28
fsutil/statfs_windows.go Normal file
View File

@ -0,0 +1,28 @@
package fsutil
import (
"syscall"
"unsafe"
)
func Statfs(volumePath string) (FsStat, error) {
// From https://github.com/ricochet2200/go-disk-usage/blob/master/du/diskusage_windows.go
h := syscall.MustLoadDLL("kernel32.dll")
c := h.MustFindProc("GetDiskFreeSpaceExW")
var freeBytes int64
var totalBytes int64
var availBytes int64
c.Call(
uintptr(unsafe.Pointer(syscall.StringToUTF16Ptr(volumePath))),
uintptr(unsafe.Pointer(&freeBytes)),
uintptr(unsafe.Pointer(&totalBytes)),
uintptr(unsafe.Pointer(&availBytes)))
return FsStat{
Capacity: totalBytes,
Available: availBytes,
}, nil
}

View File

@ -3,6 +3,7 @@ package sectorstorage
import (
"context"
"errors"
"github.com/filecoin-project/sector-storage/fsutil"
"io"
"net/http"
@ -491,7 +492,7 @@ func (m *Manager) StorageLocal(ctx context.Context) (map[stores.ID]string, error
return out, nil
}
func (m *Manager) FsStat(ctx context.Context, id stores.ID) (stores.FsStat, error) {
func (m *Manager) FsStat(ctx context.Context, id stores.ID) (fsutil.FsStat, error) {
return m.storage.FsStat(ctx, id)
}

View File

@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/filecoin-project/sector-storage/fsutil"
"github.com/filecoin-project/sector-storage/sealtasks"
logging "github.com/ipfs/go-log"
"io/ioutil"
@ -69,8 +70,8 @@ func (t *testStorage) SetStorage(f func(*stores.StorageConfig)) error {
return nil
}
func (t *testStorage) Stat(path string) (stores.FsStat, error) {
return stores.Stat(path)
func (t *testStorage) Stat(path string) (fsutil.FsStat, error) {
return fsutil.Statfs(path)
}
var _ stores.LocalStorage = &testStorage{}

View File

@ -320,7 +320,7 @@ func (mgr *SectorMgr) FinalizeSector(context.Context, abi.SectorID, []storage.Ra
}
func (mgr *SectorMgr) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) error {
panic("implement me")
return nil
}
func (mgr *SectorMgr) Remove(ctx context.Context, sector abi.SectorID) error {

View File

@ -2,6 +2,7 @@ package stores
import (
"context"
"github.com/filecoin-project/sector-storage/fsutil"
"net/url"
gopath "path"
"sort"
@ -34,7 +35,7 @@ type StorageInfo struct {
}
type HealthReport struct {
Stat FsStat
Stat fsutil.FsStat
Err error
}
@ -50,7 +51,7 @@ type SectorStorageInfo struct {
}
type SectorIndex interface { // part of storage-miner api
StorageAttach(context.Context, StorageInfo, FsStat) error
StorageAttach(context.Context, StorageInfo, fsutil.FsStat) error
StorageInfo(context.Context, ID) (StorageInfo, error)
StorageReportHealth(context.Context, ID, HealthReport) error
@ -77,7 +78,7 @@ type declMeta struct {
type storageEntry struct {
info *StorageInfo
fsi FsStat
fsi fsutil.FsStat
lastHeartbeat time.Time
heartbeatErr error
@ -130,7 +131,7 @@ func (i *Index) StorageList(ctx context.Context) (map[ID][]Decl, error) {
return out, nil
}
func (i *Index) StorageAttach(ctx context.Context, si StorageInfo, st FsStat) error {
func (i *Index) StorageAttach(ctx context.Context, si StorageInfo, st fsutil.FsStat) error {
i.lk.Lock()
defer i.lk.Unlock()

View File

@ -2,10 +2,7 @@ package stores
import (
"context"
"syscall"
"golang.org/x/xerrors"
"github.com/filecoin-project/sector-storage/fsutil"
"github.com/filecoin-project/specs-actors/actors/abi"
)
@ -34,24 +31,5 @@ type Store interface {
// move sectors into storage
MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredSealProof, types SectorFileType) error
FsStat(ctx context.Context, id ID) (FsStat, error)
}
func Stat(path string) (FsStat, error) {
var stat syscall.Statfs_t
if err := syscall.Statfs(path, &stat); err != nil {
return FsStat{}, xerrors.Errorf("statfs: %w", err)
}
return FsStat{
Capacity: int64(stat.Blocks) * stat.Bsize,
Available: int64(stat.Bavail) * stat.Bsize,
}, nil
}
type FsStat struct {
Capacity int64
Available int64 // Available to use for sector storage
Used int64
Reserved int64
FsStat(ctx context.Context, id ID) (fsutil.FsStat, error)
}

View File

@ -3,6 +3,7 @@ package stores
import (
"context"
"encoding/json"
"github.com/filecoin-project/sector-storage/fsutil"
"io/ioutil"
"math/bits"
"math/rand"
@ -48,7 +49,7 @@ type LocalStorage interface {
GetStorage() (StorageConfig, error)
SetStorage(func(*StorageConfig)) error
Stat(path string) (FsStat, error)
Stat(path string) (fsutil.FsStat, error)
DiskUsage(path string) (int64, error) // returns real disk usage for a file/directory
}
@ -73,10 +74,10 @@ type path struct {
reservations map[abi.SectorID]SectorFileType
}
func (p *path) stat(ls LocalStorage) (FsStat, error) {
func (p *path) stat(ls LocalStorage) (fsutil.FsStat, error) {
stat, err := ls.Stat(p.local)
if err != nil {
return FsStat{}, err
return fsutil.FsStat{}, err
}
stat.Reserved = p.reserved
@ -557,13 +558,13 @@ func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, spt abi.Regist
var errPathNotFound = xerrors.Errorf("fsstat: path not found")
func (st *Local) FsStat(ctx context.Context, id ID) (FsStat, error) {
func (st *Local) FsStat(ctx context.Context, id ID) (fsutil.FsStat, error) {
st.localLk.RLock()
defer st.localLk.RUnlock()
p, ok := st.paths[id]
if !ok {
return FsStat{}, errPathNotFound
return fsutil.FsStat{}, errPathNotFound
}
return p.stat(st.localStorage)

View File

@ -3,6 +3,7 @@ package stores
import (
"context"
"encoding/json"
"github.com/filecoin-project/sector-storage/fsutil"
"github.com/google/uuid"
"io/ioutil"
"os"
@ -32,11 +33,10 @@ func (t *TestingLocalStorage) SetStorage(f func(*StorageConfig)) error {
return nil
}
func (t *TestingLocalStorage) Stat(path string) (FsStat, error) {
return FsStat{
func (t *TestingLocalStorage) Stat(path string) (fsutil.FsStat, error) {
return fsutil.FsStat{
Capacity: pathSize,
Available: pathSize,
Used: 0,
}, nil
}

View File

@ -3,6 +3,7 @@ package stores
import (
"context"
"encoding/json"
"github.com/filecoin-project/sector-storage/fsutil"
"io/ioutil"
"math/bits"
"mime"
@ -270,7 +271,7 @@ func (r *Remote) deleteFromRemote(ctx context.Context, url string) error {
return nil
}
func (r *Remote) FsStat(ctx context.Context, id ID) (FsStat, error) {
func (r *Remote) FsStat(ctx context.Context, id ID) (fsutil.FsStat, error) {
st, err := r.local.FsStat(ctx, id)
switch err {
case nil:
@ -278,53 +279,53 @@ func (r *Remote) FsStat(ctx context.Context, id ID) (FsStat, error) {
case errPathNotFound:
break
default:
return FsStat{}, xerrors.Errorf("local stat: %w", err)
return fsutil.FsStat{}, xerrors.Errorf("local stat: %w", err)
}
si, err := r.index.StorageInfo(ctx, id)
if err != nil {
return FsStat{}, xerrors.Errorf("getting remote storage info: %w", err)
return fsutil.FsStat{}, xerrors.Errorf("getting remote storage info: %w", err)
}
if len(si.URLs) == 0 {
return FsStat{}, xerrors.Errorf("no known URLs for remote storage %s", id)
return fsutil.FsStat{}, xerrors.Errorf("no known URLs for remote storage %s", id)
}
rl, err := url.Parse(si.URLs[0])
if err != nil {
return FsStat{}, xerrors.Errorf("failed to parse url: %w", err)
return fsutil.FsStat{}, xerrors.Errorf("failed to parse url: %w", err)
}
rl.Path = gopath.Join(rl.Path, "stat", string(id))
req, err := http.NewRequest("GET", rl.String(), nil)
if err != nil {
return FsStat{}, xerrors.Errorf("request: %w", err)
return fsutil.FsStat{}, xerrors.Errorf("request: %w", err)
}
req.Header = r.auth
req = req.WithContext(ctx)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return FsStat{}, xerrors.Errorf("do request: %w", err)
return fsutil.FsStat{}, xerrors.Errorf("do request: %w", err)
}
switch resp.StatusCode {
case 200:
break
case 404:
return FsStat{}, errPathNotFound
return fsutil.FsStat{}, errPathNotFound
case 500:
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return FsStat{}, xerrors.Errorf("fsstat: got http 500, then failed to read the error: %w", err)
return fsutil.FsStat{}, xerrors.Errorf("fsstat: got http 500, then failed to read the error: %w", err)
}
return FsStat{}, xerrors.Errorf("fsstat: got http 500: %s", string(b))
return fsutil.FsStat{}, xerrors.Errorf("fsstat: got http 500: %s", string(b))
}
var out FsStat
var out fsutil.FsStat
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
return FsStat{}, xerrors.Errorf("decoding fsstat: %w", err)
return fsutil.FsStat{}, xerrors.Errorf("decoding fsstat: %w", err)
}
defer resp.Body.Close()