lotus/cmd/lotus-seal-worker/main.go

560 lines
14 KiB
Go
Raw Normal View History

package main
2019-11-21 00:52:59 +00:00
import (
2020-03-13 01:37:38 +00:00
"context"
2020-03-16 18:46:02 +00:00
"encoding/json"
"fmt"
2020-03-16 18:46:02 +00:00
"io/ioutil"
2020-03-16 17:50:07 +00:00
"net"
2020-03-13 01:37:38 +00:00
"net/http"
2019-11-21 00:52:59 +00:00
"os"
2020-03-16 18:46:02 +00:00
"path/filepath"
"strings"
"time"
2020-03-16 18:46:02 +00:00
"github.com/google/uuid"
2020-03-13 01:37:38 +00:00
"github.com/gorilla/mux"
2020-09-14 07:44:55 +00:00
"github.com/ipfs/go-datastore/namespace"
logging "github.com/ipfs/go-log/v2"
2020-08-30 18:28:58 +00:00
manet "github.com/multiformats/go-multiaddr/net"
"github.com/urfave/cli/v2"
2020-10-21 08:37:50 +00:00
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
2020-06-05 22:59:01 +00:00
"golang.org/x/xerrors"
2019-11-21 00:52:59 +00:00
2020-05-20 17:43:22 +00:00
"github.com/filecoin-project/go-jsonrpc"
2020-05-20 18:23:51 +00:00
"github.com/filecoin-project/go-jsonrpc/auth"
2020-03-11 01:57:52 +00:00
paramfetch "github.com/filecoin-project/go-paramfetch"
2020-09-14 07:44:55 +00:00
"github.com/filecoin-project/go-statestore"
2020-03-23 11:40:02 +00:00
2020-03-11 01:57:52 +00:00
"github.com/filecoin-project/lotus/api"
2019-11-21 00:52:59 +00:00
"github.com/filecoin-project/lotus/build"
2020-03-11 01:57:52 +00:00
lcli "github.com/filecoin-project/lotus/cli"
cliutil "github.com/filecoin-project/lotus/cli/util"
2020-08-17 13:39:33 +00:00
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
2020-01-08 13:49:34 +00:00
"github.com/filecoin-project/lotus/lib/lotuslog"
2020-08-14 14:06:53 +00:00
"github.com/filecoin-project/lotus/lib/rpcenc"
2020-10-21 08:37:50 +00:00
"github.com/filecoin-project/lotus/metrics"
2020-09-14 07:44:55 +00:00
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/repo"
2019-11-21 00:52:59 +00:00
)
// log is the package-level logger, created under the name "main".
var log = logging.Logger("main")

