lotus/cmd/lotus-seal-worker/transfer.go

176 lines
3.8 KiB
Go
Raw Normal View History

package main
import (
2020-01-31 18:56:48 +00:00
"fmt"
"io"
"mime"
"net/http"
"os"
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
2020-01-31 18:56:48 +00:00
"github.com/filecoin-project/go-sectorbuilder/fs"
files "github.com/ipfs/go-ipfs-files"
"golang.org/x/xerrors"
"gopkg.in/cheggaaa/pb.v1"
"path/filepath"
"github.com/filecoin-project/lotus/lib/tarutil"
)
2020-01-23 14:38:36 +00:00
// sizeForType returns the byte count used as the progress-bar total when
// transferring sector data of the given kind. It is the sector size reported
// by the sector builder, multiplied by 10 for the "cache" kind.
func (w *worker) sizeForType(typ string) int64 {
	sectorSize := int64(w.sb.SectorSize())
	if typ != "cache" {
		return sectorSize
	}
	return sectorSize * 10
}
func (w *worker) fetch(typ string, sectorID uint64) error {
outname := filepath.Join(w.repo, typ, w.sb.SectorName(sectorID))
2020-01-31 18:56:48 +00:00
url := w.minerEndpoint + "/remote/" + typ + "/" + fmt.Sprint(sectorID)
log.Infof("Fetch %s %s", typ, url)
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return xerrors.Errorf("request: %w", err)
}
req.Header = w.auth
resp, err := http.DefaultClient.Do(req)
if err != nil {
return xerrors.Errorf("do request: %w", err)
}
defer resp.Body.Close()
2019-12-04 16:53:32 +00:00
if resp.StatusCode != 200 {
return xerrors.Errorf("non-200 code: %d", resp.StatusCode)
}
2020-01-23 14:38:36 +00:00
bar := pb.New64(w.sizeForType(typ))
bar.ShowPercent = true
bar.ShowSpeed = true
bar.Units = pb.U_BYTES
barreader := bar.NewProxyReader(resp.Body)
bar.Start()
defer bar.Finish()
mediatype, _, err := mime.ParseMediaType(resp.Header.Get("Content-Type"))
if err != nil {
return xerrors.Errorf("parse media type: %w", err)
}
if err := os.RemoveAll(outname); err != nil {
return xerrors.Errorf("removing dest: %w", err)
}
switch mediatype {
case "application/x-tar":
return tarutil.ExtractTar(barreader, outname)
case "application/octet-stream":
return files.WriteTo(files.NewReaderFile(barreader), outname)
default:
return xerrors.Errorf("unknown content type: '%s'", mediatype)
}
}
func (w *worker) push(typ string, sectorID uint64) error {
2020-02-04 19:04:49 +00:00
w.limiter.transferLimit <- struct{}{}
defer func() {
<-w.limiter.transferLimit
}()
2020-01-31 18:56:48 +00:00
filename, err := w.sb.SectorPath(fs.DataType(typ), sectorID)
if err != nil {
return err
}
2020-01-31 18:56:48 +00:00
url := w.minerEndpoint + "/remote/" + typ + "/" + fmt.Sprint(sectorID)
log.Infof("Push %s %s", typ, url)
2020-01-31 18:56:48 +00:00
stat, err := os.Stat(string(filename))
if err != nil {
return err
}
var r io.Reader
if stat.IsDir() {
2020-01-31 18:56:48 +00:00
r, err = tarutil.TarDirectory(string(filename))
} else {
2020-01-31 18:56:48 +00:00
r, err = os.OpenFile(string(filename), os.O_RDONLY, 0644)
}
if err != nil {
return xerrors.Errorf("opening push reader: %w", err)
}
2020-01-23 14:38:36 +00:00
bar := pb.New64(w.sizeForType(typ))
bar.ShowPercent = true
bar.ShowSpeed = true
bar.ShowCounters = true
bar.Units = pb.U_BYTES
bar.Start()
defer bar.Finish()
//todo set content size
header := w.auth
if stat.IsDir() {
header.Set("Content-Type", "application/x-tar")
} else {
header.Set("Content-Type", "application/octet-stream")
}
req, err := http.NewRequest("PUT", url, bar.NewProxyReader(r))
if err != nil {
return err
}
req.Header = header
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
if resp.StatusCode != 200 {
return xerrors.Errorf("non-200 response: %d", resp.StatusCode)
}
2019-12-04 16:53:32 +00:00
if err := resp.Body.Close(); err != nil {
return err
}
// TODO: keep files around for later stages of sealing
return w.remove(typ, sectorID)
}
func (w *worker) remove(typ string, sectorID uint64) error {
filename := filepath.Join(w.repo, typ, w.sb.SectorName(sectorID))
2019-12-04 16:53:32 +00:00
return os.RemoveAll(filename)
}
func (w *worker) fetchSector(sectorID uint64, typ sectorbuilder.WorkerTaskType) error {
2020-02-04 19:04:49 +00:00
w.limiter.transferLimit <- struct{}{}
defer func() {
<-w.limiter.transferLimit
}()
var err error
switch typ {
case sectorbuilder.WorkerPreCommit:
2019-12-16 18:49:32 +00:00
err = w.fetch("staging", sectorID)
case sectorbuilder.WorkerCommit:
err = w.fetch("sealed", sectorID)
if err != nil {
return xerrors.Errorf("fetch sealed: %w", err)
}
err = w.fetch("cache", sectorID)
}
if err != nil {
return xerrors.Errorf("fetch failed: %w", err)
}
return nil
}