We were ignoring quite a few error cases, and had one case where we weren't actually updating state where we wanted to. Unfortunately, if the linter doesn't pass, nobody has any reason to actually check lint failures in CI. There are three remaining XXXs marked in the code for lint.
471 lines
12 KiB
471 lines
12 KiB
package main
import (
logging "github.com/ipfs/go-log/v2"
paramfetch "github.com/filecoin-project/go-paramfetch"
lcli "github.com/filecoin-project/lotus/cli"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
// log is the package-level logger, created by logging.Logger under the name "main".
var log = logging.Logger("main")
// Names under which the worker repo path flag can be given on the command line.
const (
	// FlagWorkerRepo is the current name of the worker repo path flag.
	FlagWorkerRepo = "worker-repo"

	// FlagWorkerRepoDeprecation is the old spelling, still accepted as an alias.
	// TODO remove after deprecation period
	FlagWorkerRepoDeprecation = "workerrepo"
)
// main is the entry point of the lotus-worker binary: it logs a startup
// message, assembles the cli.App (global flags plus the commands in local)
// and runs it against os.Args.
// NOTE(review): several structural lines (closing braces, flag type literals,
// list entries) are not visible in this copy of the function; the comments
// below describe only the statements that are.
func main() {
log.Info("Starting lotus worker")
// Commands exposed by the app; wired in through the Commands field below.
local := []*cli.Command{
app := &cli.App{
Name: "lotus-worker",
Usage: "Remote miner worker",
Version: build.UserVersion(),
Flags: []cli.Flag{
// Worker repo path. The deprecated flag name is kept as an alias, and both
// LOTUS_WORKER_PATH and WORKER_PATH are listed as environment variables.
Name: FlagWorkerRepo,
Aliases: []string{FlagWorkerRepoDeprecation},
EnvVars: []string{"LOTUS_WORKER_PATH", "WORKER_PATH"},
Value: "~/.lotusworker", // TODO: Consider XDG_DATA_HOME
Usage: fmt.Sprintf("Specify worker repo path. flag %s and env WORKER_PATH are DEPRECATION, will REMOVE SOON", FlagWorkerRepoDeprecation),
// Miner repo path, with "storagerepo" kept as an alias.
Name: "miner-repo",
Aliases: []string{"storagerepo"},
Value: "~/.lotusminer", // TODO: Consider XDG_DATA_HOME
// NOTE(review): this Sprintf has no format arguments; a plain string literal
// would be equivalent.
Usage: fmt.Sprintf("Specify miner repo path. flag storagerepo and env LOTUS_STORAGE_PATH are DEPRECATION, will REMOVE SOON"),
// GPU toggle, on by default; the run command's Action reads it.
Name: "enable-gpu-proving",
Usage: "enable use of GPU for mining operations",
Value: true,
Commands: local,
// Record repo.Worker under the "repoType" metadata key.
app.Metadata["repoType"] = repo.Worker
// A failing run is reported through the logger using %+v formatting.
if err := app.Run(os.Args); err != nil {
log.Warnf("%+v", err)
// runCmd is the "run" subcommand. Its Action connects to the miner API,
// prepares the worker repo and sector stores, exposes the worker over
// HTTP/JSON-RPC and registers it with the miner.
// NOTE(review): several structural lines (closing braces, flag type literals,
// some call arguments) are not visible in this copy of the definition; the
// comments below describe only the statements that are.
var runCmd = &cli.Command{
Name: "run",
Usage: "Start lotus worker",
Flags: []cli.Flag{
// Address the worker API listens on.
Name: "listen",
Usage: "host address and port the worker api will listen on",
Value: "",
// Hidden, deprecated predecessor of "listen"; handled in Before.
Name: "address",
Hidden: true,
Name: "no-local-storage",
Usage: "don't use storageminer repo for sector storage",
// Task-type toggles, all defaulting to true; Action appends the matching
// sealtasks type for each one that is enabled.
Name: "addpiece",
Usage: "enable addpiece",
Value: true,
Name: "precommit1",
Usage: "enable precommit1 (32G sectors: 1 core, 128GiB Memory)",
Value: true,
Name: "unseal",
Usage: "enable unsealing (32G sectors: 1 core, 128GiB Memory)",
Value: true,
Name: "precommit2",
Usage: "enable precommit2 (32G sectors: all cores, 96GiB Memory)",
Value: true,
Name: "commit",
Usage: "enable commit (32G sectors: all cores or GPUs, 128GiB Memory + 64GiB swap)",
Value: true,
// Passed to stores.NewRemote in Action.
Name: "parallel-fetch-limit",
Usage: "maximum fetch operations to run in parallel",
Value: 5,
// Parsed with time.ParseDuration and used as the dial timeout of
// extractRoutableIP.
Name: "timeout",
Usage: "used when 'listen' is unspecified. must be a valid duration recognized by golang's time.ParseDuration function",
Value: "30m",
// Before copies the value of the deprecated --address flag into --listen,
// logging a deprecation warning.
Before: func(cctx *cli.Context) error {
if cctx.IsSet("address") {
log.Warnf("The '--address' flag is deprecated, it has been replaced by '--listen'")
if err := cctx.Set("listen", cctx.String("address")); err != nil {
return err
return nil
// Action sets up the worker and returns the result of srv.Serve.
Action: func(cctx *cli.Context) error {
// With GPU proving disabled, export BELLMAN_NO_GPU=true.
if !cctx.Bool("enable-gpu-proving") {
if err := os.Setenv("BELLMAN_NO_GPU", "true"); err != nil {
return xerrors.Errorf("could not set no-gpu env: %+v", err)
// Connect to storage-miner
var nodeApi api.StorageMiner
var closer func()
var err error
// Loop around GetStorageMinerAPI; the status line printed below includes the
// error of the latest attempt.
// NOTE(review): the body of the err == nil branch is not visible here;
// presumably it leaves the loop — confirm.
for {
nodeApi, closer, err = lcli.GetStorageMinerAPI(cctx,
if err == nil {
fmt.Printf("\r\x1b[0KConnecting to miner API... (%s)", err)
defer closer()
// Derive a cancellable context from the request context.
ctx := lcli.ReqContext(cctx)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Refuse to run against a miner whose API version differs from build.APIVersion.
v, err := nodeApi.Version(ctx)
if err != nil {
return err
if v.APIVersion != build.APIVersion {
return xerrors.Errorf("lotus-miner API version doesn't match: local: %s", api.Version{APIVersion: build.APIVersion})
log.Infof("Remote version %s", v)
// Start the background watcher for the miner connection.
watchMinerConn(ctx, cctx, nodeApi)
// Check params
act, err := nodeApi.ActorAddress(ctx)
if err != nil {
return err
ssize, err := nodeApi.ActorSectorSize(ctx, act)
if err != nil {
return err
// Parameters are fetched only when the commit task is enabled.
if cctx.Bool("commit") {
if err := paramfetch.GetParams(ctx, build.ParametersJSON(), uint64(ssize)); err != nil {
return xerrors.Errorf("get params: %w", err)
// Build the list of task types: fetch, commit1 and finalize are always
// included, the rest follow the per-task flags.
var taskTypes []sealtasks.TaskType
taskTypes = append(taskTypes, sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize)
if cctx.Bool("addpiece") {
taskTypes = append(taskTypes, sealtasks.TTAddPiece)
if cctx.Bool("precommit1") {
taskTypes = append(taskTypes, sealtasks.TTPreCommit1)
if cctx.Bool("unseal") {
taskTypes = append(taskTypes, sealtasks.TTUnseal)
if cctx.Bool("precommit2") {
taskTypes = append(taskTypes, sealtasks.TTPreCommit2)
if cctx.Bool("commit") {
taskTypes = append(taskTypes, sealtasks.TTCommit2)
// NOTE(review): three task types are appended unconditionally above, so this
// check cannot trigger as written.
if len(taskTypes) == 0 {
return xerrors.Errorf("no task types specified")
// Open repo
repoPath := cctx.String(FlagWorkerRepo)
r, err := repo.NewFS(repoPath)
if err != nil {
return err
ok, err := r.Exists()
if err != nil {
return err
// The repo does not exist yet: initialise it as a worker repo.
if !ok {
if err := r.Init(repo.Worker); err != nil {
return err
// Lock the repo to write the initial storage configuration.
lr, err := r.Lock(repo.Worker)
if err != nil {
return err
var localPaths []stores.LocalPath
// Unless local storage is disabled, write a sectorstore.json describing the
// repo directory as a storage path with a fresh UUID (sealing allowed,
// storing not) and add that directory to localPaths.
if !cctx.Bool("no-local-storage") {
b, err := json.MarshalIndent(&stores.LocalStorageMeta{
ID: stores.ID(uuid.New().String()),
Weight: 10,
CanSeal: true,
CanStore: false,
}, "", " ")
if err != nil {
return xerrors.Errorf("marshaling storage config: %w", err)
if err := ioutil.WriteFile(filepath.Join(lr.Path(), "sectorstore.json"), b, 0644); err != nil {
return xerrors.Errorf("persisting storage metadata (%s): %w", filepath.Join(lr.Path(), "sectorstore.json"), err)
localPaths = append(localPaths, stores.LocalPath{
Path: lr.Path(),
// Append the collected paths to the repo's storage config.
if err := lr.SetStorage(func(sc *stores.StorageConfig) {
sc.StoragePaths = append(sc.StoragePaths, localPaths...)
}); err != nil {
return xerrors.Errorf("set storage config: %w", err)
// init datastore for r.Exists
_, err := lr.Datastore("/metadata")
if err != nil {
return err
if err := lr.Close(); err != nil {
return xerrors.Errorf("close repo: %w", err)
// Lock the worker repo; lr is handed to stores.NewLocal below.
lr, err := r.Lock(repo.Worker)
if err != nil {
return err
log.Info("Opening local storage; connecting to master")
// Work out the address to advertise. When the host part of --listen parses
// as an IP whose string form equals unspecifiedAddress, it is replaced by the
// IP returned from extractRoutableIP, keeping the port.
// NOTE(review): unspecifiedAddress is the empty string here, and a non-nil
// net.IP does not stringify to ""; the branch looks unreachable as written —
// confirm the intended constant.
const unspecifiedAddress = ""
address := cctx.String("listen")
addressSlice := strings.Split(address, ":")
if ip := net.ParseIP(addressSlice[0]); ip != nil {
if ip.String() == unspecifiedAddress {
timeout, err := time.ParseDuration(cctx.String("timeout"))
if err != nil {
return err
rip, err := extractRoutableIP(timeout)
if err != nil {
return err
address = rip + ":" + addressSlice[1]
// Local store, created with this worker's /remote URL.
localStore, err := stores.NewLocal(ctx, lr, nodeApi, []string{"http://" + address + "/remote"})
if err != nil {
return err
// Setup remote sector store
spt, err := ffiwrapper.SealProofTypeFromSectorSize(ssize)
if err != nil {
return xerrors.Errorf("getting proof type: %w", err)
// The miner's API info supplies the auth header passed to the remote store.
sminfo, err := lcli.GetAPIInfo(cctx, repo.StorageMiner)
if err != nil {
return xerrors.Errorf("could not get api info: %w", err)
remote := stores.NewRemote(localStore, nodeApi, sminfo.AuthHeader(), cctx.Int("parallel-fetch-limit"))
// Create / expose the worker
workerApi := &worker{
LocalWorker: sectorstorage.NewLocalWorker(sectorstorage.WorkerConfig{
SealProof: spt,
TaskTypes: taskTypes,
}, remote, localStore, nodeApi),
// Note: the local variable mux shadows the mux package from here on.
mux := mux.NewRouter()
log.Info("Setting up control endpoint at " + address)
// Routes: JSON-RPC at /rpc/v0, reader handler at /rpc/streams/v0/push/{uuid},
// sector fetch under /remote, everything else to http.DefaultServeMux.
readerHandler, readerServerOpt := rpcenc.ReaderParamDecoder()
rpcServer := jsonrpc.NewServer(readerServerOpt)
rpcServer.Register("Filecoin", apistruct.PermissionedWorkerAPI(workerApi))
mux.Handle("/rpc/v0", rpcServer)
mux.Handle("/rpc/streams/v0/push/{uuid}", readerHandler)
mux.PathPrefix("/remote").HandlerFunc((&stores.FetchHandler{Local: localStore}).ServeHTTP)
mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof
// Auth handler in front of the router, verifying through the miner API's AuthVerify.
ah := &auth.Handler{
Verify: nodeApi.AuthVerify,
Next: mux.ServeHTTP,
// The server uses ctx as the base context of its connections.
srv := &http.Server{
Handler: ah,
BaseContext: func(listener net.Listener) context.Context {
return ctx
// Shutdown goroutine: logs, calls srv.Shutdown and logs the outcome.
// NOTE(review): the statement that makes this goroutine wait before shutting
// down is not visible here — presumably it blocks on a shutdown signal; confirm.
go func() {
log.Warn("Shutting down...")
if err := srv.Shutdown(context.TODO()); err != nil {
log.Errorf("shutting down RPC server failed: %s", err)
log.Warn("Graceful shutdown successful")
nl, err := net.Listen("tcp", address)
if err != nil {
return err
log.Info("Waiting for tasks")
// Register this worker with the miner in the background, giving it the
// worker's websocket RPC URL; a failure is only logged.
go func() {
if err := nodeApi.WorkerConnect(ctx, "ws://"+address+"/rpc/v0"); err != nil {
log.Errorf("Registering worker failed: %+v", err)
return srv.Serve(nl)
// watchMinerConn starts a goroutine that watches the miner API connection
// through nodeApi.Closing and, when it is reported lost, re-executes the
// current binary with flags rebuilt from cctx.
// NOTE(review): several structural lines (closing braces, early returns, the
// tail of the goroutine) are not visible in this copy of the function; the
// comments below describe only the statements that are.
func watchMinerConn(ctx context.Context, cctx *cli.Context, nodeApi api.StorageMiner) {
go func() {
closing, err := nodeApi.Closing(ctx)
if err != nil {
log.Errorf("failed to get remote closing channel: %+v", err)
// Wait for either the closing channel or cancellation of ctx.
select {
case <-closing:
case <-ctx.Done():
// A cancelled ctx means this process is shutting down itself; do not restart.
if ctx.Err() != nil {
return // graceful shutdown
log.Warnf("Connection with miner node lost, restarting")
exe, err := os.Executable()
if err != nil {
log.Errorf("getting executable for auto-restart: %+v", err)
// Sync the logger before exec; its error is deliberately ignored.
_ = log.Sync()
// TODO: there are probably cleaner/more graceful ways to restart,
// but this is good enough for now (FSM can recover from the mess this creates)
// Exec the same executable with the current flag values and environment.
if err := syscall.Exec(exe, []string{exe,
fmt.Sprintf("--worker-repo=%s", cctx.String("worker-repo")),
fmt.Sprintf("--miner-repo=%s", cctx.String("miner-repo")),
fmt.Sprintf("--enable-gpu-proving=%t", cctx.Bool("enable-gpu-proving")),
fmt.Sprintf("--listen=%s", cctx.String("listen")),
fmt.Sprintf("--no-local-storage=%t", cctx.Bool("no-local-storage")),
fmt.Sprintf("--addpiece=%t", cctx.Bool("addpiece")),
fmt.Sprintf("--precommit1=%t", cctx.Bool("precommit1")),
fmt.Sprintf("--unseal=%t", cctx.Bool("unseal")),
fmt.Sprintf("--precommit2=%t", cctx.Bool("precommit2")),
fmt.Sprintf("--commit=%t", cctx.Bool("commit")),
fmt.Sprintf("--parallel-fetch-limit=%d", cctx.Int("parallel-fetch-limit")),
fmt.Sprintf("--timeout=%s", cctx.String("timeout")),
}, os.Environ()); err != nil {
func extractRoutableIP(timeout time.Duration) (string, error) {
minerMultiAddrKey := "MINER_API_INFO"
deprecatedMinerMultiAddrKey := "STORAGE_API_INFO"
env, ok := os.LookupEnv(minerMultiAddrKey)
if !ok {
// TODO remove after deprecation period
_, ok = os.LookupEnv(deprecatedMinerMultiAddrKey)
if ok {
log.Warnf("Using a deprecated env(%s) value, please use env(%s) instead.", deprecatedMinerMultiAddrKey, minerMultiAddrKey)
return "", xerrors.New("MINER_API_INFO environment variable required to extract IP")
minerAddr := strings.Split(env, "/")
conn, err := net.DialTimeout("tcp", minerAddr[2]+":"+minerAddr[4], timeout)
if err != nil {
return "", err
defer conn.Close() //nolint:errcheck
localAddr := conn.LocalAddr().(*net.TCPAddr)
return strings.Split(localAddr.IP.String(), ":")[0], nil