2019-11-21 14:10:51 +00:00
|
|
|
package main
|
2019-11-21 00:52:59 +00:00
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2019-11-30 13:22:50 +00:00
|
|
|
files "github.com/ipfs/go-ipfs-files"
|
2019-11-21 18:38:43 +00:00
|
|
|
"gopkg.in/cheggaaa/pb.v1"
|
2019-11-21 00:52:59 +00:00
|
|
|
"io"
|
2019-11-30 13:22:50 +00:00
|
|
|
"mime"
|
|
|
|
"mime/multipart"
|
2019-11-21 00:52:59 +00:00
|
|
|
"net/http"
|
|
|
|
"os"
|
|
|
|
"path/filepath"
|
|
|
|
|
2019-11-21 14:10:51 +00:00
|
|
|
"golang.org/x/xerrors"
|
|
|
|
|
2019-11-21 00:52:59 +00:00
|
|
|
"github.com/filecoin-project/lotus/api"
|
|
|
|
"github.com/filecoin-project/lotus/lib/sectorbuilder"
|
|
|
|
)
|
|
|
|
|
|
|
|
type worker struct {
|
|
|
|
api api.StorageMiner
|
|
|
|
minerEndpoint string
|
|
|
|
repo string
|
2019-11-21 18:38:43 +00:00
|
|
|
auth http.Header
|
2019-11-21 00:52:59 +00:00
|
|
|
|
|
|
|
sb *sectorbuilder.SectorBuilder
|
|
|
|
}
|
|
|
|
|
2019-11-21 18:38:43 +00:00
|
|
|
func acceptJobs(ctx context.Context, api api.StorageMiner, endpoint string, auth http.Header, repo string) error {
|
2019-11-21 14:10:51 +00:00
|
|
|
act, err := api.ActorAddress(ctx)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
ssize, err := api.ActorSectorSize(ctx, act)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
sb, err := sectorbuilder.NewStandalone(§orbuilder.Config{
|
|
|
|
SectorSize: ssize,
|
|
|
|
Miner: act,
|
|
|
|
WorkerThreads: 1,
|
|
|
|
CacheDir: filepath.Join(repo, "cache"),
|
|
|
|
SealedDir: filepath.Join(repo, "sealed"),
|
|
|
|
StagedDir: filepath.Join(repo, "staged"),
|
2019-12-03 02:23:49 +00:00
|
|
|
UnsealedDir: filepath.Join(repo, "unsealed"),
|
2019-11-21 14:10:51 +00:00
|
|
|
})
|
2019-11-21 18:38:43 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2019-11-21 14:10:51 +00:00
|
|
|
|
2019-11-21 00:52:59 +00:00
|
|
|
w := &worker{
|
2019-11-21 14:10:51 +00:00
|
|
|
api: api,
|
|
|
|
minerEndpoint: endpoint,
|
2019-11-21 18:38:43 +00:00
|
|
|
auth: auth,
|
2019-11-21 14:10:51 +00:00
|
|
|
repo: repo,
|
|
|
|
sb: sb,
|
2019-11-21 00:52:59 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
tasks, err := api.WorkerQueue(ctx)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
for task := range tasks {
|
2019-11-21 18:38:43 +00:00
|
|
|
log.Infof("New task: %d, sector %d, action: %d", task.TaskID, task.SectorID, task.Type)
|
|
|
|
|
2019-11-21 00:52:59 +00:00
|
|
|
res := w.processTask(ctx, task)
|
|
|
|
|
2019-11-30 13:22:50 +00:00
|
|
|
log.Infof("Task %d done, err: %+v", task.TaskID, res.GoErr)
|
2019-11-21 18:38:43 +00:00
|
|
|
|
2019-11-21 14:10:51 +00:00
|
|
|
if err := api.WorkerDone(ctx, task.TaskID, res); err != nil {
|
|
|
|
log.Error(err)
|
|
|
|
}
|
2019-11-21 00:52:59 +00:00
|
|
|
}
|
2019-11-21 14:10:51 +00:00
|
|
|
|
|
|
|
log.Warn("acceptJobs exit")
|
|
|
|
return nil
|
2019-11-21 00:52:59 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask) sectorbuilder.SealRes {
|
|
|
|
switch task.Type {
|
|
|
|
case sectorbuilder.WorkerPreCommit:
|
|
|
|
case sectorbuilder.WorkerCommit:
|
|
|
|
default:
|
|
|
|
return errRes(xerrors.Errorf("unknown task type %d", task.Type))
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := w.fetchSector(task.SectorID, task.Type); err != nil {
|
2019-11-22 15:48:02 +00:00
|
|
|
return errRes(xerrors.Errorf("fetching sector: %w", err))
|
2019-11-21 00:52:59 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
var res sectorbuilder.SealRes
|
|
|
|
|
|
|
|
switch task.Type {
|
|
|
|
case sectorbuilder.WorkerPreCommit:
|
2019-11-21 16:10:04 +00:00
|
|
|
rspco, err := w.sb.SealPreCommit(task.SectorID, task.SealTicket, task.Pieces)
|
2019-11-21 00:52:59 +00:00
|
|
|
if err != nil {
|
2019-11-22 15:48:02 +00:00
|
|
|
return errRes(xerrors.Errorf("precomitting: %w", err))
|
2019-11-21 00:52:59 +00:00
|
|
|
}
|
2019-11-22 15:48:02 +00:00
|
|
|
res.Rspco = rspco.ToJson()
|
2019-11-21 00:52:59 +00:00
|
|
|
|
|
|
|
if err := w.push("sealed", task.SectorID); err != nil {
|
2019-11-22 15:48:02 +00:00
|
|
|
return errRes(xerrors.Errorf("pushing precommited data: %w", err))
|
2019-11-21 00:52:59 +00:00
|
|
|
}
|
2019-11-30 13:22:50 +00:00
|
|
|
|
|
|
|
if err := w.push("cache", task.SectorID); err != nil {
|
|
|
|
return errRes(xerrors.Errorf("pushing precommited data: %w", err))
|
|
|
|
}
|
2019-11-21 00:52:59 +00:00
|
|
|
case sectorbuilder.WorkerCommit:
|
2019-11-30 09:25:31 +00:00
|
|
|
proof, err := w.sb.SealCommit(task.SectorID, task.SealTicket, task.SealSeed, task.Pieces, task.Rspco)
|
2019-11-21 19:51:48 +00:00
|
|
|
if err != nil {
|
2019-11-22 15:48:02 +00:00
|
|
|
return errRes(xerrors.Errorf("comitting: %w", err))
|
2019-11-21 19:51:48 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
res.Proof = proof
|
|
|
|
|
2019-11-30 13:22:50 +00:00
|
|
|
if err := w.push("cache", task.SectorID); err != nil {
|
|
|
|
return errRes(xerrors.Errorf("pushing precommited data: %w", err))
|
|
|
|
}
|
2019-11-21 00:52:59 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return res
|
|
|
|
}
|
|
|
|
|
|
|
|
func (w *worker) fetch(typ string, sectorID uint64) error {
|
|
|
|
outname := filepath.Join(w.repo, typ, w.sb.SectorName(sectorID))
|
|
|
|
|
2019-11-21 18:38:43 +00:00
|
|
|
url := w.minerEndpoint + "/remote/" + typ + "/" + w.sb.SectorName(sectorID)
|
2019-11-30 13:22:50 +00:00
|
|
|
log.Infof("Fetch %s %s", typ, url)
|
2019-11-21 18:38:43 +00:00
|
|
|
|
|
|
|
req, err := http.NewRequest("GET", url, nil)
|
|
|
|
if err != nil {
|
2019-11-30 13:22:50 +00:00
|
|
|
return xerrors.Errorf("request: %w", err)
|
2019-11-21 18:38:43 +00:00
|
|
|
}
|
|
|
|
req.Header = w.auth
|
|
|
|
|
|
|
|
resp, err := http.DefaultClient.Do(req)
|
2019-11-21 00:52:59 +00:00
|
|
|
if err != nil {
|
2019-11-30 13:22:50 +00:00
|
|
|
return xerrors.Errorf("do request: %w", err)
|
2019-11-21 00:52:59 +00:00
|
|
|
}
|
2019-11-21 18:38:43 +00:00
|
|
|
|
2019-11-21 00:52:59 +00:00
|
|
|
defer resp.Body.Close()
|
|
|
|
|
2019-11-21 18:38:43 +00:00
|
|
|
bar := pb.New64(resp.ContentLength)
|
|
|
|
bar.ShowPercent = true
|
|
|
|
bar.ShowSpeed = true
|
|
|
|
bar.Units = pb.U_BYTES
|
|
|
|
|
2019-11-30 13:22:50 +00:00
|
|
|
barreader := bar.NewProxyReader(resp.Body)
|
|
|
|
|
2019-11-21 18:38:43 +00:00
|
|
|
bar.Start()
|
|
|
|
defer bar.Finish()
|
2019-11-21 00:52:59 +00:00
|
|
|
|
2019-11-30 13:22:50 +00:00
|
|
|
mediatype, p, err := mime.ParseMediaType(resp.Header.Get("Content-Type"))
|
|
|
|
if err != nil {
|
|
|
|
return xerrors.Errorf("parse media type: %w", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
var file files.Node
|
|
|
|
switch mediatype {
|
|
|
|
case "multipart/form-data":
|
|
|
|
mpr := multipart.NewReader(barreader, p["boundary"])
|
|
|
|
|
|
|
|
file, err = files.NewFileFromPartReader(mpr, mediatype)
|
|
|
|
if err != nil {
|
|
|
|
return xerrors.Errorf("call to NewFileFromPartReader failed: %w", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
case "application/octet-stream":
|
|
|
|
file = files.NewReaderFile(barreader)
|
|
|
|
default:
|
|
|
|
return xerrors.Errorf("unknown content type: '%s'", mediatype)
|
|
|
|
}
|
|
|
|
|
|
|
|
// WriteTo is unhappy when things exist
|
|
|
|
if err := os.RemoveAll(outname); err != nil {
|
|
|
|
return xerrors.Errorf("removing dest: %w", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
return files.WriteTo(file, outname)
|
2019-11-21 00:52:59 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (w *worker) push(typ string, sectorID uint64) error {
|
|
|
|
outname := filepath.Join(w.repo, typ, w.sb.SectorName(sectorID))
|
|
|
|
|
2019-11-30 13:22:50 +00:00
|
|
|
stat, err := os.Stat(outname)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
f, err := files.NewSerialFile(outname, false, stat)
|
2019-11-21 00:52:59 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2019-11-21 18:38:43 +00:00
|
|
|
url := w.minerEndpoint + "/remote/" + typ + "/" + w.sb.SectorName(sectorID)
|
2019-11-30 13:22:50 +00:00
|
|
|
log.Infof("Push %s %s", typ, url)
|
2019-11-21 18:38:43 +00:00
|
|
|
|
2019-11-30 13:22:50 +00:00
|
|
|
sz, err := f.Size()
|
2019-11-21 18:38:43 +00:00
|
|
|
if err != nil {
|
2019-11-30 13:22:50 +00:00
|
|
|
return xerrors.Errorf("getting size: %w", err)
|
2019-11-21 18:38:43 +00:00
|
|
|
}
|
|
|
|
|
2019-11-30 13:22:50 +00:00
|
|
|
bar := pb.New64(sz)
|
2019-11-21 18:38:43 +00:00
|
|
|
bar.ShowPercent = true
|
|
|
|
bar.ShowSpeed = true
|
|
|
|
bar.Units = pb.U_BYTES
|
|
|
|
|
|
|
|
bar.Start()
|
|
|
|
defer bar.Finish()
|
2019-11-21 19:51:48 +00:00
|
|
|
//todo set content size
|
2019-11-30 13:22:50 +00:00
|
|
|
|
|
|
|
header := w.auth
|
|
|
|
|
|
|
|
var r io.Reader
|
|
|
|
r, file := f.(files.File)
|
|
|
|
if !file {
|
|
|
|
mfr := files.NewMultiFileReader(f.(files.Directory), true)
|
|
|
|
|
|
|
|
header.Set("Content-Type", "multipart/form-data; boundary="+mfr.Boundary())
|
|
|
|
r = mfr
|
|
|
|
} else {
|
|
|
|
header.Set("Content-Type", "application/octet-stream")
|
|
|
|
}
|
|
|
|
|
|
|
|
req, err := http.NewRequest("PUT", url, bar.NewProxyReader(r))
|
2019-11-21 00:52:59 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2019-11-30 13:22:50 +00:00
|
|
|
req.Header = header
|
2019-11-21 18:38:43 +00:00
|
|
|
|
2019-11-21 00:52:59 +00:00
|
|
|
resp, err := http.DefaultClient.Do(req)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2019-11-21 18:38:43 +00:00
|
|
|
if resp.StatusCode != 200 {
|
|
|
|
return xerrors.Errorf("non-200 response: %d", resp.StatusCode)
|
|
|
|
}
|
2019-11-21 00:52:59 +00:00
|
|
|
|
|
|
|
return resp.Body.Close()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (w *worker) fetchSector(sectorID uint64, typ sectorbuilder.WorkerTaskType) error {
|
|
|
|
var err error
|
|
|
|
switch typ {
|
|
|
|
case sectorbuilder.WorkerPreCommit:
|
|
|
|
err = w.fetch("staged", sectorID)
|
|
|
|
case sectorbuilder.WorkerCommit:
|
2019-11-21 19:51:48 +00:00
|
|
|
err = w.fetch("sealed", sectorID)
|
2019-11-30 13:22:50 +00:00
|
|
|
if err != nil {
|
|
|
|
return xerrors.Errorf("fetch sealed: %w", err)
|
|
|
|
}
|
|
|
|
err = w.fetch("cache", sectorID)
|
2019-11-21 00:52:59 +00:00
|
|
|
}
|
|
|
|
if err != nil {
|
|
|
|
return xerrors.Errorf("fetch failed: %w", err)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func errRes(err error) sectorbuilder.SealRes {
|
2019-11-30 13:22:50 +00:00
|
|
|
return sectorbuilder.SealRes{Err: err.Error(), GoErr: err}
|
2019-11-21 00:52:59 +00:00
|
|
|
}
|