Merge branch 'iosize' of github.com:acautman/lotus into acautman-iosize
This commit is contained in:
commit
53a6c4eac6
87
extern/sector-storage/stores/remote.go
vendored
87
extern/sector-storage/stores/remote.go
vendored
@ -3,6 +3,8 @@ package stores
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"math/bits"
|
"math/bits"
|
||||||
"mime"
|
"mime"
|
||||||
@ -59,6 +61,30 @@ func NewRemote(local *Local, index SectorIndex, auth http.Header, fetchLimit int
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var SealProofIosizes = map[abi.RegisteredSealProof]uint64{
|
||||||
|
abi.RegisteredSealProof_StackedDrg2KiBV1: 32 * 1024,
|
||||||
|
abi.RegisteredSealProof_StackedDrg8MiBV1: 32 * 1024,
|
||||||
|
abi.RegisteredSealProof_StackedDrg512MiBV1: 1024 * 1024,
|
||||||
|
abi.RegisteredSealProof_StackedDrg32GiBV1: 1024 * 1024,
|
||||||
|
abi.RegisteredSealProof_StackedDrg64GiBV1: 1024 * 1024,
|
||||||
|
|
||||||
|
abi.RegisteredSealProof_StackedDrg2KiBV1_1: 32 * 1024,
|
||||||
|
abi.RegisteredSealProof_StackedDrg8MiBV1_1: 32 * 1024,
|
||||||
|
abi.RegisteredSealProof_StackedDrg512MiBV1_1: 1024 * 1024,
|
||||||
|
abi.RegisteredSealProof_StackedDrg32GiBV1_1: 1024 * 1024,
|
||||||
|
abi.RegisteredSealProof_StackedDrg64GiBV1_1: 1024 * 1024,
|
||||||
|
}
|
||||||
|
|
||||||
|
const MaxIoSize = 64 * 1024 * 1024
|
||||||
|
|
||||||
|
func GetIoSizeByProofType(p abi.RegisteredSealProof) (uint64, error) {
|
||||||
|
size, ok := SealProofIosizes[p]
|
||||||
|
if !ok {
|
||||||
|
return 0, xerrors.Errorf("unsupported proof type: %v", p)
|
||||||
|
}
|
||||||
|
return size, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (r *Remote) AcquireSector(ctx context.Context, s storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, pathType storiface.PathType, op storiface.AcquireMode) (storiface.SectorPaths, storiface.SectorPaths, error) {
|
func (r *Remote) AcquireSector(ctx context.Context, s storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, pathType storiface.PathType, op storiface.AcquireMode) (storiface.SectorPaths, storiface.SectorPaths, error) {
|
||||||
if existing|allocate != existing^allocate {
|
if existing|allocate != existing^allocate {
|
||||||
return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.New("can't both find and allocate a sector")
|
return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.New("can't both find and allocate a sector")
|
||||||
@ -123,6 +149,12 @@ func (r *Remote) AcquireSector(ctx context.Context, s storage.SectorRef, existin
|
|||||||
}
|
}
|
||||||
defer releaseStorage()
|
defer releaseStorage()
|
||||||
|
|
||||||
|
var iosize uint64 = 0
|
||||||
|
ssize, err := GetIoSizeByProofType(s.ProofType)
|
||||||
|
if err == nil {
|
||||||
|
iosize = ssize
|
||||||
|
}
|
||||||
|
|
||||||
for _, fileType := range storiface.PathTypes {
|
for _, fileType := range storiface.PathTypes {
|
||||||
if fileType&existing == 0 {
|
if fileType&existing == 0 {
|
||||||
continue
|
continue
|
||||||
@ -135,7 +167,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s storage.SectorRef, existin
|
|||||||
dest := storiface.PathByType(apaths, fileType)
|
dest := storiface.PathByType(apaths, fileType)
|
||||||
storageID := storiface.PathByType(ids, fileType)
|
storageID := storiface.PathByType(ids, fileType)
|
||||||
|
|
||||||
url, err := r.acquireFromRemote(ctx, s.ID, fileType, dest)
|
url, err := r.acquireFromRemote(ctx, s.ID, fileType, dest, iosize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return storiface.SectorPaths{}, storiface.SectorPaths{}, err
|
return storiface.SectorPaths{}, storiface.SectorPaths{}, err
|
||||||
}
|
}
|
||||||
@ -170,7 +202,7 @@ func tempFetchDest(spath string, create bool) (string, error) {
|
|||||||
return filepath.Join(tempdir, b), nil
|
return filepath.Join(tempdir, b), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType storiface.SectorFileType, dest string) (string, error) {
|
func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType storiface.SectorFileType, dest string, iosize uint64) (string, error) {
|
||||||
si, err := r.index.StorageFindSector(ctx, s, fileType, 0, false)
|
si, err := r.index.StorageFindSector(ctx, s, fileType, 0, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
@ -198,7 +230,7 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType
|
|||||||
return "", xerrors.Errorf("removing dest: %w", err)
|
return "", xerrors.Errorf("removing dest: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = r.fetch(ctx, url, tempDest)
|
err = r.fetch(ctx, url, tempDest, iosize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
merr = multierror.Append(merr, xerrors.Errorf("fetch error %s (storage %s) -> %s: %w", url, info.ID, tempDest, err))
|
merr = multierror.Append(merr, xerrors.Errorf("fetch error %s (storage %s) -> %s: %w", url, info.ID, tempDest, err))
|
||||||
continue
|
continue
|
||||||
@ -218,7 +250,7 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType
|
|||||||
return "", xerrors.Errorf("failed to acquire sector %v from remote (tried %v): %w", s, si, merr)
|
return "", xerrors.Errorf("failed to acquire sector %v from remote (tried %v): %w", s, si, merr)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Remote) fetch(ctx context.Context, url, outname string) error {
|
func (r *Remote) fetch(ctx context.Context, url, outname string, iosize uint64) error {
|
||||||
log.Infof("Fetch %s -> %s", url, outname)
|
log.Infof("Fetch %s -> %s", url, outname)
|
||||||
|
|
||||||
if len(r.limit) >= cap(r.limit) {
|
if len(r.limit) >= cap(r.limit) {
|
||||||
@ -276,12 +308,57 @@ func (r *Remote) fetch(ctx context.Context, url, outname string) error {
|
|||||||
case "application/x-tar":
|
case "application/x-tar":
|
||||||
return tarutil.ExtractTar(resp.Body, outname)
|
return tarutil.ExtractTar(resp.Body, outname)
|
||||||
case "application/octet-stream":
|
case "application/octet-stream":
|
||||||
return files.WriteTo(files.NewReaderFile(resp.Body), outname)
|
return WriteWithSize(files.NewReaderFile(resp.Body), outname, iosize)
|
||||||
default:
|
default:
|
||||||
return xerrors.Errorf("unknown content type: '%s'", mediatype)
|
return xerrors.Errorf("unknown content type: '%s'", mediatype)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WriteWithSize(nd files.Node, fpath string, size uint64) error {
|
||||||
|
switch nd := nd.(type) {
|
||||||
|
case *files.Symlink:
|
||||||
|
return os.Symlink(nd.Target, fpath)
|
||||||
|
case files.File:
|
||||||
|
f, err := os.Create(fpath)
|
||||||
|
defer func() {
|
||||||
|
err = f.Close()
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("failed to close file:%v, err:%v", fpath, err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if size <= 0 || size > MaxIoSize {
|
||||||
|
_, err = io.Copy(f, nd)
|
||||||
|
} else {
|
||||||
|
buf := make([]byte, size)
|
||||||
|
_, err = io.CopyBuffer(f, nd, buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
case files.Directory:
|
||||||
|
err := os.Mkdir(fpath, 0777)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
entries := nd.Entries()
|
||||||
|
for entries.Next() {
|
||||||
|
child := filepath.Join(fpath, entries.Name())
|
||||||
|
if err := WriteWithSize(entries.Node(), child, size); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return entries.Err()
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("file type %T at %q is not supported", nd, fpath)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (r *Remote) MoveStorage(ctx context.Context, s storage.SectorRef, types storiface.SectorFileType) error {
|
func (r *Remote) MoveStorage(ctx context.Context, s storage.SectorRef, types storiface.SectorFileType) error {
|
||||||
// Make sure we have the data local
|
// Make sure we have the data local
|
||||||
_, _, err := r.AcquireSector(ctx, s, types, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)
|
_, _, err := r.AcquireSector(ctx, s, types, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)
|
||||||
|
Loading…
Reference in New Issue
Block a user