lotus/cmd/lotus-seal-worker/main.go
Travis Person ca53bf9097
lotus-worker: change address flag to listen (#2848)
* lotus-worker: change address flag to listen

Clarify the flag on the worker process to be the local address and port
the worker will listen on, and not the address of the miner.

* fixup! lotus-worker: change address flag to listen

Co-authored-by: Travis Person <travisperson@users.noreply.github.com>
2020-08-06 02:54:00 +02:00

445 lines
11 KiB
Go

package main
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"path/filepath"
"strings"
"syscall"
"time"
"github.com/google/uuid"
"github.com/gorilla/mux"
logging "github.com/ipfs/go-log/v2"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-jsonrpc/auth"
paramfetch "github.com/filecoin-project/go-paramfetch"
"github.com/filecoin-project/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/apistruct"
"github.com/filecoin-project/lotus/build"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/lib/lotuslog"
"github.com/filecoin-project/lotus/node/repo"
sectorstorage "github.com/filecoin-project/sector-storage"
"github.com/filecoin-project/sector-storage/sealtasks"
"github.com/filecoin-project/sector-storage/stores"
)
var log = logging.Logger("main")
const FlagWorkerRepo = "worker-repo"
// TODO remove after deprecation period
const FlagWorkerRepoDeprecation = "workerrepo"
func main() {
lotuslog.SetupLogLevels()
log.Info("Starting lotus worker")
local := []*cli.Command{
runCmd,
}
app := &cli.App{
Name: "lotus-worker",
Usage: "Remote miner worker",
Version: build.UserVersion(),
Flags: []cli.Flag{
&cli.StringFlag{
Name: FlagWorkerRepo,
Aliases: []string{FlagWorkerRepoDeprecation},
EnvVars: []string{"LOTUS_WORKER_PATH", "WORKER_PATH"},
Value: "~/.lotusworker", // TODO: Consider XDG_DATA_HOME
Usage: fmt.Sprintf("Specify worker repo path. flag %s and env WORKER_PATH are DEPRECATION, will REMOVE SOON", FlagWorkerRepoDeprecation),
},
&cli.StringFlag{
Name: "miner-repo",
Aliases: []string{"storagerepo"},
EnvVars: []string{"LOTUS_MINER_PATH", "LOTUS_STORAGE_PATH"},
Value: "~/.lotusminer", // TODO: Consider XDG_DATA_HOME
Usage: fmt.Sprintf("Specify miner repo path. flag storagerepo and env LOTUS_STORAGE_PATH are DEPRECATION, will REMOVE SOON"),
},
&cli.BoolFlag{
Name: "enable-gpu-proving",
Usage: "enable use of GPU for mining operations",
Value: true,
},
},
Commands: local,
}
app.Setup()
app.Metadata["repoType"] = repo.Worker
if err := app.Run(os.Args); err != nil {
log.Warnf("%+v", err)
return
}
}
var runCmd = &cli.Command{
Name: "run",
Usage: "Start lotus worker",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "listen",
Usage: "host address and port the worker api will listen on",
Value: "0.0.0.0:3456",
},
&cli.StringFlag{
Name: "address",
Hidden: true,
},
&cli.BoolFlag{
Name: "no-local-storage",
Usage: "don't use storageminer repo for sector storage",
},
&cli.BoolFlag{
Name: "precommit1",
Usage: "enable precommit1 (32G sectors: 1 core, 128GiB Memory)",
Value: true,
},
&cli.BoolFlag{
Name: "precommit2",
Usage: "enable precommit2 (32G sectors: all cores, 96GiB Memory)",
Value: true,
},
&cli.BoolFlag{
Name: "commit",
Usage: "enable commit (32G sectors: all cores or GPUs, 128GiB Memory + 64GiB swap)",
Value: true,
},
&cli.IntFlag{
Name: "parallel-fetch-limit",
Usage: "maximum fetch operations to run in parallel",
Value: 5,
},
&cli.StringFlag{
Name: "timeout",
Usage: "used when 'listen' is unspecified. must be a valid duration recognized by golang's time.ParseDuration function",
Value: "30m",
},
},
Before: func(cctx *cli.Context) error {
if cctx.IsSet("address") {
log.Warnf("The '--address' flag is deprecated, it has been replaced by '--listen'")
if err := cctx.Set("listen", cctx.String("address")); err != nil {
return err
}
}
return nil
},
Action: func(cctx *cli.Context) error {
if !cctx.Bool("enable-gpu-proving") {
if err := os.Setenv("BELLMAN_NO_GPU", "true"); err != nil {
return xerrors.Errorf("could not set no-gpu env: %+v", err)
}
}
// Connect to storage-miner
var nodeApi api.StorageMiner
var closer func()
var err error
for {
nodeApi, closer, err = lcli.GetStorageMinerAPI(cctx)
if err == nil {
break
}
fmt.Printf("\r\x1b[0KConnecting to miner API... (%s)", err)
time.Sleep(time.Second)
continue
}
defer closer()
ctx := lcli.ReqContext(cctx)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
v, err := nodeApi.Version(ctx)
if err != nil {
return err
}
if v.APIVersion != build.APIVersion {
return xerrors.Errorf("lotus-miner API version doesn't match: local: ", api.Version{APIVersion: build.APIVersion})
}
log.Infof("Remote version %s", v)
watchMinerConn(ctx, cctx, nodeApi)
// Check params
act, err := nodeApi.ActorAddress(ctx)
if err != nil {
return err
}
ssize, err := nodeApi.ActorSectorSize(ctx, act)
if err != nil {
return err
}
if cctx.Bool("commit") {
if err := paramfetch.GetParams(ctx, build.ParametersJSON(), uint64(ssize)); err != nil {
return xerrors.Errorf("get params: %w", err)
}
}
var taskTypes []sealtasks.TaskType
taskTypes = append(taskTypes, sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize)
if cctx.Bool("precommit1") {
taskTypes = append(taskTypes, sealtasks.TTPreCommit1)
}
if cctx.Bool("precommit2") {
taskTypes = append(taskTypes, sealtasks.TTPreCommit2)
}
if cctx.Bool("commit") {
taskTypes = append(taskTypes, sealtasks.TTCommit2)
}
if len(taskTypes) == 0 {
return xerrors.Errorf("no task types specified")
}
// Open repo
repoPath := cctx.String(FlagWorkerRepo)
r, err := repo.NewFS(repoPath)
if err != nil {
return err
}
ok, err := r.Exists()
if err != nil {
return err
}
if !ok {
if err := r.Init(repo.Worker); err != nil {
return err
}
lr, err := r.Lock(repo.Worker)
if err != nil {
return err
}
var localPaths []stores.LocalPath
if !cctx.Bool("no-local-storage") {
b, err := json.MarshalIndent(&stores.LocalStorageMeta{
ID: stores.ID(uuid.New().String()),
Weight: 10,
CanSeal: true,
CanStore: false,
}, "", " ")
if err != nil {
return xerrors.Errorf("marshaling storage config: %w", err)
}
if err := ioutil.WriteFile(filepath.Join(lr.Path(), "sectorstore.json"), b, 0644); err != nil {
return xerrors.Errorf("persisting storage metadata (%s): %w", filepath.Join(lr.Path(), "sectorstore.json"), err)
}
localPaths = append(localPaths, stores.LocalPath{
Path: lr.Path(),
})
}
if err := lr.SetStorage(func(sc *stores.StorageConfig) {
sc.StoragePaths = append(sc.StoragePaths, localPaths...)
}); err != nil {
return xerrors.Errorf("set storage config: %w", err)
}
{
// init datastore for r.Exists
_, err := lr.Datastore("/metadata")
if err != nil {
return err
}
}
if err := lr.Close(); err != nil {
return xerrors.Errorf("close repo: %w", err)
}
}
lr, err := r.Lock(repo.Worker)
if err != nil {
return err
}
log.Info("Opening local storage; connecting to master")
const unspecifiedAddress = "0.0.0.0"
address := cctx.String("listen")
addressSlice := strings.Split(address, ":")
if ip := net.ParseIP(addressSlice[0]); ip != nil {
if ip.String() == unspecifiedAddress {
timeout, err := time.ParseDuration(cctx.String("timeout"))
if err != nil {
return err
}
rip, err := extractRoutableIP(timeout)
if err != nil {
return err
}
address = rip + ":" + addressSlice[1]
}
}
localStore, err := stores.NewLocal(ctx, lr, nodeApi, []string{"http://" + address + "/remote"})
if err != nil {
return err
}
// Setup remote sector store
spt, err := ffiwrapper.SealProofTypeFromSectorSize(ssize)
if err != nil {
return xerrors.Errorf("getting proof type: %w", err)
}
sminfo, err := lcli.GetAPIInfo(cctx, repo.StorageMiner)
if err != nil {
return xerrors.Errorf("could not get api info: %w", err)
}
remote := stores.NewRemote(localStore, nodeApi, sminfo.AuthHeader(), cctx.Int("parallel-fetch-limit"))
// Create / expose the worker
workerApi := &worker{
LocalWorker: sectorstorage.NewLocalWorker(sectorstorage.WorkerConfig{
SealProof: spt,
TaskTypes: taskTypes,
}, remote, localStore, nodeApi),
}
mux := mux.NewRouter()
log.Info("Setting up control endpoint at " + address)
rpcServer := jsonrpc.NewServer()
rpcServer.Register("Filecoin", apistruct.PermissionedWorkerAPI(workerApi))
mux.Handle("/rpc/v0", rpcServer)
mux.PathPrefix("/remote").HandlerFunc((&stores.FetchHandler{Local: localStore}).ServeHTTP)
mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof
ah := &auth.Handler{
Verify: nodeApi.AuthVerify,
Next: mux.ServeHTTP,
}
srv := &http.Server{
Handler: ah,
BaseContext: func(listener net.Listener) context.Context {
return ctx
},
}
go func() {
<-ctx.Done()
log.Warn("Shutting down...")
if err := srv.Shutdown(context.TODO()); err != nil {
log.Errorf("shutting down RPC server failed: %s", err)
}
log.Warn("Graceful shutdown successful")
}()
nl, err := net.Listen("tcp", address)
if err != nil {
return err
}
log.Info("Waiting for tasks")
go func() {
if err := nodeApi.WorkerConnect(ctx, "ws://"+address+"/rpc/v0"); err != nil {
log.Errorf("Registering worker failed: %+v", err)
cancel()
return
}
}()
return srv.Serve(nl)
},
}
func watchMinerConn(ctx context.Context, cctx *cli.Context, nodeApi api.StorageMiner) {
go func() {
closing, err := nodeApi.Closing(ctx)
if err != nil {
log.Errorf("failed to get remote closing channel: %+v", err)
}
select {
case <-closing:
case <-ctx.Done():
}
if ctx.Err() != nil {
return // graceful shutdown
}
log.Warnf("Connection with miner node lost, restarting")
exe, err := os.Executable()
if err != nil {
log.Errorf("getting executable for auto-restart: %+v", err)
}
log.Sync()
// TODO: there are probably cleaner/more graceful ways to restart,
// but this is good enough for now (FSM can recover from the mess this creates)
if err := syscall.Exec(exe, []string{exe,
fmt.Sprintf("--worker-repo=%s", cctx.String("worker-repo")),
fmt.Sprintf("--miner-repo=%s", cctx.String("miner-repo")),
fmt.Sprintf("--enable-gpu-proving=%t", cctx.Bool("enable-gpu-proving")),
"run",
fmt.Sprintf("--listen=%s", cctx.String("listen")),
fmt.Sprintf("--no-local-storage=%t", cctx.Bool("no-local-storage")),
fmt.Sprintf("--precommit1=%t", cctx.Bool("precommit1")),
fmt.Sprintf("--precommit2=%t", cctx.Bool("precommit2")),
fmt.Sprintf("--commit=%t", cctx.Bool("commit")),
}, os.Environ()); err != nil {
fmt.Println(err)
}
}()
}
func extractRoutableIP(timeout time.Duration) (string, error) {
minerMultiAddrKey := "MINER_API_INFO"
deprecatedMinerMultiAddrKey := "STORAGE_API_INFO"
env, ok := os.LookupEnv(minerMultiAddrKey)
if !ok {
// TODO remove after deprecation period
env, ok = os.LookupEnv(deprecatedMinerMultiAddrKey)
if ok {
log.Warnf("Using a deprecated env(%s) value, please use env(%s) instead.", deprecatedMinerMultiAddrKey, minerMultiAddrKey)
}
return "", xerrors.New("MINER_API_INFO environment variable required to extract IP")
}
minerAddr := strings.Split(env, "/")
conn, err := net.DialTimeout("tcp", minerAddr[2]+":"+minerAddr[4], timeout)
if err != nil {
return "", err
}
defer conn.Close()
localAddr := conn.LocalAddr().(*net.TCPAddr)
return strings.Split(localAddr.IP.String(), ":")[0], nil
}