lotus/cmd/lotus-seal-worker/sub.go

146 lines
3.5 KiB
Go
Raw Normal View History

package main
2019-11-21 00:52:59 +00:00
import (
"context"
"net/http"
2020-01-02 19:08:49 +00:00
paramfetch "github.com/filecoin-project/go-paramfetch"
"github.com/filecoin-project/go-sectorbuilder"
2020-01-02 19:08:49 +00:00
"golang.org/x/xerrors"
lapi "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
2019-11-21 00:52:59 +00:00
)
type worker struct {
api lapi.StorageMiner
2019-11-21 00:52:59 +00:00
minerEndpoint string
repo string
2019-11-21 18:38:43 +00:00
auth http.Header
2019-11-21 00:52:59 +00:00
sb *sectorbuilder.SectorBuilder
}
func acceptJobs(ctx context.Context, api lapi.StorageMiner, endpoint string, auth http.Header, repo string, noprecommit, nocommit bool) error {
act, err := api.ActorAddress(ctx)
if err != nil {
return err
}
ssize, err := api.ActorSectorSize(ctx, act)
if err != nil {
return err
}
sb, err := sectorbuilder.NewStandalone(&sectorbuilder.Config{
SectorSize: ssize,
Miner: act,
WorkerThreads: 1,
Dir: repo,
})
2019-11-21 18:38:43 +00:00
if err != nil {
return err
}
if err := paramfetch.GetParams(build.ParametersJson, ssize); err != nil {
return xerrors.Errorf("get params: %w", err)
}
2019-11-21 00:52:59 +00:00
w := &worker{
api: api,
minerEndpoint: endpoint,
2019-11-21 18:38:43 +00:00
auth: auth,
repo: repo,
sb: sb,
2019-11-21 00:52:59 +00:00
}
tasks, err := api.WorkerQueue(ctx, sectorbuilder.WorkerCfg{
NoPreCommit: noprecommit,
NoCommit: nocommit,
})
2019-11-21 00:52:59 +00:00
if err != nil {
return err
}
2019-12-04 16:53:32 +00:00
loop:
for {
log.Infof("Waiting for new task")
2019-12-04 16:53:32 +00:00
select {
case task := <-tasks:
log.Infof("New task: %d, sector %d, action: %d", task.TaskID, task.SectorID, task.Type)
2019-11-21 18:38:43 +00:00
2019-12-04 16:53:32 +00:00
res := w.processTask(ctx, task)
2019-11-21 00:52:59 +00:00
2019-12-04 16:53:32 +00:00
log.Infof("Task %d done, err: %+v", task.TaskID, res.GoErr)
2019-11-21 18:38:43 +00:00
2019-12-04 16:53:32 +00:00
if err := api.WorkerDone(ctx, task.TaskID, res); err != nil {
log.Error(err)
}
case <-ctx.Done():
break loop
}
2019-11-21 00:52:59 +00:00
}
log.Warn("acceptJobs exit")
return nil
2019-11-21 00:52:59 +00:00
}
func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask) sectorbuilder.SealRes {
switch task.Type {
case sectorbuilder.WorkerPreCommit:
case sectorbuilder.WorkerCommit:
default:
return errRes(xerrors.Errorf("unknown task type %d", task.Type))
}
if err := w.fetchSector(task.SectorID, task.Type); err != nil {
2019-11-22 15:48:02 +00:00
return errRes(xerrors.Errorf("fetching sector: %w", err))
2019-11-21 00:52:59 +00:00
}
log.Infof("Data fetched, starting computation")
2019-11-21 00:52:59 +00:00
var res sectorbuilder.SealRes
switch task.Type {
case sectorbuilder.WorkerPreCommit:
rspco, err := w.sb.SealPreCommit(ctx, task.SectorID, task.SealTicket, task.Pieces)
2019-11-21 00:52:59 +00:00
if err != nil {
2019-11-22 15:48:02 +00:00
return errRes(xerrors.Errorf("precomitting: %w", err))
2019-11-21 00:52:59 +00:00
}
2019-11-22 15:48:02 +00:00
res.Rspco = rspco.ToJson()
2019-11-21 00:52:59 +00:00
if err := w.push("sealed", task.SectorID); err != nil {
2019-11-22 15:48:02 +00:00
return errRes(xerrors.Errorf("pushing precommited data: %w", err))
2019-11-21 00:52:59 +00:00
}
2019-11-30 13:22:50 +00:00
if err := w.push("cache", task.SectorID); err != nil {
return errRes(xerrors.Errorf("pushing precommited data: %w", err))
}
if err := w.remove("staging", task.SectorID); err != nil {
return errRes(xerrors.Errorf("cleaning up staged sector: %w", err))
}
2019-11-21 00:52:59 +00:00
case sectorbuilder.WorkerCommit:
proof, err := w.sb.SealCommit(ctx, task.SectorID, task.SealTicket, task.SealSeed, task.Pieces, task.Rspco)
2019-11-21 19:51:48 +00:00
if err != nil {
2019-11-22 15:48:02 +00:00
return errRes(xerrors.Errorf("comitting: %w", err))
2019-11-21 19:51:48 +00:00
}
res.Proof = proof
2019-11-30 13:22:50 +00:00
if err := w.push("cache", task.SectorID); err != nil {
return errRes(xerrors.Errorf("pushing precommited data: %w", err))
}
if err := w.remove("sealed", task.SectorID); err != nil {
return errRes(xerrors.Errorf("cleaning up sealed sector: %w", err))
}
2019-11-21 00:52:59 +00:00
}
return res
}
func errRes(err error) sectorbuilder.SealRes {
2019-11-30 13:22:50 +00:00
return sectorbuilder.SealRes{Err: err.Error(), GoErr: err}
2019-11-21 00:52:59 +00:00
}