stores: Simplify remote copy buf size logic

This commit is contained in:
Łukasz Magiera 2020-12-10 20:51:50 +01:00
parent 53a6c4eac6
commit 46a5013dc1

View File

@ -3,7 +3,6 @@ package stores
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"math/bits" "math/bits"
@ -24,12 +23,13 @@ import (
"github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/specs-storage/storage"
"github.com/hashicorp/go-multierror" "github.com/hashicorp/go-multierror"
files "github.com/ipfs/go-ipfs-files"
"golang.org/x/xerrors" "golang.org/x/xerrors"
) )
var FetchTempSubdir = "fetching" var FetchTempSubdir = "fetching"
var CopyBuf = 1 << 20
type Remote struct { type Remote struct {
local *Local local *Local
index SectorIndex index SectorIndex
@ -61,30 +61,6 @@ func NewRemote(local *Local, index SectorIndex, auth http.Header, fetchLimit int
} }
} }
var SealProofIosizes = map[abi.RegisteredSealProof]uint64{
abi.RegisteredSealProof_StackedDrg2KiBV1: 32 * 1024,
abi.RegisteredSealProof_StackedDrg8MiBV1: 32 * 1024,
abi.RegisteredSealProof_StackedDrg512MiBV1: 1024 * 1024,
abi.RegisteredSealProof_StackedDrg32GiBV1: 1024 * 1024,
abi.RegisteredSealProof_StackedDrg64GiBV1: 1024 * 1024,
abi.RegisteredSealProof_StackedDrg2KiBV1_1: 32 * 1024,
abi.RegisteredSealProof_StackedDrg8MiBV1_1: 32 * 1024,
abi.RegisteredSealProof_StackedDrg512MiBV1_1: 1024 * 1024,
abi.RegisteredSealProof_StackedDrg32GiBV1_1: 1024 * 1024,
abi.RegisteredSealProof_StackedDrg64GiBV1_1: 1024 * 1024,
}
const MaxIoSize = 64 * 1024 * 1024
func GetIoSizeByProofType(p abi.RegisteredSealProof) (uint64, error) {
size, ok := SealProofIosizes[p]
if !ok {
return 0, xerrors.Errorf("unsupported proof type: %v", p)
}
return size, nil
}
func (r *Remote) AcquireSector(ctx context.Context, s storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, pathType storiface.PathType, op storiface.AcquireMode) (storiface.SectorPaths, storiface.SectorPaths, error) { func (r *Remote) AcquireSector(ctx context.Context, s storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, pathType storiface.PathType, op storiface.AcquireMode) (storiface.SectorPaths, storiface.SectorPaths, error) {
if existing|allocate != existing^allocate { if existing|allocate != existing^allocate {
return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.New("can't both find and allocate a sector") return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.New("can't both find and allocate a sector")
@ -149,12 +125,6 @@ func (r *Remote) AcquireSector(ctx context.Context, s storage.SectorRef, existin
} }
defer releaseStorage() defer releaseStorage()
var iosize uint64 = 0
ssize, err := GetIoSizeByProofType(s.ProofType)
if err == nil {
iosize = ssize
}
for _, fileType := range storiface.PathTypes { for _, fileType := range storiface.PathTypes {
if fileType&existing == 0 { if fileType&existing == 0 {
continue continue
@ -167,7 +137,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s storage.SectorRef, existin
dest := storiface.PathByType(apaths, fileType) dest := storiface.PathByType(apaths, fileType)
storageID := storiface.PathByType(ids, fileType) storageID := storiface.PathByType(ids, fileType)
url, err := r.acquireFromRemote(ctx, s.ID, fileType, dest, iosize) url, err := r.acquireFromRemote(ctx, s.ID, fileType, dest)
if err != nil { if err != nil {
return storiface.SectorPaths{}, storiface.SectorPaths{}, err return storiface.SectorPaths{}, storiface.SectorPaths{}, err
} }
@ -202,7 +172,7 @@ func tempFetchDest(spath string, create bool) (string, error) {
return filepath.Join(tempdir, b), nil return filepath.Join(tempdir, b), nil
} }
func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType storiface.SectorFileType, dest string, iosize uint64) (string, error) { func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType storiface.SectorFileType, dest string) (string, error) {
si, err := r.index.StorageFindSector(ctx, s, fileType, 0, false) si, err := r.index.StorageFindSector(ctx, s, fileType, 0, false)
if err != nil { if err != nil {
return "", err return "", err
@ -230,7 +200,7 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType
return "", xerrors.Errorf("removing dest: %w", err) return "", xerrors.Errorf("removing dest: %w", err)
} }
err = r.fetch(ctx, url, tempDest, iosize) err = r.fetch(ctx, url, tempDest)
if err != nil { if err != nil {
merr = multierror.Append(merr, xerrors.Errorf("fetch error %s (storage %s) -> %s: %w", url, info.ID, tempDest, err)) merr = multierror.Append(merr, xerrors.Errorf("fetch error %s (storage %s) -> %s: %w", url, info.ID, tempDest, err))
continue continue
@ -250,7 +220,7 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType
return "", xerrors.Errorf("failed to acquire sector %v from remote (tried %v): %w", s, si, merr) return "", xerrors.Errorf("failed to acquire sector %v from remote (tried %v): %w", s, si, merr)
} }
func (r *Remote) fetch(ctx context.Context, url, outname string, iosize uint64) error { func (r *Remote) fetch(ctx context.Context, url, outname string) error {
log.Infof("Fetch %s -> %s", url, outname) log.Infof("Fetch %s -> %s", url, outname)
if len(r.limit) >= cap(r.limit) { if len(r.limit) >= cap(r.limit) {
@ -308,57 +278,21 @@ func (r *Remote) fetch(ctx context.Context, url, outname string, iosize uint64)
case "application/x-tar": case "application/x-tar":
return tarutil.ExtractTar(resp.Body, outname) return tarutil.ExtractTar(resp.Body, outname)
case "application/octet-stream": case "application/octet-stream":
return WriteWithSize(files.NewReaderFile(resp.Body), outname, iosize) f, err := os.Create(outname)
if err != nil {
return err
}
_, err = io.CopyBuffer(f, resp.Body, make([]byte, CopyBuf))
if err != nil {
f.Close() // nolint
return err
}
return f.Close()
default: default:
return xerrors.Errorf("unknown content type: '%s'", mediatype) return xerrors.Errorf("unknown content type: '%s'", mediatype)
} }
} }
func WriteWithSize(nd files.Node, fpath string, size uint64) error {
switch nd := nd.(type) {
case *files.Symlink:
return os.Symlink(nd.Target, fpath)
case files.File:
f, err := os.Create(fpath)
defer func() {
err = f.Close()
if err != nil {
log.Warnf("failed to close file:%v, err:%v", fpath, err)
}
}()
if err != nil {
return err
}
if size <= 0 || size > MaxIoSize {
_, err = io.Copy(f, nd)
} else {
buf := make([]byte, size)
_, err = io.CopyBuffer(f, nd, buf)
}
if err != nil {
return err
}
return nil
case files.Directory:
err := os.Mkdir(fpath, 0777)
if err != nil {
return err
}
entries := nd.Entries()
for entries.Next() {
child := filepath.Join(fpath, entries.Name())
if err := WriteWithSize(entries.Node(), child, size); err != nil {
return err
}
}
return entries.Err()
default:
return fmt.Errorf("file type %T at %q is not supported", nd, fpath)
}
}
func (r *Remote) MoveStorage(ctx context.Context, s storage.SectorRef, types storiface.SectorFileType) error { func (r *Remote) MoveStorage(ctx context.Context, s storage.SectorRef, types storiface.SectorFileType) error {
// Make sure we have the data local // Make sure we have the data local
_, _, err := r.AcquireSector(ctx, s, types, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove) _, _, err := r.AcquireSector(ctx, s, types, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)