diff --git a/cmd/lotus-seal-worker/sub.go b/cmd/lotus-seal-worker/sub.go index 0dc62b4cb..416f8e373 100644 --- a/cmd/lotus-seal-worker/sub.go +++ b/cmd/lotus-seal-worker/sub.go @@ -123,142 +123,6 @@ func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask) return res } -func (w *worker) fetch(typ string, sectorID uint64) error { - outname := filepath.Join(w.repo, typ, w.sb.SectorName(sectorID)) - - url := w.minerEndpoint + "/remote/" + typ + "/" + w.sb.SectorName(sectorID) - log.Infof("Fetch %s %s", typ, url) - - req, err := http.NewRequest("GET", url, nil) - if err != nil { - return xerrors.Errorf("request: %w", err) - } - req.Header = w.auth - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return xerrors.Errorf("do request: %w", err) - } - - defer resp.Body.Close() - - bar := pb.New64(resp.ContentLength) - bar.ShowPercent = true - bar.ShowSpeed = true - bar.Units = pb.U_BYTES - - barreader := bar.NewProxyReader(resp.Body) - - bar.Start() - defer bar.Finish() - - mediatype, p, err := mime.ParseMediaType(resp.Header.Get("Content-Type")) - if err != nil { - return xerrors.Errorf("parse media type: %w", err) - } - - var file files.Node - switch mediatype { - case "multipart/form-data": - mpr := multipart.NewReader(barreader, p["boundary"]) - - file, err = files.NewFileFromPartReader(mpr, mediatype) - if err != nil { - return xerrors.Errorf("call to NewFileFromPartReader failed: %w", err) - } - - case "application/octet-stream": - file = files.NewReaderFile(barreader) - default: - return xerrors.Errorf("unknown content type: '%s'", mediatype) - } - - // WriteTo is unhappy when things exist - if err := os.RemoveAll(outname); err != nil { - return xerrors.Errorf("removing dest: %w", err) - } - - return files.WriteTo(file, outname) -} - -func (w *worker) push(typ string, sectorID uint64) error { - outname := filepath.Join(w.repo, typ, w.sb.SectorName(sectorID)) - - stat, err := os.Stat(outname) - if err != nil { - return err - } - - f, err := files.NewSerialFile(outname, false, stat) - if err != nil { - return err - } - - url := w.minerEndpoint + "/remote/" + typ + "/" + w.sb.SectorName(sectorID) - log.Infof("Push %s %s", typ, url) - - sz, err := f.Size() - if err != nil { - return xerrors.Errorf("getting size: %w", err) - } - - bar := pb.New64(sz) - bar.ShowPercent = true - bar.ShowSpeed = true - bar.Units = pb.U_BYTES - - bar.Start() - defer bar.Finish() - //todo set content size - - header := w.auth - - var r io.Reader - r, file := f.(files.File) - if !file { - mfr := files.NewMultiFileReader(f.(files.Directory), true) - - header.Set("Content-Type", "multipart/form-data; boundary="+mfr.Boundary()) - r = mfr - } else { - header.Set("Content-Type", "application/octet-stream") - } - - req, err := http.NewRequest("PUT", url, bar.NewProxyReader(r)) - if err != nil { - return err - } - req.Header = header - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return err - } - if resp.StatusCode != 200 { - return xerrors.Errorf("non-200 response: %d", resp.StatusCode) - } - - return resp.Body.Close() -} - -func (w *worker) fetchSector(sectorID uint64, typ sectorbuilder.WorkerTaskType) error { - var err error - switch typ { - case sectorbuilder.WorkerPreCommit: - err = w.fetch("staged", sectorID) - case sectorbuilder.WorkerCommit: - err = w.fetch("sealed", sectorID) - if err != nil { - return xerrors.Errorf("fetch sealed: %w", err) - } - err = w.fetch("cache", sectorID) - } - if err != nil { - return xerrors.Errorf("fetch failed: %w", err) - } - return nil -} - func errRes(err error) sectorbuilder.SealRes { return sectorbuilder.SealRes{Err: err.Error(), GoErr: err} } diff --git a/cmd/lotus-seal-worker/transfer.go b/cmd/lotus-seal-worker/transfer.go new file mode 100644 index 000000000..b363e2324 --- /dev/null +++ b/cmd/lotus-seal-worker/transfer.go @@ -0,0 +1,141 @@ +package main + +import ( + "io" + "mime" + "net/http" + "os" + + files "github.com/ipfs/go-ipfs-files" + "golang.org/x/xerrors" + "gopkg.in/cheggaaa/pb.v1" + "path/filepath" + + "github.com/filecoin-project/lotus/lib/sectorbuilder" + "github.com/filecoin-project/lotus/lib/systar" +) + +func (w *worker) fetch(typ string, sectorID uint64) error { + outname := filepath.Join(w.repo, typ, w.sb.SectorName(sectorID)) + + url := w.minerEndpoint + "/remote/" + typ + "/" + w.sb.SectorName(sectorID) + log.Infof("Fetch %s %s", typ, url) + + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return xerrors.Errorf("request: %w", err) + } + req.Header = w.auth + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return xerrors.Errorf("do request: %w", err) + } + + defer resp.Body.Close() + + bar := pb.New64(resp.ContentLength) + bar.ShowPercent = true + bar.ShowSpeed = true + bar.Units = pb.U_BYTES + + barreader := bar.NewProxyReader(resp.Body) + + bar.Start() + defer bar.Finish() + + mediatype, _, err := mime.ParseMediaType(resp.Header.Get("Content-Type")) + if err != nil { + return xerrors.Errorf("parse media type: %w", err) + } + + // WriteTo is unhappy when things exist + if err := os.RemoveAll(outname); err != nil { + return xerrors.Errorf("removing dest: %w", err) + } + + switch mediatype { + case "application/x-tar": + return systar.ExtractTar(barreader, outname) + case "application/octet-stream": + return files.WriteTo(files.NewReaderFile(barreader), outname) + default: + return xerrors.Errorf("unknown content type: '%s'", mediatype) + } + +} + +func (w *worker) push(typ string, sectorID uint64) error { + filename := filepath.Join(w.repo, typ, w.sb.SectorName(sectorID)) + + url := w.minerEndpoint + "/remote/" + typ + "/" + w.sb.SectorName(sectorID) + log.Infof("Push %s %s", typ, url) + + stat, err := os.Stat(filename) + if err != nil { + return err + } + + var r io.Reader + if stat.IsDir() { + r, err = systar.TarDirectory(filename) + } else { + r, err = os.OpenFile(filename, os.O_RDONLY, 0644) + } + if err != nil { + return xerrors.Errorf("opening push reader: %w", err) + } + + bar := pb.New64(0) + bar.ShowPercent = true + bar.ShowSpeed = true + bar.ShowCounters = true + bar.Units = pb.U_BYTES + + bar.Start() + defer bar.Finish() + //todo set content size + + header := w.auth + + + if stat.IsDir() { + header.Set("Content-Type", "application/x-tar") + } else { + header.Set("Content-Type", "application/octet-stream") + } + + req, err := http.NewRequest("PUT", url, bar.NewProxyReader(r)) + if err != nil { + return err + } + req.Header = header + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + if resp.StatusCode != 200 { + return xerrors.Errorf("non-200 response: %d", resp.StatusCode) + } + + return resp.Body.Close() +} + +func (w *worker) fetchSector(sectorID uint64, typ sectorbuilder.WorkerTaskType) error { + var err error + switch typ { + case sectorbuilder.WorkerPreCommit: + err = w.fetch("staged", sectorID) + case sectorbuilder.WorkerCommit: + err = w.fetch("sealed", sectorID) + if err != nil { + return xerrors.Errorf("fetch sealed: %w", err) + } + err = w.fetch("cache", sectorID) + } + if err != nil { + return xerrors.Errorf("fetch failed: %w", err) + } + return nil +} diff --git a/extern/filecoin-ffi b/extern/filecoin-ffi index 9faf00cb5..ebb3e13ad 160000 --- a/extern/filecoin-ffi +++ b/extern/filecoin-ffi @@ -1 +1 @@ -Subproject commit 9faf00cb536fd86559440a09de9131520ae1ca0e +Subproject commit ebb3e13addf13059658ba92e84c9ce4300fbdf25 diff --git a/lib/systar/systar.go b/lib/systar/systar.go new file mode 100644 index 000000000..1fb42d108 --- /dev/null +++ b/lib/systar/systar.go @@ -0,0 +1,49 @@ +package systar + +import ( + "io" + "os/exec" + "path/filepath" +) + +func ExtractTar(body io.Reader, dest string) error { + cmd := exec.Command("tar", "-xS", "-C", dest) + cmd.Stdin = body + return cmd.Run() +} + +func TarDirectory(file string) (io.ReadCloser, error) { + // use system builtin tar, golang one doesn't support sparse files + + dir := filepath.Dir(file) + base := filepath.Base(file) + + i, o := io.Pipe() + + // don't bother with compression, it's mostly random data + cmd := exec.Command("tar", "-cSf", "-", "-C", dir, base) + cmd.Stdout = o + if err := cmd.Start(); err != nil { + return nil, err + } + + return &struct { + io.Reader + io.Closer + }{ + Reader: i, + Closer: closer(func() error { + e1 := i.Close() + if err := cmd.Wait(); err != nil { + return err + } + + return e1 + }), + }, nil +} + +type closer func() error +func (cl closer) Close() error { + return cl() +} diff --git a/node/impl/storminer.go b/node/impl/storminer.go index dd4c6886b..5e413c805 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -6,6 +6,7 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/lib/sectorbuilder" + "github.com/filecoin-project/lotus/lib/systar" "github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage/sectorblocks" @@ -63,24 +64,20 @@ func (sm *StorageMinerAPI) remoteGetSector(w http.ResponseWriter, r *http.Reques return } - f, err := files.NewSerialFile(path, false, stat) + var rd io.Reader + if stat.IsDir() { + rd, err = systar.TarDirectory(path) + w.Header().Set("Content-Type", "application/x-tar") + } else { + rd, err = os.OpenFile(path, os.O_RDONLY, 0644) + w.Header().Set("Content-Type", "application/octet-stream") + } if err != nil { log.Error(err) w.WriteHeader(500) return } - var rd io.Reader - rd, file := f.(files.File) - if !file { - mfr := files.NewMultiFileReader(f.(files.Directory), true) - - w.Header().Set("Content-Type", "multipart/form-data; boundary="+mfr.Boundary()) - rd = mfr - } else { - w.Header().Set("Content-Type", "application/octet-stream") - } - w.WriteHeader(200) if _, err := io.Copy(w, rd); err != nil { log.Error(err) @@ -98,8 +95,6 @@ func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Reques return } - var file files.Node - mediatype, _, err := mime.ParseMediaType(r.Header.Get("Content-Type")) if err != nil { log.Error(err) @@ -107,37 +102,23 @@ func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Reques return } - switch mediatype { - case "multipart/form-data": - mpr, err := r.MultipartReader() - if err != nil { - log.Error(err) - w.WriteHeader(500) - return - } - - file, err = files.NewFileFromPartReader(mpr, mediatype) - if err != nil { - log.Error(err) - w.WriteHeader(500) - return - } - - default: - file = files.NewReaderFile(r.Body) - } - - // WriteTo is unhappy when things exist (also cleans up cache after Commit) if err := os.RemoveAll(path); err != nil { log.Error(err) w.WriteHeader(500) return } - if err := files.WriteTo(file, path); err != nil { - log.Error(err) - w.WriteHeader(500) - return + switch mediatype { + case "application/x-tar": + if err := systar.ExtractTar(r.Body, path); err != nil { + return + } + default: + if err := files.WriteTo(files.NewReaderFile(r.Body), path); err != nil { + log.Error(err) + w.WriteHeader(500) + return + } } w.WriteHeader(200)