worker: Use system tar for moving cache around
This commit is contained in:
parent
edd30c7aa1
commit
28dde1a2d3
@ -123,142 +123,6 @@ func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask)
|
|||||||
return res
|
return res
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *worker) fetch(typ string, sectorID uint64) error {
|
|
||||||
outname := filepath.Join(w.repo, typ, w.sb.SectorName(sectorID))
|
|
||||||
|
|
||||||
url := w.minerEndpoint + "/remote/" + typ + "/" + w.sb.SectorName(sectorID)
|
|
||||||
log.Infof("Fetch %s %s", typ, url)
|
|
||||||
|
|
||||||
req, err := http.NewRequest("GET", url, nil)
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("request: %w", err)
|
|
||||||
}
|
|
||||||
req.Header = w.auth
|
|
||||||
|
|
||||||
resp, err := http.DefaultClient.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("do request: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
bar := pb.New64(resp.ContentLength)
|
|
||||||
bar.ShowPercent = true
|
|
||||||
bar.ShowSpeed = true
|
|
||||||
bar.Units = pb.U_BYTES
|
|
||||||
|
|
||||||
barreader := bar.NewProxyReader(resp.Body)
|
|
||||||
|
|
||||||
bar.Start()
|
|
||||||
defer bar.Finish()
|
|
||||||
|
|
||||||
mediatype, p, err := mime.ParseMediaType(resp.Header.Get("Content-Type"))
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("parse media type: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var file files.Node
|
|
||||||
switch mediatype {
|
|
||||||
case "multipart/form-data":
|
|
||||||
mpr := multipart.NewReader(barreader, p["boundary"])
|
|
||||||
|
|
||||||
file, err = files.NewFileFromPartReader(mpr, mediatype)
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("call to NewFileFromPartReader failed: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
case "application/octet-stream":
|
|
||||||
file = files.NewReaderFile(barreader)
|
|
||||||
default:
|
|
||||||
return xerrors.Errorf("unknown content type: '%s'", mediatype)
|
|
||||||
}
|
|
||||||
|
|
||||||
// WriteTo is unhappy when things exist
|
|
||||||
if err := os.RemoveAll(outname); err != nil {
|
|
||||||
return xerrors.Errorf("removing dest: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return files.WriteTo(file, outname)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *worker) push(typ string, sectorID uint64) error {
|
|
||||||
outname := filepath.Join(w.repo, typ, w.sb.SectorName(sectorID))
|
|
||||||
|
|
||||||
stat, err := os.Stat(outname)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
f, err := files.NewSerialFile(outname, false, stat)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
url := w.minerEndpoint + "/remote/" + typ + "/" + w.sb.SectorName(sectorID)
|
|
||||||
log.Infof("Push %s %s", typ, url)
|
|
||||||
|
|
||||||
sz, err := f.Size()
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("getting size: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
bar := pb.New64(sz)
|
|
||||||
bar.ShowPercent = true
|
|
||||||
bar.ShowSpeed = true
|
|
||||||
bar.Units = pb.U_BYTES
|
|
||||||
|
|
||||||
bar.Start()
|
|
||||||
defer bar.Finish()
|
|
||||||
//todo set content size
|
|
||||||
|
|
||||||
header := w.auth
|
|
||||||
|
|
||||||
var r io.Reader
|
|
||||||
r, file := f.(files.File)
|
|
||||||
if !file {
|
|
||||||
mfr := files.NewMultiFileReader(f.(files.Directory), true)
|
|
||||||
|
|
||||||
header.Set("Content-Type", "multipart/form-data; boundary="+mfr.Boundary())
|
|
||||||
r = mfr
|
|
||||||
} else {
|
|
||||||
header.Set("Content-Type", "application/octet-stream")
|
|
||||||
}
|
|
||||||
|
|
||||||
req, err := http.NewRequest("PUT", url, bar.NewProxyReader(r))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
req.Header = header
|
|
||||||
|
|
||||||
resp, err := http.DefaultClient.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if resp.StatusCode != 200 {
|
|
||||||
return xerrors.Errorf("non-200 response: %d", resp.StatusCode)
|
|
||||||
}
|
|
||||||
|
|
||||||
return resp.Body.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *worker) fetchSector(sectorID uint64, typ sectorbuilder.WorkerTaskType) error {
|
|
||||||
var err error
|
|
||||||
switch typ {
|
|
||||||
case sectorbuilder.WorkerPreCommit:
|
|
||||||
err = w.fetch("staged", sectorID)
|
|
||||||
case sectorbuilder.WorkerCommit:
|
|
||||||
err = w.fetch("sealed", sectorID)
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("fetch sealed: %w", err)
|
|
||||||
}
|
|
||||||
err = w.fetch("cache", sectorID)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("fetch failed: %w", err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func errRes(err error) sectorbuilder.SealRes {
|
func errRes(err error) sectorbuilder.SealRes {
|
||||||
return sectorbuilder.SealRes{Err: err.Error(), GoErr: err}
|
return sectorbuilder.SealRes{Err: err.Error(), GoErr: err}
|
||||||
}
|
}
|
||||||
|
141
cmd/lotus-seal-worker/transfer.go
Normal file
141
cmd/lotus-seal-worker/transfer.go
Normal file
@ -0,0 +1,141 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"mime"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
files "github.com/ipfs/go-ipfs-files"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
"gopkg.in/cheggaaa/pb.v1"
|
||||||
|
"path/filepath"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/lib/sectorbuilder"
|
||||||
|
"github.com/filecoin-project/lotus/lib/systar"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (w *worker) fetch(typ string, sectorID uint64) error {
|
||||||
|
outname := filepath.Join(w.repo, typ, w.sb.SectorName(sectorID))
|
||||||
|
|
||||||
|
url := w.minerEndpoint + "/remote/" + typ + "/" + w.sb.SectorName(sectorID)
|
||||||
|
log.Infof("Fetch %s %s", typ, url)
|
||||||
|
|
||||||
|
req, err := http.NewRequest("GET", url, nil)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("request: %w", err)
|
||||||
|
}
|
||||||
|
req.Header = w.auth
|
||||||
|
|
||||||
|
resp, err := http.DefaultClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("do request: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
bar := pb.New64(resp.ContentLength)
|
||||||
|
bar.ShowPercent = true
|
||||||
|
bar.ShowSpeed = true
|
||||||
|
bar.Units = pb.U_BYTES
|
||||||
|
|
||||||
|
barreader := bar.NewProxyReader(resp.Body)
|
||||||
|
|
||||||
|
bar.Start()
|
||||||
|
defer bar.Finish()
|
||||||
|
|
||||||
|
mediatype, _, err := mime.ParseMediaType(resp.Header.Get("Content-Type"))
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("parse media type: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// WriteTo is unhappy when things exist
|
||||||
|
if err := os.RemoveAll(outname); err != nil {
|
||||||
|
return xerrors.Errorf("removing dest: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
switch mediatype {
|
||||||
|
case "application/x-tar":
|
||||||
|
return systar.ExtractTar(barreader, outname)
|
||||||
|
case "application/octet-stream":
|
||||||
|
return files.WriteTo(files.NewReaderFile(barreader), outname)
|
||||||
|
default:
|
||||||
|
return xerrors.Errorf("unknown content type: '%s'", mediatype)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *worker) push(typ string, sectorID uint64) error {
|
||||||
|
filename := filepath.Join(w.repo, typ, w.sb.SectorName(sectorID))
|
||||||
|
|
||||||
|
url := w.minerEndpoint + "/remote/" + typ + "/" + w.sb.SectorName(sectorID)
|
||||||
|
log.Infof("Push %s %s", typ, url)
|
||||||
|
|
||||||
|
stat, err := os.Stat(filename)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var r io.Reader
|
||||||
|
if stat.IsDir() {
|
||||||
|
r, err = systar.TarDirectory(filename)
|
||||||
|
} else {
|
||||||
|
r, err = os.OpenFile(filename, os.O_RDONLY, 0644)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("opening push reader: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
bar := pb.New64(0)
|
||||||
|
bar.ShowPercent = true
|
||||||
|
bar.ShowSpeed = true
|
||||||
|
bar.ShowCounters = true
|
||||||
|
bar.Units = pb.U_BYTES
|
||||||
|
|
||||||
|
bar.Start()
|
||||||
|
defer bar.Finish()
|
||||||
|
//todo set content size
|
||||||
|
|
||||||
|
header := w.auth
|
||||||
|
|
||||||
|
|
||||||
|
if stat.IsDir() {
|
||||||
|
header.Set("Content-Type", "application/x-tar")
|
||||||
|
} else {
|
||||||
|
header.Set("Content-Type", "application/octet-stream")
|
||||||
|
}
|
||||||
|
|
||||||
|
req, err := http.NewRequest("PUT", url, bar.NewProxyReader(r))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
req.Header = header
|
||||||
|
|
||||||
|
resp, err := http.DefaultClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if resp.StatusCode != 200 {
|
||||||
|
return xerrors.Errorf("non-200 response: %d", resp.StatusCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp.Body.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *worker) fetchSector(sectorID uint64, typ sectorbuilder.WorkerTaskType) error {
|
||||||
|
var err error
|
||||||
|
switch typ {
|
||||||
|
case sectorbuilder.WorkerPreCommit:
|
||||||
|
err = w.fetch("staged", sectorID)
|
||||||
|
case sectorbuilder.WorkerCommit:
|
||||||
|
err = w.fetch("sealed", sectorID)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("fetch sealed: %w", err)
|
||||||
|
}
|
||||||
|
err = w.fetch("cache", sectorID)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("fetch failed: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
2
extern/filecoin-ffi
vendored
2
extern/filecoin-ffi
vendored
@ -1 +1 @@
|
|||||||
Subproject commit 9faf00cb536fd86559440a09de9131520ae1ca0e
|
Subproject commit ebb3e13addf13059658ba92e84c9ce4300fbdf25
|
49
lib/systar/systar.go
Normal file
49
lib/systar/systar.go
Normal file
@ -0,0 +1,49 @@
|
|||||||
|
package systar
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"os/exec"
|
||||||
|
"path/filepath"
|
||||||
|
)
|
||||||
|
|
||||||
|
func ExtractTar(body io.Reader, dest string) error {
|
||||||
|
cmd := exec.Command("tar", "-xS", "-C", dest)
|
||||||
|
cmd.Stdin = body
|
||||||
|
return cmd.Run()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TarDirectory(file string) (io.ReadCloser, error) {
|
||||||
|
// use system builtin tar, golang one doesn't support sparse files
|
||||||
|
|
||||||
|
dir := filepath.Dir(file)
|
||||||
|
base := filepath.Base(file)
|
||||||
|
|
||||||
|
i, o := io.Pipe()
|
||||||
|
|
||||||
|
// don't bother with compression, it's mostly random data
|
||||||
|
cmd := exec.Command("tar", "-cSf", "-", "-C", dir, base)
|
||||||
|
cmd.Stdout = o
|
||||||
|
if err := cmd.Start(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &struct {
|
||||||
|
io.Reader
|
||||||
|
io.Closer
|
||||||
|
}{
|
||||||
|
Reader: i,
|
||||||
|
Closer: closer(func() error {
|
||||||
|
e1 := i.Close()
|
||||||
|
if err := cmd.Wait(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return e1
|
||||||
|
}),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type closer func() error
|
||||||
|
func (cl closer) Close() error {
|
||||||
|
return cl()
|
||||||
|
}
|
@ -6,6 +6,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/chain/address"
|
"github.com/filecoin-project/lotus/chain/address"
|
||||||
"github.com/filecoin-project/lotus/lib/sectorbuilder"
|
"github.com/filecoin-project/lotus/lib/sectorbuilder"
|
||||||
|
"github.com/filecoin-project/lotus/lib/systar"
|
||||||
"github.com/filecoin-project/lotus/miner"
|
"github.com/filecoin-project/lotus/miner"
|
||||||
"github.com/filecoin-project/lotus/storage"
|
"github.com/filecoin-project/lotus/storage"
|
||||||
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
||||||
@ -63,24 +64,20 @@ func (sm *StorageMinerAPI) remoteGetSector(w http.ResponseWriter, r *http.Reques
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
f, err := files.NewSerialFile(path, false, stat)
|
var rd io.Reader
|
||||||
|
if stat.IsDir() {
|
||||||
|
rd, err = systar.TarDirectory(path)
|
||||||
|
w.Header().Set("Content-Type", "application/x-tar")
|
||||||
|
} else {
|
||||||
|
rd, err = os.OpenFile(path, os.O_RDONLY, 0644)
|
||||||
|
w.Header().Set("Content-Type", "application/octet-stream")
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
w.WriteHeader(500)
|
w.WriteHeader(500)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var rd io.Reader
|
|
||||||
rd, file := f.(files.File)
|
|
||||||
if !file {
|
|
||||||
mfr := files.NewMultiFileReader(f.(files.Directory), true)
|
|
||||||
|
|
||||||
w.Header().Set("Content-Type", "multipart/form-data; boundary="+mfr.Boundary())
|
|
||||||
rd = mfr
|
|
||||||
} else {
|
|
||||||
w.Header().Set("Content-Type", "application/octet-stream")
|
|
||||||
}
|
|
||||||
|
|
||||||
w.WriteHeader(200)
|
w.WriteHeader(200)
|
||||||
if _, err := io.Copy(w, rd); err != nil {
|
if _, err := io.Copy(w, rd); err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
@ -98,8 +95,6 @@ func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Reques
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var file files.Node
|
|
||||||
|
|
||||||
mediatype, _, err := mime.ParseMediaType(r.Header.Get("Content-Type"))
|
mediatype, _, err := mime.ParseMediaType(r.Header.Get("Content-Type"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
@ -107,38 +102,24 @@ func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Reques
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
switch mediatype {
|
|
||||||
case "multipart/form-data":
|
|
||||||
mpr, err := r.MultipartReader()
|
|
||||||
if err != nil {
|
|
||||||
log.Error(err)
|
|
||||||
w.WriteHeader(500)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
file, err = files.NewFileFromPartReader(mpr, mediatype)
|
|
||||||
if err != nil {
|
|
||||||
log.Error(err)
|
|
||||||
w.WriteHeader(500)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
default:
|
|
||||||
file = files.NewReaderFile(r.Body)
|
|
||||||
}
|
|
||||||
|
|
||||||
// WriteTo is unhappy when things exist (also cleans up cache after Commit)
|
|
||||||
if err := os.RemoveAll(path); err != nil {
|
if err := os.RemoveAll(path); err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
w.WriteHeader(500)
|
w.WriteHeader(500)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := files.WriteTo(file, path); err != nil {
|
switch mediatype {
|
||||||
|
case "application/x-tar":
|
||||||
|
if err := systar.ExtractTar(r.Body, path); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
if err := files.WriteTo(files.NewReaderFile(r.Body), path); err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
w.WriteHeader(500)
|
w.WriteHeader(500)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
w.WriteHeader(200)
|
w.WriteHeader(200)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user