lotus/cmd/lotus-seal-worker/main.go

318 lines
7.4 KiB
Go
Raw Normal View History

package main
2019-11-21 00:52:59 +00:00
import (
2020-03-13 01:37:38 +00:00
"context"
2020-03-16 18:46:02 +00:00
"encoding/json"
"io/ioutil"
2020-03-16 17:50:07 +00:00
"net"
2020-03-13 01:37:38 +00:00
"net/http"
2019-11-21 00:52:59 +00:00
"os"
2020-03-16 18:46:02 +00:00
"path/filepath"
2020-03-16 18:46:02 +00:00
"github.com/google/uuid"
2020-03-13 01:37:38 +00:00
"github.com/gorilla/mux"
logging "github.com/ipfs/go-log/v2"
2020-03-11 01:57:52 +00:00
"golang.org/x/xerrors"
2019-11-21 00:52:59 +00:00
"gopkg.in/urfave/cli.v2"
2020-05-20 17:43:22 +00:00
"github.com/filecoin-project/go-jsonrpc"
2020-05-20 18:23:51 +00:00
"github.com/filecoin-project/go-jsonrpc/auth"
2020-03-11 01:57:52 +00:00
paramfetch "github.com/filecoin-project/go-paramfetch"
"github.com/filecoin-project/sector-storage/ffiwrapper"
2020-03-23 11:40:02 +00:00
2020-03-11 01:57:52 +00:00
"github.com/filecoin-project/lotus/api"
2020-03-13 01:37:38 +00:00
"github.com/filecoin-project/lotus/api/apistruct"
2019-11-21 00:52:59 +00:00
"github.com/filecoin-project/lotus/build"
2020-03-11 01:57:52 +00:00
lcli "github.com/filecoin-project/lotus/cli"
2020-01-08 13:49:34 +00:00
"github.com/filecoin-project/lotus/lib/lotuslog"
"github.com/filecoin-project/lotus/node/repo"
2020-03-27 23:00:21 +00:00
"github.com/filecoin-project/sector-storage"
"github.com/filecoin-project/sector-storage/sealtasks"
"github.com/filecoin-project/sector-storage/stores"
2019-11-21 00:52:59 +00:00
)
// log is the package-level logger, created under the name "main".
var log = logging.Logger("main")
2020-03-13 01:37:38 +00:00
// FlagStorageRepo is the name of the global CLI flag whose value is the path
// of the worker's own repo (opened with repo.NewFS by the run command).
const FlagStorageRepo = "workerrepo"
2019-11-21 00:52:59 +00:00
func main() {
2020-01-08 13:49:34 +00:00
lotuslog.SetupLogLevels()
2019-11-21 00:52:59 +00:00
log.Info("Starting lotus worker")
local := []*cli.Command{
runCmd,
}
app := &cli.App{
2019-11-22 16:25:56 +00:00
Name: "lotus-seal-worker",
2019-11-21 00:52:59 +00:00
Usage: "Remote storage miner worker",
Version: build.UserVersion,
2019-11-21 00:52:59 +00:00
Flags: []cli.Flag{
&cli.StringFlag{
2020-03-22 21:39:06 +00:00
Name: FlagStorageRepo,
2019-11-21 00:52:59 +00:00
EnvVars: []string{"WORKER_PATH"},
Value: "~/.lotusworker", // TODO: Consider XDG_DATA_HOME
},
&cli.StringFlag{
2019-11-21 16:10:04 +00:00
Name: "storagerepo",
EnvVars: []string{"LOTUS_STORAGE_PATH"},
2019-11-21 00:52:59 +00:00
Value: "~/.lotusstorage", // TODO: Consider XDG_DATA_HOME
},
2019-12-07 14:19:46 +00:00
&cli.BoolFlag{
Name: "enable-gpu-proving",
Usage: "enable use of GPU for mining operations",
Value: true,
},
2019-11-21 00:52:59 +00:00
},
Commands: local,
}
app.Setup()
2020-03-25 21:15:10 +00:00
app.Metadata["repoType"] = repo.Worker
2019-11-21 00:52:59 +00:00
if err := app.Run(os.Args); err != nil {
2019-11-21 18:38:43 +00:00
log.Warnf("%+v", err)
2019-11-21 00:52:59 +00:00
return
}
}
var runCmd = &cli.Command{
Name: "run",
Usage: "Start lotus worker",
2020-03-16 17:50:07 +00:00
Flags: []cli.Flag{
&cli.StringFlag{
Name: "address",
Usage: "Locally reachable address",
},
2020-03-16 18:46:02 +00:00
&cli.BoolFlag{
Name: "no-local-storage",
Usage: "don't use storageminer repo for sector storage",
},
2020-03-25 21:15:10 +00:00
&cli.BoolFlag{
Name: "precommit1",
Usage: "enable precommit1 (32G sectors: 1 core, 128GiB Memory)",
Value: true,
},
&cli.BoolFlag{
Name: "precommit2",
Usage: "enable precommit2 (32G sectors: all cores, 96GiB Memory)",
Value: true,
},
&cli.BoolFlag{
Name: "commit",
Usage: "enable commit (32G sectors: all cores or GPUs, 128GiB Memory + 64GiB swap)",
Value: true,
},
2020-03-16 17:50:07 +00:00
},
2019-11-21 00:52:59 +00:00
Action: func(cctx *cli.Context) error {
2020-03-11 01:57:52 +00:00
if !cctx.Bool("enable-gpu-proving") {
os.Setenv("BELLMAN_NO_GPU", "true")
}
2020-03-16 17:50:07 +00:00
if cctx.String("address") == "" {
return xerrors.Errorf("--address flag is required")
}
// Connect to storage-miner
2020-03-11 01:57:52 +00:00
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return xerrors.Errorf("getting miner api: %w", err)
}
defer closer()
ctx := lcli.ReqContext(cctx)
2020-03-18 23:23:28 +00:00
ctx, cancel := context.WithCancel(ctx)
defer cancel()
2020-03-11 01:57:52 +00:00
v, err := nodeApi.Version(ctx)
if err != nil {
return err
}
if v.APIVersion != build.APIVersion {
return xerrors.Errorf("lotus-storage-miner API version doesn't match: local: ", api.Version{APIVersion: build.APIVersion})
}
2020-03-13 01:37:38 +00:00
log.Infof("Remote version %s", v)
2020-03-11 01:57:52 +00:00
2020-03-16 17:50:07 +00:00
// Check params
2020-03-11 01:57:52 +00:00
act, err := nodeApi.ActorAddress(ctx)
if err != nil {
return err
}
ssize, err := nodeApi.ActorSectorSize(ctx, act)
if err != nil {
return err
}
2020-03-25 21:15:10 +00:00
if cctx.Bool("commit") {
if err := paramfetch.GetParams(build.ParametersJson(), uint64(ssize)); err != nil {
return xerrors.Errorf("get params: %w", err)
}
}
var taskTypes []sealtasks.TaskType
taskTypes = append(taskTypes, sealtasks.TTFetch)
2020-03-25 21:15:10 +00:00
if cctx.Bool("precommit1") {
taskTypes = append(taskTypes, sealtasks.TTPreCommit1)
}
if cctx.Bool("precommit2") {
taskTypes = append(taskTypes, sealtasks.TTPreCommit2)
}
if cctx.Bool("commit") {
taskTypes = append(taskTypes, sealtasks.TTCommit2)
}
if len(taskTypes) == 0 {
return xerrors.Errorf("no task types specified")
2020-03-11 01:57:52 +00:00
}
2020-03-16 17:50:07 +00:00
// Open repo
2020-03-13 01:37:38 +00:00
repoPath := cctx.String(FlagStorageRepo)
r, err := repo.NewFS(repoPath)
if err != nil {
return err
}
ok, err := r.Exists()
if err != nil {
return err
}
if !ok {
2020-03-16 18:46:02 +00:00
if err := r.Init(repo.Worker); err != nil {
return err
}
lr, err := r.Lock(repo.Worker)
if err != nil {
return err
}
var localPaths []stores.LocalPath
2020-03-16 18:46:02 +00:00
if !cctx.Bool("no-local-storage") {
2020-03-19 15:10:19 +00:00
b, err := json.MarshalIndent(&stores.LocalStorageMeta{
2020-03-16 18:46:02 +00:00
ID: stores.ID(uuid.New().String()),
Weight: 10,
CanSeal: true,
CanStore: false,
}, "", " ")
if err != nil {
return xerrors.Errorf("marshaling storage config: %w", err)
}
if err := ioutil.WriteFile(filepath.Join(lr.Path(), "sectorstore.json"), b, 0644); err != nil {
return xerrors.Errorf("persisting storage metadata (%s): %w", filepath.Join(lr.Path(), "sectorstore.json"), err)
}
localPaths = append(localPaths, stores.LocalPath{
2020-03-16 18:46:02 +00:00
Path: lr.Path(),
})
}
if err := lr.SetStorage(func(sc *stores.StorageConfig) {
2020-03-16 18:46:02 +00:00
sc.StoragePaths = append(sc.StoragePaths, localPaths...)
}); err != nil {
return xerrors.Errorf("set storage config: %w", err)
}
2020-03-18 23:23:28 +00:00
{
// init datastore for r.Exists
_, err := lr.Datastore("/")
if err != nil {
return err
}
}
2020-03-16 18:46:02 +00:00
if err := lr.Close(); err != nil {
return xerrors.Errorf("close repo: %w", err)
}
2020-03-13 01:37:38 +00:00
}
lr, err := r.Lock(repo.Worker)
if err != nil {
return err
}
2020-03-19 15:10:19 +00:00
log.Info("Opening local storage; connecting to master")
localStore, err := stores.NewLocal(ctx, lr, nodeApi, []string{"http://" + cctx.String("address") + "/remote"})
2020-03-13 01:37:38 +00:00
if err != nil {
return err
}
2020-03-16 17:50:07 +00:00
// Setup remote sector store
2020-04-10 21:29:05 +00:00
spt, err := ffiwrapper.SealProofTypeFromSectorSize(ssize)
2020-03-13 01:37:38 +00:00
if err != nil {
return xerrors.Errorf("getting proof type: %w", err)
}
sminfo, err := lcli.GetAPIInfo(cctx, repo.StorageMiner)
if err != nil {
return xerrors.Errorf("could not get api info: %w", err)
}
remote := stores.NewRemote(localStore, nodeApi, sminfo.AuthHeader())
2020-03-16 17:50:07 +00:00
// Create / expose the worker
2020-03-13 01:37:38 +00:00
workerApi := &worker{
2020-03-23 11:40:02 +00:00
LocalWorker: sectorstorage.NewLocalWorker(sectorstorage.WorkerConfig{
SealProof: spt,
2020-03-25 21:15:10 +00:00
TaskTypes: taskTypes,
}, remote, localStore, nodeApi),
2020-03-13 01:37:38 +00:00
}
mux := mux.NewRouter()
2020-03-19 15:10:19 +00:00
log.Info("Setting up control endpoint at " + cctx.String("address"))
2020-03-18 04:40:25 +00:00
2020-03-13 01:37:38 +00:00
rpcServer := jsonrpc.NewServer()
rpcServer.Register("Filecoin", apistruct.PermissionedWorkerAPI(workerApi))
mux.Handle("/rpc/v0", rpcServer)
mux.PathPrefix("/remote").HandlerFunc((&stores.FetchHandler{Local: localStore}).ServeHTTP)
2020-03-13 01:37:38 +00:00
mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof
ah := &auth.Handler{
Verify: nodeApi.AuthVerify,
Next: mux.ServeHTTP,
}
2020-03-18 23:23:28 +00:00
srv := &http.Server{
Handler: ah,
BaseContext: func(listener net.Listener) context.Context {
return ctx
},
}
2020-03-13 01:37:38 +00:00
go func() {
2020-03-18 04:40:25 +00:00
<-ctx.Done()
2020-03-13 01:37:38 +00:00
log.Warn("Shutting down..")
if err := srv.Shutdown(context.TODO()); err != nil {
log.Errorf("shutting down RPC server failed: %s", err)
}
log.Warn("Graceful shutdown successful")
}()
2020-03-16 17:50:07 +00:00
2020-03-22 21:39:06 +00:00
nl, err := net.Listen("tcp", cctx.String("address"))
2020-03-16 17:50:07 +00:00
if err != nil {
return err
}
2020-03-18 04:40:25 +00:00
log.Info("Waiting for tasks")
2020-03-18 23:23:28 +00:00
go func() {
if err := nodeApi.WorkerConnect(ctx, "ws://"+cctx.String("address")+"/rpc/v0"); err != nil {
log.Errorf("Registering worker failed: %+v", err)
cancel()
return
}
}()
2020-03-16 17:50:07 +00:00
return srv.Serve(nl)
2019-11-21 00:52:59 +00:00
},
}