sector import: Sector data download
This commit is contained in:
parent
fbb487ae2b
commit
ea99bd9763
Binary file not shown.
Binary file not shown.
104
storage/paths/fetch.go
Normal file
104
storage/paths/fetch.go
Normal file
@ -0,0 +1,104 @@
|
|||||||
|
package paths
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
"mime"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/go-multierror"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/storage/sealer/tarutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
func fetch(ctx context.Context, url, outname string, header http.Header) (rerr error) {
|
||||||
|
log.Infof("Fetch %s -> %s", url, outname)
|
||||||
|
|
||||||
|
req, err := http.NewRequest("GET", url, nil)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("request: %w", err)
|
||||||
|
}
|
||||||
|
req.Header = header
|
||||||
|
req = req.WithContext(ctx)
|
||||||
|
|
||||||
|
resp, err := http.DefaultClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("do request: %w", err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close() // nolint
|
||||||
|
|
||||||
|
if resp.StatusCode != 200 {
|
||||||
|
return xerrors.Errorf("non-200 code: %d", resp.StatusCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
var bytes int64
|
||||||
|
defer func() {
|
||||||
|
took := time.Now().Sub(start)
|
||||||
|
mibps := float64(bytes) / 1024 / 1024 * float64(time.Second) / float64(took)
|
||||||
|
log.Infow("Fetch done", "url", url, "out", outname, "took", took.Round(time.Millisecond), "bytes", bytes, "MiB/s", mibps, "err", rerr)
|
||||||
|
}()
|
||||||
|
|
||||||
|
mediatype, _, err := mime.ParseMediaType(resp.Header.Get("Content-Type"))
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("parse media type: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := os.RemoveAll(outname); err != nil {
|
||||||
|
return xerrors.Errorf("removing dest: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
switch mediatype {
|
||||||
|
case "application/x-tar":
|
||||||
|
bytes, err = tarutil.ExtractTar(resp.Body, outname, make([]byte, CopyBuf))
|
||||||
|
return err
|
||||||
|
case "application/octet-stream":
|
||||||
|
f, err := os.Create(outname)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
bytes, err = io.CopyBuffer(f, resp.Body, make([]byte, CopyBuf))
|
||||||
|
if err != nil {
|
||||||
|
f.Close() // nolint
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return f.Close()
|
||||||
|
default:
|
||||||
|
return xerrors.Errorf("unknown content type: '%s'", mediatype)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// FetchWithTemp fetches data into a temp 'fetching' directory, then moves the file to destination
|
||||||
|
func FetchWithTemp(ctx context.Context, urls []string, dest string, header http.Header) (string, error) {
|
||||||
|
var merr error
|
||||||
|
for _, url := range urls {
|
||||||
|
tempDest, err := tempFetchDest(dest, true)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := os.RemoveAll(dest); err != nil {
|
||||||
|
return "", xerrors.Errorf("removing dest: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = fetch(ctx, url, tempDest, header)
|
||||||
|
if err != nil {
|
||||||
|
merr = multierror.Append(merr, xerrors.Errorf("fetch error %s -> %s: %w", url, tempDest, err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := move(tempDest, dest); err != nil {
|
||||||
|
return "", xerrors.Errorf("fetch move error %s -> %s: %w", tempDest, dest, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if merr != nil {
|
||||||
|
log.Warnw("acquireFromRemote encountered errors when fetching sector from remote", "errors", merr)
|
||||||
|
}
|
||||||
|
return url, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return "", xerrors.Errorf("failed to fetch sector file (tried %v): %w", urls, merr)
|
||||||
|
}
|
@ -7,7 +7,6 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"math/bits"
|
"math/bits"
|
||||||
"mime"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
@ -24,7 +23,6 @@ import (
|
|||||||
|
|
||||||
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
|
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
|
||||||
"github.com/filecoin-project/lotus/storage/sealer/storiface"
|
"github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||||
"github.com/filecoin-project/lotus/storage/sealer/tarutil"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var FetchTempSubdir = "fetching"
|
var FetchTempSubdir = "fetching"
|
||||||
@ -236,7 +234,7 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType
|
|||||||
return "", xerrors.Errorf("removing dest: %w", err)
|
return "", xerrors.Errorf("removing dest: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = r.fetch(ctx, url, tempDest)
|
err = r.fetchThrottled(ctx, url, tempDest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
merr = multierror.Append(merr, xerrors.Errorf("fetch error %s (storage %s) -> %s: %w", url, info.ID, tempDest, err))
|
merr = multierror.Append(merr, xerrors.Errorf("fetch error %s (storage %s) -> %s: %w", url, info.ID, tempDest, err))
|
||||||
continue
|
continue
|
||||||
@ -256,9 +254,7 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType
|
|||||||
return "", xerrors.Errorf("failed to acquire sector %v from remote (tried %v): %w", s, si, merr)
|
return "", xerrors.Errorf("failed to acquire sector %v from remote (tried %v): %w", s, si, merr)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Remote) fetch(ctx context.Context, url, outname string) error {
|
func (r *Remote) fetchThrottled(ctx context.Context, url, outname string) (rerr error) {
|
||||||
log.Infof("Fetch %s -> %s", url, outname)
|
|
||||||
|
|
||||||
if len(r.limit) >= cap(r.limit) {
|
if len(r.limit) >= cap(r.limit) {
|
||||||
log.Infof("Throttling fetch, %d already running", len(r.limit))
|
log.Infof("Throttling fetch, %d already running", len(r.limit))
|
||||||
}
|
}
|
||||||
@ -274,59 +270,7 @@ func (r *Remote) fetch(ctx context.Context, url, outname string) error {
|
|||||||
return xerrors.Errorf("context error while waiting for fetch limiter: %w", ctx.Err())
|
return xerrors.Errorf("context error while waiting for fetch limiter: %w", ctx.Err())
|
||||||
}
|
}
|
||||||
|
|
||||||
req, err := http.NewRequest("GET", url, nil)
|
return fetch(ctx, url, outname, r.auth)
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("request: %w", err)
|
|
||||||
}
|
|
||||||
req.Header = r.auth
|
|
||||||
req = req.WithContext(ctx)
|
|
||||||
|
|
||||||
resp, err := http.DefaultClient.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("do request: %w", err)
|
|
||||||
}
|
|
||||||
defer resp.Body.Close() // nolint
|
|
||||||
|
|
||||||
if resp.StatusCode != 200 {
|
|
||||||
return xerrors.Errorf("non-200 code: %d", resp.StatusCode)
|
|
||||||
}
|
|
||||||
|
|
||||||
/*bar := pb.New64(w.sizeForType(typ))
|
|
||||||
bar.ShowPercent = true
|
|
||||||
bar.ShowSpeed = true
|
|
||||||
bar.Units = pb.U_BYTES
|
|
||||||
|
|
||||||
barreader := bar.NewProxyReader(resp.Body)
|
|
||||||
|
|
||||||
bar.Start()
|
|
||||||
defer bar.Finish()*/
|
|
||||||
|
|
||||||
mediatype, _, err := mime.ParseMediaType(resp.Header.Get("Content-Type"))
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("parse media type: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := os.RemoveAll(outname); err != nil {
|
|
||||||
return xerrors.Errorf("removing dest: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
switch mediatype {
|
|
||||||
case "application/x-tar":
|
|
||||||
return tarutil.ExtractTar(resp.Body, outname, make([]byte, CopyBuf))
|
|
||||||
case "application/octet-stream":
|
|
||||||
f, err := os.Create(outname)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
_, err = io.CopyBuffer(f, resp.Body, make([]byte, CopyBuf))
|
|
||||||
if err != nil {
|
|
||||||
f.Close() // nolint
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return f.Close()
|
|
||||||
default:
|
|
||||||
return xerrors.Errorf("unknown content type: '%s'", mediatype)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Remote) checkAllocated(ctx context.Context, url string, spt abi.RegisteredSealProof, offset, size abi.PaddedPieceSize) (bool, error) {
|
func (r *Remote) checkAllocated(ctx context.Context, url string, spt abi.RegisteredSealProof, offset, size abi.PaddedPieceSize) (bool, error) {
|
||||||
|
@ -28,6 +28,7 @@ import (
|
|||||||
"github.com/filecoin-project/go-state-types/proof"
|
"github.com/filecoin-project/go-state-types/proof"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/lib/nullreader"
|
"github.com/filecoin-project/lotus/lib/nullreader"
|
||||||
|
spaths "github.com/filecoin-project/lotus/storage/paths"
|
||||||
nr "github.com/filecoin-project/lotus/storage/pipeline/lib/nullreader"
|
nr "github.com/filecoin-project/lotus/storage/pipeline/lib/nullreader"
|
||||||
"github.com/filecoin-project/lotus/storage/sealer/fr32"
|
"github.com/filecoin-project/lotus/storage/sealer/fr32"
|
||||||
"github.com/filecoin-project/lotus/storage/sealer/partialfile"
|
"github.com/filecoin-project/lotus/storage/sealer/partialfile"
|
||||||
@ -1128,7 +1129,36 @@ func (sb *Sealer) Remove(ctx context.Context, sector storiface.SectorRef) error
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (sb *Sealer) DownloadSectorData(ctx context.Context, sector storiface.SectorRef, finalized bool, src map[storiface.SectorFileType]storiface.SectorData) error {
|
func (sb *Sealer) DownloadSectorData(ctx context.Context, sector storiface.SectorRef, finalized bool, src map[storiface.SectorFileType]storiface.SectorData) error {
|
||||||
panic("todo")
|
var todo storiface.SectorFileType
|
||||||
|
for fileType := range src {
|
||||||
|
todo |= fileType
|
||||||
|
}
|
||||||
|
|
||||||
|
ptype := storiface.PathSealing
|
||||||
|
if finalized {
|
||||||
|
ptype = storiface.PathStorage
|
||||||
|
}
|
||||||
|
|
||||||
|
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTNone, todo, ptype)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("failed to acquire sector paths: %w", err)
|
||||||
|
}
|
||||||
|
defer done()
|
||||||
|
|
||||||
|
for fileType, data := range src {
|
||||||
|
out := storiface.PathByType(paths, fileType)
|
||||||
|
|
||||||
|
if data.Local {
|
||||||
|
return xerrors.Errorf("sector(%v) with local data (%#v) requested in DownloadSectorData", sector, data)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := spaths.FetchWithTemp(ctx, []string{data.URL}, out, data.Headers)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("downloading sector data: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetRequiredPadding(oldLength abi.PaddedPieceSize, newPieceLength abi.PaddedPieceSize) ([]abi.PaddedPieceSize, abi.PaddedPieceSize) {
|
func GetRequiredPadding(oldLength abi.PaddedPieceSize, newPieceLength abi.PaddedPieceSize) ([]abi.PaddedPieceSize, abi.PaddedPieceSize) {
|
||||||
|
@ -12,19 +12,20 @@ import (
|
|||||||
|
|
||||||
var log = logging.Logger("tarutil") // nolint
|
var log = logging.Logger("tarutil") // nolint
|
||||||
|
|
||||||
func ExtractTar(body io.Reader, dir string, buf []byte) error {
|
func ExtractTar(body io.Reader, dir string, buf []byte) (int64, error) {
|
||||||
if err := os.MkdirAll(dir, 0755); err != nil { // nolint
|
if err := os.MkdirAll(dir, 0755); err != nil { // nolint
|
||||||
return xerrors.Errorf("mkdir: %w", err)
|
return 0, xerrors.Errorf("mkdir: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
tr := tar.NewReader(body)
|
tr := tar.NewReader(body)
|
||||||
|
var read int64
|
||||||
for {
|
for {
|
||||||
header, err := tr.Next()
|
header, err := tr.Next()
|
||||||
switch err {
|
switch err {
|
||||||
default:
|
default:
|
||||||
return err
|
return read, err
|
||||||
case io.EOF:
|
case io.EOF:
|
||||||
return nil
|
return read, nil
|
||||||
|
|
||||||
case nil:
|
case nil:
|
||||||
}
|
}
|
||||||
@ -33,17 +34,20 @@ func ExtractTar(body io.Reader, dir string, buf []byte) error {
|
|||||||
f, err := os.Create(filepath.Join(dir, header.Name))
|
f, err := os.Create(filepath.Join(dir, header.Name))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
//nolint:gosec
|
//nolint:gosec
|
||||||
return xerrors.Errorf("creating file %s: %w", filepath.Join(dir, header.Name), err)
|
return read, xerrors.Errorf("creating file %s: %w", filepath.Join(dir, header.Name), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// This data is coming from a trusted source, no need to check the size.
|
// This data is coming from a trusted source, no need to check the size.
|
||||||
|
// TODO: now it's actually not coming from a trusted source, check size / paths
|
||||||
//nolint:gosec
|
//nolint:gosec
|
||||||
if _, err := io.CopyBuffer(f, tr, buf); err != nil {
|
r, err := io.CopyBuffer(f, tr, buf)
|
||||||
return err
|
read += r
|
||||||
|
if err != nil {
|
||||||
|
return read, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := f.Close(); err != nil {
|
if err := f.Close(); err != nil {
|
||||||
return err
|
return read, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user