468 lines
12 KiB
Go
468 lines
12 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"net"
|
|
"net/http"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/gorilla/mux"
|
|
logging "github.com/ipfs/go-log/v2"
|
|
"github.com/urfave/cli/v2"
|
|
"golang.org/x/xerrors"
|
|
|
|
"github.com/filecoin-project/go-jsonrpc"
|
|
"github.com/filecoin-project/go-jsonrpc/auth"
|
|
paramfetch "github.com/filecoin-project/go-paramfetch"
|
|
"github.com/filecoin-project/sector-storage/ffiwrapper"
|
|
|
|
"github.com/filecoin-project/lotus/api"
|
|
"github.com/filecoin-project/lotus/api/apistruct"
|
|
"github.com/filecoin-project/lotus/build"
|
|
lcli "github.com/filecoin-project/lotus/cli"
|
|
"github.com/filecoin-project/lotus/lib/lotuslog"
|
|
"github.com/filecoin-project/lotus/lib/rpcenc"
|
|
"github.com/filecoin-project/lotus/node/repo"
|
|
sectorstorage "github.com/filecoin-project/sector-storage"
|
|
"github.com/filecoin-project/sector-storage/sealtasks"
|
|
"github.com/filecoin-project/sector-storage/stores"
|
|
)
|
|
|
|
var log = logging.Logger("main")
|
|
|
|
const FlagWorkerRepo = "worker-repo"
|
|
|
|
// TODO remove after deprecation period
|
|
const FlagWorkerRepoDeprecation = "workerrepo"
|
|
|
|
func main() {
|
|
lotuslog.SetupLogLevels()
|
|
|
|
log.Info("Starting lotus worker")
|
|
|
|
local := []*cli.Command{
|
|
runCmd,
|
|
}
|
|
|
|
app := &cli.App{
|
|
Name: "lotus-worker",
|
|
Usage: "Remote miner worker",
|
|
Version: build.UserVersion(),
|
|
Flags: []cli.Flag{
|
|
&cli.StringFlag{
|
|
Name: FlagWorkerRepo,
|
|
Aliases: []string{FlagWorkerRepoDeprecation},
|
|
EnvVars: []string{"LOTUS_WORKER_PATH", "WORKER_PATH"},
|
|
Value: "~/.lotusworker", // TODO: Consider XDG_DATA_HOME
|
|
Usage: fmt.Sprintf("Specify worker repo path. flag %s and env WORKER_PATH are DEPRECATION, will REMOVE SOON", FlagWorkerRepoDeprecation),
|
|
},
|
|
&cli.StringFlag{
|
|
Name: "miner-repo",
|
|
Aliases: []string{"storagerepo"},
|
|
EnvVars: []string{"LOTUS_MINER_PATH", "LOTUS_STORAGE_PATH"},
|
|
Value: "~/.lotusminer", // TODO: Consider XDG_DATA_HOME
|
|
Usage: fmt.Sprintf("Specify miner repo path. flag storagerepo and env LOTUS_STORAGE_PATH are DEPRECATION, will REMOVE SOON"),
|
|
},
|
|
&cli.BoolFlag{
|
|
Name: "enable-gpu-proving",
|
|
Usage: "enable use of GPU for mining operations",
|
|
Value: true,
|
|
},
|
|
},
|
|
|
|
Commands: local,
|
|
}
|
|
app.Setup()
|
|
app.Metadata["repoType"] = repo.Worker
|
|
|
|
if err := app.Run(os.Args); err != nil {
|
|
log.Warnf("%+v", err)
|
|
return
|
|
}
|
|
}
|
|
|
|
var runCmd = &cli.Command{
|
|
Name: "run",
|
|
Usage: "Start lotus worker",
|
|
Flags: []cli.Flag{
|
|
&cli.StringFlag{
|
|
Name: "listen",
|
|
Usage: "host address and port the worker api will listen on",
|
|
Value: "0.0.0.0:3456",
|
|
},
|
|
&cli.StringFlag{
|
|
Name: "address",
|
|
Hidden: true,
|
|
},
|
|
&cli.BoolFlag{
|
|
Name: "no-local-storage",
|
|
Usage: "don't use storageminer repo for sector storage",
|
|
},
|
|
&cli.BoolFlag{
|
|
Name: "addpiece",
|
|
Usage: "enable addpiece",
|
|
Value: true,
|
|
},
|
|
&cli.BoolFlag{
|
|
Name: "precommit1",
|
|
Usage: "enable precommit1 (32G sectors: 1 core, 128GiB Memory)",
|
|
Value: true,
|
|
},
|
|
&cli.BoolFlag{
|
|
Name: "unseal",
|
|
Usage: "enable unsealing (32G sectors: 1 core, 128GiB Memory)",
|
|
Value: true,
|
|
},
|
|
&cli.BoolFlag{
|
|
Name: "precommit2",
|
|
Usage: "enable precommit2 (32G sectors: all cores, 96GiB Memory)",
|
|
Value: true,
|
|
},
|
|
&cli.BoolFlag{
|
|
Name: "commit",
|
|
Usage: "enable commit (32G sectors: all cores or GPUs, 128GiB Memory + 64GiB swap)",
|
|
Value: true,
|
|
},
|
|
&cli.IntFlag{
|
|
Name: "parallel-fetch-limit",
|
|
Usage: "maximum fetch operations to run in parallel",
|
|
Value: 5,
|
|
},
|
|
&cli.StringFlag{
|
|
Name: "timeout",
|
|
Usage: "used when 'listen' is unspecified. must be a valid duration recognized by golang's time.ParseDuration function",
|
|
Value: "30m",
|
|
},
|
|
},
|
|
Before: func(cctx *cli.Context) error {
|
|
if cctx.IsSet("address") {
|
|
log.Warnf("The '--address' flag is deprecated, it has been replaced by '--listen'")
|
|
if err := cctx.Set("listen", cctx.String("address")); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
},
|
|
Action: func(cctx *cli.Context) error {
|
|
if !cctx.Bool("enable-gpu-proving") {
|
|
if err := os.Setenv("BELLMAN_NO_GPU", "true"); err != nil {
|
|
return xerrors.Errorf("could not set no-gpu env: %+v", err)
|
|
}
|
|
}
|
|
|
|
// Connect to storage-miner
|
|
var nodeApi api.StorageMiner
|
|
var closer func()
|
|
var err error
|
|
for {
|
|
nodeApi, closer, err = lcli.GetStorageMinerAPI(cctx)
|
|
if err == nil {
|
|
break
|
|
}
|
|
fmt.Printf("\r\x1b[0KConnecting to miner API... (%s)", err)
|
|
time.Sleep(time.Second)
|
|
continue
|
|
}
|
|
|
|
defer closer()
|
|
ctx := lcli.ReqContext(cctx)
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
v, err := nodeApi.Version(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if v.APIVersion != build.APIVersion {
|
|
return xerrors.Errorf("lotus-miner API version doesn't match: local: ", api.Version{APIVersion: build.APIVersion})
|
|
}
|
|
log.Infof("Remote version %s", v)
|
|
|
|
watchMinerConn(ctx, cctx, nodeApi)
|
|
|
|
// Check params
|
|
|
|
act, err := nodeApi.ActorAddress(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
ssize, err := nodeApi.ActorSectorSize(ctx, act)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if cctx.Bool("commit") {
|
|
if err := paramfetch.GetParams(ctx, build.ParametersJSON(), uint64(ssize)); err != nil {
|
|
return xerrors.Errorf("get params: %w", err)
|
|
}
|
|
}
|
|
|
|
var taskTypes []sealtasks.TaskType
|
|
|
|
taskTypes = append(taskTypes, sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize)
|
|
|
|
if cctx.Bool("addpiece") {
|
|
taskTypes = append(taskTypes, sealtasks.TTAddPiece)
|
|
}
|
|
if cctx.Bool("precommit1") {
|
|
taskTypes = append(taskTypes, sealtasks.TTPreCommit1)
|
|
}
|
|
if cctx.Bool("unseal") {
|
|
taskTypes = append(taskTypes, sealtasks.TTUnseal)
|
|
}
|
|
if cctx.Bool("precommit2") {
|
|
taskTypes = append(taskTypes, sealtasks.TTPreCommit2)
|
|
}
|
|
if cctx.Bool("commit") {
|
|
taskTypes = append(taskTypes, sealtasks.TTCommit2)
|
|
}
|
|
|
|
if len(taskTypes) == 0 {
|
|
return xerrors.Errorf("no task types specified")
|
|
}
|
|
|
|
// Open repo
|
|
|
|
repoPath := cctx.String(FlagWorkerRepo)
|
|
r, err := repo.NewFS(repoPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
ok, err := r.Exists()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !ok {
|
|
if err := r.Init(repo.Worker); err != nil {
|
|
return err
|
|
}
|
|
|
|
lr, err := r.Lock(repo.Worker)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var localPaths []stores.LocalPath
|
|
|
|
if !cctx.Bool("no-local-storage") {
|
|
b, err := json.MarshalIndent(&stores.LocalStorageMeta{
|
|
ID: stores.ID(uuid.New().String()),
|
|
Weight: 10,
|
|
CanSeal: true,
|
|
CanStore: false,
|
|
}, "", " ")
|
|
if err != nil {
|
|
return xerrors.Errorf("marshaling storage config: %w", err)
|
|
}
|
|
|
|
if err := ioutil.WriteFile(filepath.Join(lr.Path(), "sectorstore.json"), b, 0644); err != nil {
|
|
return xerrors.Errorf("persisting storage metadata (%s): %w", filepath.Join(lr.Path(), "sectorstore.json"), err)
|
|
}
|
|
|
|
localPaths = append(localPaths, stores.LocalPath{
|
|
Path: lr.Path(),
|
|
})
|
|
}
|
|
|
|
if err := lr.SetStorage(func(sc *stores.StorageConfig) {
|
|
sc.StoragePaths = append(sc.StoragePaths, localPaths...)
|
|
}); err != nil {
|
|
return xerrors.Errorf("set storage config: %w", err)
|
|
}
|
|
|
|
{
|
|
// init datastore for r.Exists
|
|
_, err := lr.Datastore("/metadata")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if err := lr.Close(); err != nil {
|
|
return xerrors.Errorf("close repo: %w", err)
|
|
}
|
|
}
|
|
|
|
lr, err := r.Lock(repo.Worker)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
log.Info("Opening local storage; connecting to master")
|
|
const unspecifiedAddress = "0.0.0.0"
|
|
address := cctx.String("listen")
|
|
addressSlice := strings.Split(address, ":")
|
|
if ip := net.ParseIP(addressSlice[0]); ip != nil {
|
|
if ip.String() == unspecifiedAddress {
|
|
timeout, err := time.ParseDuration(cctx.String("timeout"))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
rip, err := extractRoutableIP(timeout)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
address = rip + ":" + addressSlice[1]
|
|
}
|
|
}
|
|
|
|
localStore, err := stores.NewLocal(ctx, lr, nodeApi, []string{"http://" + address + "/remote"})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Setup remote sector store
|
|
spt, err := ffiwrapper.SealProofTypeFromSectorSize(ssize)
|
|
if err != nil {
|
|
return xerrors.Errorf("getting proof type: %w", err)
|
|
}
|
|
|
|
sminfo, err := lcli.GetAPIInfo(cctx, repo.StorageMiner)
|
|
if err != nil {
|
|
return xerrors.Errorf("could not get api info: %w", err)
|
|
}
|
|
|
|
remote := stores.NewRemote(localStore, nodeApi, sminfo.AuthHeader(), cctx.Int("parallel-fetch-limit"))
|
|
|
|
// Create / expose the worker
|
|
|
|
workerApi := &worker{
|
|
LocalWorker: sectorstorage.NewLocalWorker(sectorstorage.WorkerConfig{
|
|
SealProof: spt,
|
|
TaskTypes: taskTypes,
|
|
}, remote, localStore, nodeApi),
|
|
}
|
|
|
|
mux := mux.NewRouter()
|
|
|
|
log.Info("Setting up control endpoint at " + address)
|
|
|
|
readerHandler, readerServerOpt := rpcenc.ReaderParamDecoder()
|
|
rpcServer := jsonrpc.NewServer(readerServerOpt)
|
|
rpcServer.Register("Filecoin", apistruct.PermissionedWorkerAPI(workerApi))
|
|
|
|
mux.Handle("/rpc/v0", rpcServer)
|
|
mux.Handle("/rpc/streams/v0/push/{uuid}", readerHandler)
|
|
mux.PathPrefix("/remote").HandlerFunc((&stores.FetchHandler{Local: localStore}).ServeHTTP)
|
|
mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof
|
|
|
|
ah := &auth.Handler{
|
|
Verify: nodeApi.AuthVerify,
|
|
Next: mux.ServeHTTP,
|
|
}
|
|
|
|
srv := &http.Server{
|
|
Handler: ah,
|
|
BaseContext: func(listener net.Listener) context.Context {
|
|
return ctx
|
|
},
|
|
}
|
|
|
|
go func() {
|
|
<-ctx.Done()
|
|
log.Warn("Shutting down...")
|
|
if err := srv.Shutdown(context.TODO()); err != nil {
|
|
log.Errorf("shutting down RPC server failed: %s", err)
|
|
}
|
|
log.Warn("Graceful shutdown successful")
|
|
}()
|
|
|
|
nl, err := net.Listen("tcp", address)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
log.Info("Waiting for tasks")
|
|
|
|
go func() {
|
|
if err := nodeApi.WorkerConnect(ctx, "ws://"+address+"/rpc/v0"); err != nil {
|
|
log.Errorf("Registering worker failed: %+v", err)
|
|
cancel()
|
|
return
|
|
}
|
|
}()
|
|
|
|
return srv.Serve(nl)
|
|
},
|
|
}
|
|
|
|
func watchMinerConn(ctx context.Context, cctx *cli.Context, nodeApi api.StorageMiner) {
|
|
go func() {
|
|
closing, err := nodeApi.Closing(ctx)
|
|
if err != nil {
|
|
log.Errorf("failed to get remote closing channel: %+v", err)
|
|
}
|
|
|
|
select {
|
|
case <-closing:
|
|
case <-ctx.Done():
|
|
}
|
|
|
|
if ctx.Err() != nil {
|
|
return // graceful shutdown
|
|
}
|
|
|
|
log.Warnf("Connection with miner node lost, restarting")
|
|
|
|
exe, err := os.Executable()
|
|
if err != nil {
|
|
log.Errorf("getting executable for auto-restart: %+v", err)
|
|
}
|
|
|
|
log.Sync()
|
|
|
|
// TODO: there are probably cleaner/more graceful ways to restart,
|
|
// but this is good enough for now (FSM can recover from the mess this creates)
|
|
if err := syscall.Exec(exe, []string{exe,
|
|
fmt.Sprintf("--worker-repo=%s", cctx.String("worker-repo")),
|
|
fmt.Sprintf("--miner-repo=%s", cctx.String("miner-repo")),
|
|
fmt.Sprintf("--enable-gpu-proving=%t", cctx.Bool("enable-gpu-proving")),
|
|
"run",
|
|
fmt.Sprintf("--listen=%s", cctx.String("listen")),
|
|
fmt.Sprintf("--no-local-storage=%t", cctx.Bool("no-local-storage")),
|
|
fmt.Sprintf("--addpiece=%t", cctx.Bool("addpiece")),
|
|
fmt.Sprintf("--precommit1=%t", cctx.Bool("precommit1")),
|
|
fmt.Sprintf("--unseal=%t", cctx.Bool("unseal")),
|
|
fmt.Sprintf("--precommit2=%t", cctx.Bool("precommit2")),
|
|
fmt.Sprintf("--commit=%t", cctx.Bool("commit")),
|
|
fmt.Sprintf("--parallel-fetch-limit=%d", cctx.Int("parallel-fetch-limit")),
|
|
fmt.Sprintf("--timeout=%s", cctx.String("timeout")),
|
|
}, os.Environ()); err != nil {
|
|
fmt.Println(err)
|
|
}
|
|
}()
|
|
}
|
|
|
|
func extractRoutableIP(timeout time.Duration) (string, error) {
|
|
minerMultiAddrKey := "MINER_API_INFO"
|
|
deprecatedMinerMultiAddrKey := "STORAGE_API_INFO"
|
|
env, ok := os.LookupEnv(minerMultiAddrKey)
|
|
if !ok {
|
|
// TODO remove after deprecation period
|
|
env, ok = os.LookupEnv(deprecatedMinerMultiAddrKey)
|
|
if ok {
|
|
log.Warnf("Using a deprecated env(%s) value, please use env(%s) instead.", deprecatedMinerMultiAddrKey, minerMultiAddrKey)
|
|
}
|
|
return "", xerrors.New("MINER_API_INFO environment variable required to extract IP")
|
|
}
|
|
minerAddr := strings.Split(env, "/")
|
|
conn, err := net.DialTimeout("tcp", minerAddr[2]+":"+minerAddr[4], timeout)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
defer conn.Close()
|
|
|
|
localAddr := conn.LocalAddr().(*net.TCPAddr)
|
|
|
|
return strings.Split(localAddr.IP.String(), ":")[0], nil
|
|
}
|