// FlagWorkerRepo is the name of the CLI flag that selects the worker repo path.
const FlagWorkerRepo = "worker-repo"
2020-07-17 13:18:40 +00:00
2020-07-10 12:18:09 +00:00
// FlagWorkerRepoDeprecation is the old spelling of the worker repo flag; it is
// registered as an alias of FlagWorkerRepo.
// TODO remove after deprecation period
const FlagWorkerRepoDeprecation = "workerrepo"
2020-03-13 01:37:38 +00:00
2019-11-21 00:52:59 +00:00
func main() {
api.RunningNodeType = api.NodeWorker
2020-01-08 13:49:34 +00:00
lotuslog.SetupLogLevels()
2019-11-21 00:52:59 +00:00
local := []*cli.Command{
runCmd,
2020-08-30 18:28:58 +00:00
infoCmd,
storageCmd,
setCmd,
waitQuietCmd,
tasksCmd,
2019-11-21 00:52:59 +00:00
}
app := &cli.App{
Name: "lotus-worker",
Usage: "Remote miner worker",
2020-06-01 18:43:51 +00:00
Version: build.UserVersion(),
2019-11-21 00:52:59 +00:00
Flags: []cli.Flag{
&cli.StringFlag{
2020-07-08 10:38:59 +00:00
Name: FlagWorkerRepo,
2020-07-10 12:18:09 +00:00
Aliases: []string{FlagWorkerRepoDeprecation},
EnvVars: []string{"LOTUS_WORKER_PATH", "WORKER_PATH"},
2019-11-21 00:52:59 +00:00
Value: "~/.lotusworker", // TODO: Consider XDG_DATA_HOME
2020-07-10 12:18:09 +00:00
Usage: fmt.Sprintf("Specify worker repo path. flag %s and env WORKER_PATH are DEPRECATION, will REMOVE SOON", FlagWorkerRepoDeprecation),
2019-11-21 00:52:59 +00:00
},
&cli.StringFlag{
2020-07-08 10:38:59 +00:00
Name: "miner-repo",
2020-07-10 12:18:09 +00:00
Aliases: []string{"storagerepo"},
EnvVars: []string{"LOTUS_MINER_PATH", "LOTUS_STORAGE_PATH"},
2020-07-08 10:38:59 +00:00
Value: "~/.lotusminer", // TODO: Consider XDG_DATA_HOME
2020-07-10 12:18:09 +00:00
Usage: fmt.Sprintf("Specify miner repo path. flag storagerepo and env LOTUS_STORAGE_PATH are DEPRECATION, will REMOVE SOON"),
2019-11-21 00:52:59 +00:00
},
2019-12-07 14:19:46 +00:00
&cli.BoolFlag{
Name: "enable-gpu-proving",
Usage: "enable use of GPU for mining operations",
Value: true,
},
2019-11-21 00:52:59 +00:00
},
Commands: local,
}
app.Setup()
2020-03-25 21:15:10 +00:00
app.Metadata["repoType"] = repo.Worker
2019-11-21 00:52:59 +00:00
if err := app.Run(os.Args); err != nil {
2019-11-21 18:38:43 +00:00
log.Warnf("%+v", err)
2019-11-21 00:52:59 +00:00
return
}
}
var runCmd = &cli.Command{
Name: "run",
Usage: "Start lotus worker",
2020-03-16 17:50:07 +00:00
Flags: []cli.Flag{
&cli.StringFlag{
Name: "listen",
Usage: "host address and port the worker api will listen on",
Value: "0.0.0.0:3456",
2020-03-16 17:50:07 +00:00
},
&cli.StringFlag{
Name: "address",
Hidden: true,
},
2020-03-16 18:46:02 +00:00
&cli.BoolFlag{
Name: "no-local-storage",
Usage: "don't use storageminer repo for sector storage",
},
2020-09-30 06:23:35 +00:00
&cli.BoolFlag{
Name: "no-swap",
Usage: "don't use swap",
Value: false,
},
2020-08-14 14:06:53 +00:00
&cli.BoolFlag{
Name: "addpiece",
Usage: "enable addpiece",
Value: true,
},
2020-03-25 21:15:10 +00:00
&cli.BoolFlag{
Name: "precommit1",
Usage: "enable precommit1 (32G sectors: 1 core, 128GiB Memory)",
Value: true,
},
2020-08-04 12:32:09 +00:00
&cli.BoolFlag{
Name: "unseal",
Usage: "enable unsealing (32G sectors: 1 core, 128GiB Memory)",
Value: true,
},
2020-03-25 21:15:10 +00:00
&cli.BoolFlag{
Name: "precommit2",
Usage: "enable precommit2 (32G sectors: all cores, 96GiB Memory)",
Value: true,
},
&cli.BoolFlag{
Name: "commit",
Usage: "enable commit (32G sectors: all cores or GPUs, 128GiB Memory + 64GiB swap)",
Value: true,
},
&cli.IntFlag{
Name: "parallel-fetch-limit",
Usage: "maximum fetch operations to run in parallel",
Value: 5,
},
&cli.StringFlag{
Name: "timeout",
Usage: "used when 'listen' is unspecified. must be a valid duration recognized by golang's time.ParseDuration function",
Value: "30m",
},
2020-03-16 17:50:07 +00:00
},
Before: func(cctx *cli.Context) error {
if cctx.IsSet("address") {
log.Warnf("The '--address' flag is deprecated, it has been replaced by '--listen'")
if err := cctx.Set("listen", cctx.String("address")); err != nil {
return err
}
}
return nil
},
2019-11-21 00:52:59 +00:00
Action: func(cctx *cli.Context) error {
2020-08-30 18:28:58 +00:00
log.Info("Starting lotus worker")
2020-03-11 01:57:52 +00:00
if !cctx.Bool("enable-gpu-proving") {
if err := os.Setenv("BELLMAN_NO_GPU", "true"); err != nil {
return xerrors.Errorf("could not set no-gpu env: %+v", err)
}
2020-03-11 01:57:52 +00:00
}
2020-03-16 17:50:07 +00:00
// Connect to storage-miner
ctx := lcli.ReqContext(cctx)
var nodeApi api.StorageMiner
var closer func()
var err error
for {
nodeApi, closer, err = lcli.GetStorageMinerAPI(cctx, cliutil.StorageMinerUseHttp)
if err == nil {
_, err = nodeApi.Version(ctx)
if err == nil {
break
}
}
fmt.Printf("\r\x1b[0KConnecting to miner API... (%s)", err)
time.Sleep(time.Second)
continue
2020-03-11 01:57:52 +00:00
}
2020-03-11 01:57:52 +00:00
defer closer()
2020-03-18 23:23:28 +00:00
ctx, cancel := context.WithCancel(ctx)
defer cancel()
2020-03-11 01:57:52 +00:00
2020-10-21 08:37:50 +00:00
// Register all metric views
if err := view.Register(
metrics.DefaultViews...,
); err != nil {
log.Fatalf("Cannot register the view: %v", err)
}
2020-03-11 01:57:52 +00:00
v, err := nodeApi.Version(ctx)
if err != nil {
return err
}
if v.APIVersion != api.MinerAPIVersion {
return xerrors.Errorf("lotus-miner API version doesn't match: expected: %s", api.APIVersion{APIVersion: api.MinerAPIVersion})
2020-03-11 01:57:52 +00:00
}
2020-03-13 01:37:38 +00:00
log.Infof("Remote version %s", v)
2020-03-11 01:57:52 +00:00
2020-03-16 17:50:07 +00:00
// Check params
2020-03-11 01:57:52 +00:00
act, err := nodeApi.ActorAddress(ctx)
if err != nil {
return err
}
ssize, err := nodeApi.ActorSectorSize(ctx, act)
if err != nil {
return err
}
2020-03-25 21:15:10 +00:00
if cctx.Bool("commit") {
if err := paramfetch.GetParams(ctx, build.ParametersJSON(), uint64(ssize)); err != nil {
2020-03-25 21:15:10 +00:00
return xerrors.Errorf("get params: %w", err)
}
}
var taskTypes []sealtasks.TaskType
2020-06-04 13:54:54 +00:00
taskTypes = append(taskTypes, sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize)
2020-08-14 14:06:53 +00:00
if cctx.Bool("addpiece") {
taskTypes = append(taskTypes, sealtasks.TTAddPiece)
}
2020-03-25 21:15:10 +00:00
if cctx.Bool("precommit1") {
taskTypes = append(taskTypes, sealtasks.TTPreCommit1)
}
2020-08-04 12:32:09 +00:00
if cctx.Bool("unseal") {
taskTypes = append(taskTypes, sealtasks.TTUnseal)
}
2020-03-25 21:15:10 +00:00
if cctx.Bool("precommit2") {
taskTypes = append(taskTypes, sealtasks.TTPreCommit2)
}
if cctx.Bool("commit") {
taskTypes = append(taskTypes, sealtasks.TTCommit2)
}
if len(taskTypes) == 0 {
return xerrors.Errorf("no task types specified")
2020-03-11 01:57:52 +00:00
}
2020-03-16 17:50:07 +00:00
// Open repo
2020-07-08 10:38:59 +00:00
repoPath := cctx.String(FlagWorkerRepo)
2020-03-13 01:37:38 +00:00
r, err := repo.NewFS(repoPath)
if err != nil {
return err
}
ok, err := r.Exists()
if err != nil {
return err
}
if !ok {
2020-03-16 18:46:02 +00:00
if err := r.Init(repo.Worker); err != nil {
return err
}
lr, err := r.Lock(repo.Worker)
if err != nil {
return err
}
var localPaths []stores.LocalPath
2020-03-16 18:46:02 +00:00
if !cctx.Bool("no-local-storage") {
2020-03-19 15:10:19 +00:00
b, err := json.MarshalIndent(&stores.LocalStorageMeta{
2020-03-16 18:46:02 +00:00
ID: stores.ID(uuid.New().String()),
Weight: 10,
CanSeal: true,
CanStore: false,
}, "", " ")
if err != nil {
return xerrors.Errorf("marshaling storage config: %w", err)
}
if err := ioutil.WriteFile(filepath.Join(lr.Path(), "sectorstore.json"), b, 0644); err != nil {
return xerrors.Errorf("persisting storage metadata (%s): %w", filepath.Join(lr.Path(), "sectorstore.json"), err)
}
localPaths = append(localPaths, stores.LocalPath{
2020-03-16 18:46:02 +00:00
Path: lr.Path(),
})
}
if err := lr.SetStorage(func(sc *stores.StorageConfig) {
2020-03-16 18:46:02 +00:00
sc.StoragePaths = append(sc.StoragePaths, localPaths...)
}); err != nil {
return xerrors.Errorf("set storage config: %w", err)
}
2020-03-18 23:23:28 +00:00
{
// init datastore for r.Exists
2021-01-26 10:25:34 +00:00
_, err := lr.Datastore(context.Background(), "/metadata")
2020-03-18 23:23:28 +00:00
if err != nil {
return err
}
}
2020-03-16 18:46:02 +00:00
if err := lr.Close(); err != nil {
return xerrors.Errorf("close repo: %w", err)
}
2020-03-13 01:37:38 +00:00
}
lr, err := r.Lock(repo.Worker)
if err != nil {
return err
}
defer func() {
if err := lr.Close(); err != nil {
log.Error("closing repo", err)
}
}()
2021-01-26 10:25:34 +00:00
ds, err := lr.Datastore(context.Background(), "/metadata")
if err != nil {
return err
}
2020-03-13 01:37:38 +00:00
2020-03-19 15:10:19 +00:00
log.Info("Opening local storage; connecting to master")
const unspecifiedAddress = "0.0.0.0"
address := cctx.String("listen")
addressSlice := strings.Split(address, ":")
if ip := net.ParseIP(addressSlice[0]); ip != nil {
if ip.String() == unspecifiedAddress {
timeout, err := time.ParseDuration(cctx.String("timeout"))
if err != nil {
return err
}
rip, err := extractRoutableIP(timeout)
if err != nil {
return err
}
address = rip + ":" + addressSlice[1]
}
}
2020-03-19 15:10:19 +00:00
localStore, err := stores.NewLocal(ctx, lr, nodeApi, []string{"http://" + address + "/remote"})
2020-03-13 01:37:38 +00:00
if err != nil {
return err
}
2020-03-16 17:50:07 +00:00
// Setup remote sector store
2020-03-13 01:37:38 +00:00
sminfo, err := lcli.GetAPIInfo(cctx, repo.StorageMiner)
if err != nil {
return xerrors.Errorf("could not get api info: %w", err)
}
remote := stores.NewRemote(localStore, nodeApi, sminfo.AuthHeader(), cctx.Int("parallel-fetch-limit"))
2020-03-13 01:37:38 +00:00
2020-11-25 16:05:45 +00:00
fh := &stores.FetchHandler{Local: localStore}
remoteHandler := func(w http.ResponseWriter, r *http.Request) {
2021-03-25 14:09:50 +00:00
if !auth.HasPerm(r.Context(), nil, api.PermAdmin) {
2020-11-25 16:05:45 +00:00
w.WriteHeader(401)
_ = json.NewEncoder(w).Encode(struct{ Error string }{"unauthorized: missing admin permission"})
return
}
fh.ServeHTTP(w, r)
}
2020-03-16 17:50:07 +00:00
// Create / expose the worker
wsts := statestore.New(namespace.Wrap(ds, modules.WorkerCallsPrefix))
2020-09-14 07:44:55 +00:00
2020-03-13 01:37:38 +00:00
workerApi := &worker{
2020-03-23 11:40:02 +00:00
LocalWorker: sectorstorage.NewLocalWorker(sectorstorage.WorkerConfig{
2020-03-25 21:15:10 +00:00
TaskTypes: taskTypes,
2020-09-30 06:23:35 +00:00
NoSwap: cctx.Bool("no-swap"),
2020-09-14 07:44:55 +00:00
}, remote, localStore, nodeApi, nodeApi, wsts),
2020-08-30 18:28:58 +00:00
localStore: localStore,
ls: lr,
2020-03-13 01:37:38 +00:00
}
mux := mux.NewRouter()
log.Info("Setting up control endpoint at " + address)
2020-03-18 04:40:25 +00:00
2020-08-14 14:06:53 +00:00
readerHandler, readerServerOpt := rpcenc.ReaderParamDecoder()
rpcServer := jsonrpc.NewServer(readerServerOpt)
2021-03-25 14:09:50 +00:00
rpcServer.Register("Filecoin", api.PermissionedWorkerAPI(metrics.MetricedWorkerAPI(workerApi)))
2020-03-13 01:37:38 +00:00
mux.Handle("/rpc/v0", rpcServer)
2020-08-14 14:06:53 +00:00
mux.Handle("/rpc/streams/v0/push/{uuid}", readerHandler)
2020-11-25 16:05:45 +00:00
mux.PathPrefix("/remote").HandlerFunc(remoteHandler)
2020-03-13 01:37:38 +00:00
mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof
ah := &auth.Handler{
Verify: nodeApi.AuthVerify,
Next: mux.ServeHTTP,
}
2020-03-18 23:23:28 +00:00
srv := &http.Server{
Handler: ah,
BaseContext: func(listener net.Listener) context.Context {
2020-10-21 08:37:50 +00:00
ctx, _ := tag.New(context.Background(), tag.Upsert(metrics.APIInterface, "lotus-worker"))
2020-03-18 23:23:28 +00:00
return ctx
},
}
2020-03-13 01:37:38 +00:00
go func() {
2020-03-18 04:40:25 +00:00
<-ctx.Done()
2020-06-02 19:30:45 +00:00
log.Warn("Shutting down...")
2020-03-13 01:37:38 +00:00
if err := srv.Shutdown(context.TODO()); err != nil {
log.Errorf("shutting down RPC server failed: %s", err)
}
log.Warn("Graceful shutdown successful")
}()
2020-03-16 17:50:07 +00:00
nl, err := net.Listen("tcp", address)
2020-03-16 17:50:07 +00:00
if err != nil {
return err
}
2020-08-30 18:28:58 +00:00
{
a, err := net.ResolveTCPAddr("tcp", address)
if err != nil {
return xerrors.Errorf("parsing address: %w", err)
}
ma, err := manet.FromNetAddr(a)
if err != nil {
return xerrors.Errorf("creating api multiaddress: %w", err)
}
if err := lr.SetAPIEndpoint(ma); err != nil {
return xerrors.Errorf("setting api endpoint: %w", err)
}
ainfo, err := lcli.GetAPIInfo(cctx, repo.StorageMiner)
if err != nil {
return xerrors.Errorf("could not get miner API info: %w", err)
}
// TODO: ideally this would be a token with some permissions dropped
if err := lr.SetAPIToken(ainfo.Token); err != nil {
return xerrors.Errorf("setting api token: %w", err)
}
}
minerSession, err := nodeApi.Session(ctx)
if err != nil {
return xerrors.Errorf("getting miner session: %w", err)
}
waitQuietCh := func() chan struct{} {
out := make(chan struct{})
go func() {
workerApi.LocalWorker.WaitQuiet()
close(out)
}()
return out
}
2020-03-18 23:23:28 +00:00
go func() {
heartbeats := time.NewTicker(stores.HeartbeatInterval)
defer heartbeats.Stop()
var redeclareStorage bool
var readyCh chan struct{}
2020-09-22 16:36:44 +00:00
for {
// If we're reconnecting, redeclare storage first
if redeclareStorage {
log.Info("Redeclaring local storage")
2020-09-28 19:06:49 +00:00
if err := localStore.Redeclare(ctx); err != nil {
log.Errorf("Redeclaring local storage failed: %+v", err)
select {
case <-ctx.Done():
return // graceful shutdown
case <-heartbeats.C:
}
continue
2020-09-28 19:06:49 +00:00
}
}
// TODO: we could get rid of this, but that requires tracking resources for restarted tasks correctly
if readyCh == nil {
log.Info("Making sure no local tasks are running")
readyCh = waitQuietCh()
}
for {
curSession, err := nodeApi.Session(ctx)
if err != nil {
log.Errorf("heartbeat: checking remote session failed: %+v", err)
} else {
if curSession != minerSession {
minerSession = curSession
break
}
}
select {
case <-readyCh:
if err := nodeApi.WorkerConnect(ctx, "http://"+address+"/rpc/v0"); err != nil {
log.Errorf("Registering worker failed: %+v", err)
cancel()
return
}
log.Info("Worker registered successfully, waiting for tasks")
readyCh = nil
case <-heartbeats.C:
case <-ctx.Done():
return // graceful shutdown
}
2020-09-22 16:36:44 +00:00
}
2020-09-22 16:36:44 +00:00
log.Errorf("LOTUS-MINER CONNECTION LOST")
2020-09-28 19:06:49 +00:00
redeclareStorage = true
2020-09-22 16:36:44 +00:00
}
}()
return srv.Serve(nl)
},
}
func extractRoutableIP(timeout time.Duration) (string, error) {
minerMultiAddrKey := "MINER_API_INFO"
deprecatedMinerMultiAddrKey := "STORAGE_API_INFO"
env, ok := os.LookupEnv(minerMultiAddrKey)
if !ok {
// TODO remove after deprecation period
_, ok = os.LookupEnv(deprecatedMinerMultiAddrKey)
if ok {
log.Warnf("Using a deprecated env(%s) value, please use env(%s) instead.", deprecatedMinerMultiAddrKey, minerMultiAddrKey)
}
return "", xerrors.New("MINER_API_INFO environment variable required to extract IP")
}
minerAddr := strings.Split(env, "/")
conn, err := net.DialTimeout("tcp", minerAddr[2]+":"+minerAddr[4], timeout)
if err != nil {
return "", err
}
defer conn.Close() //nolint:errcheck
localAddr := conn.LocalAddr().(*net.TCPAddr)
return strings.Split(localAddr.IP.String(), ":")[0], nil
}