package main

import (
	"context"
	"encoding/json"
	"fmt"
	"net"
	"net/http"
	"os"
	"path/filepath"
	"reflect"
	"strings"
	"time"

	"github.com/google/uuid"
	"github.com/ipfs/go-datastore/namespace"
	logging "github.com/ipfs/go-log/v2"
	"github.com/multiformats/go-multiaddr"
	manet "github.com/multiformats/go-multiaddr/net"
	"github.com/urfave/cli/v2"
	"go.opencensus.io/stats/view"
	"go.opencensus.io/tag"
	"golang.org/x/xerrors"

	"github.com/filecoin-project/go-jsonrpc/auth"
	"github.com/filecoin-project/go-paramfetch"
	"github.com/filecoin-project/go-statestore"

	"github.com/filecoin-project/lotus/api"
	"github.com/filecoin-project/lotus/build"
	lcli "github.com/filecoin-project/lotus/cli"
	cliutil "github.com/filecoin-project/lotus/cli/util"
	"github.com/filecoin-project/lotus/cmd/lotus-worker/sealworker"
	"github.com/filecoin-project/lotus/lib/lotuslog"
	"github.com/filecoin-project/lotus/lib/ulimit"
	"github.com/filecoin-project/lotus/metrics"
	"github.com/filecoin-project/lotus/node/modules"
	"github.com/filecoin-project/lotus/node/repo"
	"github.com/filecoin-project/lotus/storage/paths"
	"github.com/filecoin-project/lotus/storage/sealer"
	"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
	"github.com/filecoin-project/lotus/storage/sealer/storiface"
)

// log is the package-level logger used throughout this command; it is created
// by the go-log library under the name "main".
var log = logging.Logger("main")

const (
	// FlagWorkerRepo is the name of the CLI flag holding the worker repo path.
	FlagWorkerRepo = "worker-repo"

	// FlagWorkerRepoDeprecation is the legacy name of the same flag; it is
	// registered as an alias of FlagWorkerRepo.
	// TODO remove after deprecation period
	FlagWorkerRepoDeprecation = "workerrepo"
)

// main builds the lotus-worker CLI application, registers its sub-commands
// and global flags, and runs it with the process arguments.
//
// If the application returns an error it is logged and the process exits with
// status 1, so that shells and service managers can detect the failure.
func main() {
	api.RunningNodeType = api.NodeWorker

	lotuslog.SetupLogLevels()

	// Sub-commands exposed by the lotus-worker binary.
	local := []*cli.Command{
		runCmd,
		stopCmd,
		infoCmd,
		storageCmd,
		setCmd,
		waitQuietCmd,
		resourcesCmd,
		tasksCmd,
	}

	app := &cli.App{
		Name:                 "lotus-worker",
		Usage:                "Remote miner worker",
		Version:              build.UserVersion(),
		EnableBashCompletion: true,
		Flags: []cli.Flag{
			&cli.StringFlag{
				Name:    FlagWorkerRepo,
				Aliases: []string{FlagWorkerRepoDeprecation},
				EnvVars: []string{"LOTUS_WORKER_PATH", "WORKER_PATH"},
				Value:   "~/.lotusworker", // TODO: Consider XDG_DATA_HOME
				Usage:   fmt.Sprintf("Specify worker repo path. flag %s and env WORKER_PATH are DEPRECATION, will REMOVE SOON", FlagWorkerRepoDeprecation),
			},
			&cli.StringFlag{
				Name:    "panic-reports",
				EnvVars: []string{"LOTUS_PANIC_REPORT_PATH"},
				Hidden:  true,
				Value:   "~/.lotusworker", // should follow --repo default
			},
			&cli.StringFlag{
				Name:    "miner-repo",
				Aliases: []string{"storagerepo"},
				EnvVars: []string{"LOTUS_MINER_PATH", "LOTUS_STORAGE_PATH"},
				Value:   "~/.lotusminer", // TODO: Consider XDG_DATA_HOME
				// Constant text: no formatting call is needed here.
				Usage: "Specify miner repo path. flag storagerepo and env LOTUS_STORAGE_PATH are DEPRECATION, will REMOVE SOON",
			},
			&cli.BoolFlag{
				Name:    "enable-gpu-proving",
				Usage:   "enable use of GPU for mining operations",
				Value:   true,
				EnvVars: []string{"LOTUS_WORKER_ENABLE_GPU_PROVING"},
			},
		},

		// NOTE(review): recover only has an effect when it is called directly
		// by a deferred function. Whether this hook can ever observe a panic
		// depends on how the cli library invokes After — confirm, otherwise
		// the panic report below is never generated.
		After: func(c *cli.Context) error {
			if r := recover(); r != nil {
				// Generate report in LOTUS_PANIC_REPORT_PATH and re-raise panic
				build.GeneratePanicReport(c.String("panic-reports"), c.String(FlagWorkerRepo), c.App.Name)
				panic(r)
			}
			return nil
		},
		Commands: local,
	}
	app.Setup()
	app.Metadata["repoType"] = repo.Worker

	if err := app.Run(os.Args); err != nil {
		log.Warnf("%+v", err)
		// Returning normally would exit with status 0 and hide the failure
		// from callers; exit non-zero instead.
		os.Exit(1)
	}
}

// stopCmd implements "lotus-worker stop": it connects to the worker API,
// detaches all storage from the worker and then requests a shutdown.
var stopCmd = &cli.Command{
	Name:  "stop",
	Usage: "Stop a running lotus worker",
	Flags: []cli.Flag{},
	Action: func(cctx *cli.Context) error {
		workerAPI, closeAPI, err := lcli.GetWorkerAPI(cctx)
		if err != nil {
			return err
		}
		defer closeAPI()

		ctx := lcli.ReqContext(cctx)

		// Detach any storage associated with this worker; shutdown is only
		// requested when the detach succeeded.
		if err := workerAPI.StorageDetachAll(ctx); err != nil {
			return err
		}

		return workerAPI.Shutdown(ctx)
	},
}

// runCmd implements "lotus-worker run".
//
// The action, in order:
//   - applies environment tweaks (GPU opt-out, DC_* resource variables) and
//     checks the file descriptor limit;
//   - waits until the miner API answers a Version call;
//   - derives the set of task types from the command line flags and fetches
//     proof parameters when one of the selected tasks needs them;
//   - initialises the worker repo on first use and locks it;
//   - works out the address under which the worker is advertised to the miner;
//   - starts the HTTP server and a heartbeat goroutine that (re)registers the
//     worker whenever the miner session changes.
//
// The action returns when the HTTP server stops serving.
var runCmd = &cli.Command{
	Name:  "run",
	Usage: "Start lotus worker",
	Flags: []cli.Flag{
		&cli.StringFlag{
			Name:    "listen",
			Usage:   "host address and port the worker api will listen on",
			Value:   "0.0.0.0:3456",
			EnvVars: []string{"LOTUS_WORKER_LISTEN"},
		},
		// Hidden, deprecated predecessor of --listen; see Before below.
		&cli.StringFlag{
			Name:   "address",
			Hidden: true,
		},
		&cli.BoolFlag{
			Name:    "no-local-storage",
			Usage:   "don't use storageminer repo for sector storage",
			EnvVars: []string{"LOTUS_WORKER_NO_LOCAL_STORAGE"},
		},
		&cli.BoolFlag{
			Name:    "no-swap",
			Usage:   "don't use swap",
			Value:   false,
			EnvVars: []string{"LOTUS_WORKER_NO_SWAP"},
		},
		&cli.StringFlag{
			Name:        "name",
			Usage:       "custom worker name",
			EnvVars:     []string{"LOTUS_WORKER_NAME"},
			DefaultText: "hostname",
		},
		&cli.BoolFlag{
			Name:    "addpiece",
			Usage:   "enable addpiece",
			Value:   true,
			EnvVars: []string{"LOTUS_WORKER_ADDPIECE"},
		},
		&cli.BoolFlag{
			Name:    "precommit1",
			Usage:   "enable precommit1",
			Value:   true,
			EnvVars: []string{"LOTUS_WORKER_PRECOMMIT1"},
		},
		&cli.BoolFlag{
			Name:    "unseal",
			Usage:   "enable unsealing",
			Value:   true,
			EnvVars: []string{"LOTUS_WORKER_UNSEAL"},
		},
		&cli.BoolFlag{
			Name:    "precommit2",
			Usage:   "enable precommit2",
			Value:   true,
			EnvVars: []string{"LOTUS_WORKER_PRECOMMIT2"},
		},
		&cli.BoolFlag{
			Name:    "commit",
			Usage:   "enable commit",
			Value:   true,
			EnvVars: []string{"LOTUS_WORKER_COMMIT"},
		},
		&cli.BoolFlag{
			Name:    "replica-update",
			Usage:   "enable replica update",
			Value:   true,
			EnvVars: []string{"LOTUS_WORKER_REPLICA_UPDATE"},
		},
		&cli.BoolFlag{
			Name:    "prove-replica-update2",
			Usage:   "enable prove replica update 2",
			Value:   true,
			EnvVars: []string{"LOTUS_WORKER_PROVE_REPLICA_UPDATE2"},
		},
		&cli.BoolFlag{
			Name:    "regen-sector-key",
			Usage:   "enable regen sector key",
			Value:   true,
			EnvVars: []string{"LOTUS_WORKER_REGEN_SECTOR_KEY"},
		},
		&cli.BoolFlag{
			Name:    "sector-download",
			Usage:   "enable external sector data download",
			Value:   false,
			EnvVars: []string{"LOTUS_WORKER_SECTOR_DOWNLOAD"},
		},
		&cli.BoolFlag{
			Name:    "windowpost",
			Usage:   "enable window post",
			Value:   false,
			EnvVars: []string{"LOTUS_WORKER_WINDOWPOST"},
		},
		&cli.BoolFlag{
			Name:    "winningpost",
			Usage:   "enable winning post",
			Value:   false,
			EnvVars: []string{"LOTUS_WORKER_WINNINGPOST"},
		},
		&cli.BoolFlag{
			Name:    "no-default",
			Usage:   "disable all default compute tasks, use the worker for storage/fetching only",
			Value:   false,
			EnvVars: []string{"LOTUS_WORKER_NO_DEFAULT"},
		},
		&cli.IntFlag{
			Name:    "parallel-fetch-limit",
			Usage:   "maximum fetch operations to run in parallel",
			Value:   5,
			EnvVars: []string{"LOTUS_WORKER_PARALLEL_FETCH_LIMIT"},
		},
		&cli.IntFlag{
			Name:    "post-parallel-reads",
			Usage:   "maximum number of parallel challenge reads (0 = no limit)",
			Value:   32,
			EnvVars: []string{"LOTUS_WORKER_POST_PARALLEL_READS"},
		},
		&cli.DurationFlag{
			Name:    "post-read-timeout",
			Usage:   "time limit for reading PoSt challenges (0 = no limit)",
			Value:   0,
			EnvVars: []string{"LOTUS_WORKER_POST_READ_TIMEOUT"},
		},
		&cli.StringFlag{
			Name:    "timeout",
			Usage:   "used when 'listen' is unspecified. must be a valid duration recognized by golang's time.ParseDuration function",
			Value:   "30m",
			EnvVars: []string{"LOTUS_WORKER_TIMEOUT"},
		},
		&cli.StringFlag{
			Name:  "http-server-timeout",
			Value: "30s",
		},
		&cli.BoolFlag{
			Name:        "data-cid",
			Usage:       "Run the data-cid task. true|false",
			Value:       true,
			DefaultText: "inherits --addpiece",
		},
	},
	// Before maps the deprecated --address flag onto --listen.
	Before: func(cctx *cli.Context) error {
		if cctx.IsSet("address") {
			log.Warnf("The '--address' flag is deprecated, it has been replaced by '--listen'")
			if err := cctx.Set("listen", cctx.String("address")); err != nil {
				return err
			}
		}

		return nil
	},
	Action: func(cctx *cli.Context) error {
		log.Info("Starting lotus worker")

		if !cctx.Bool("enable-gpu-proving") {
			if err := os.Setenv("BELLMAN_NO_GPU", "true"); err != nil {
				return xerrors.Errorf("could not set no-gpu env: %+v", err)
			}
		}

		// ensure tmpdir exists
		td := os.TempDir()
		if err := os.MkdirAll(td, 0755); err != nil {
			return xerrors.Errorf("ensuring temp dir %s exists: %w", td, err)
		}

		// Check file descriptor limit
		limit, _, err := ulimit.GetLimit()
		switch {
		case err == ulimit.ErrUnsupported:
			// Not fatal: the limit simply cannot be checked on this platform.
			log.Errorw("checking file descriptor limit failed", "error", err)
		case err != nil:
			return xerrors.Errorf("checking fd limit: %w", err)
		default:
			if limit < build.MinerFDLimit {
				return xerrors.Errorf("soft file descriptor limit (ulimit -n) too low, want %d, current %d", build.MinerFDLimit, limit)
			}
		}

		// Check DC-environment variable
		sectorSizes := []string{"2KiB", "8MiB", "512MiB", "32GiB", "64GiB"}
		resourcesType := reflect.TypeOf(storiface.Resources{})

		for _, sectorSize := range sectorSizes {
			for i := 0; i < resourcesType.NumField(); i++ {
				field := resourcesType.Field(i)
				envName := field.Tag.Get("envname")
				if envName != "" {
					// Check if DC_[SectorSize]_[ResourceRestriction] is set
					envVar, ok := os.LookupEnv("DC_" + sectorSize + "_" + envName)
					if ok {
						// If it is set, convert it to DC_[ResourceRestriction]
						err := os.Setenv("DC_"+envName, envVar)
						if err != nil {
							log.Fatalf("Error setting environment variable: %v", err)
						}
						log.Warnf("Converted DC_%s_%s to DC_%s, because DC is a sector-size independent job", sectorSize, envName, envName)
					}
				}
			}
		}

		// Connect to storage-miner
		ctx := lcli.ReqContext(cctx)

		// Retry once per second until the miner API answers a Version call.
		var nodeApi api.StorageMiner
		var closer func()
		for {
			nodeApi, closer, err = lcli.GetStorageMinerAPI(cctx, cliutil.StorageMinerUseHttp)
			if err == nil {
				_, err = nodeApi.Version(ctx)
				if err == nil {
					break
				}
				// A client was created but the miner did not answer; release
				// it before the next attempt so retries do not accumulate
				// open clients.
				closer()
			}
			fmt.Printf("\r\x1b[0KConnecting to miner API... (%s)", err)

			// Stop retrying as soon as the request context is cancelled
			// instead of looping forever.
			select {
			case <-ctx.Done():
				return xerrors.Errorf("connecting to miner API: %w", ctx.Err())
			case <-time.After(time.Second):
			}
		}

		defer closer()
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

		// Register all metric views
		if err := view.Register(
			metrics.DefaultViews...,
		); err != nil {
			log.Fatalf("Cannot register the view: %v", err)
		}

		v, err := nodeApi.Version(ctx)
		if err != nil {
			return err
		}
		if v.APIVersion != api.MinerAPIVersion0 {
			return xerrors.Errorf("lotus-miner API version doesn't match: expected: %s", api.APIVersion{APIVersion: api.MinerAPIVersion0})
		}
		log.Infof("Remote version %s", v)

		// Check params

		act, err := nodeApi.ActorAddress(ctx)
		if err != nil {
			return err
		}
		ssize, err := nodeApi.ActorSectorSize(ctx, act)
		if err != nil {
			return err
		}

		// taskTypes collects every task this worker accepts, workerType is
		// the single worker kind all of them must belong to, and needParams
		// records whether any selected task requires proof parameters.
		var taskTypes []sealtasks.TaskType
		var workerType string
		var needParams bool

		if cctx.Bool("windowpost") {
			needParams = true
			workerType = sealtasks.WorkerWindowPoSt
			taskTypes = append(taskTypes, sealtasks.TTGenerateWindowPoSt)
		}
		if cctx.Bool("winningpost") {
			needParams = true
			workerType = sealtasks.WorkerWinningPoSt
			taskTypes = append(taskTypes, sealtasks.TTGenerateWinningPoSt)
		}

		// No PoSt flag was given: always add the base task set, and treat
		// this as a sealing worker unless --no-default is set.
		if workerType == "" {
			taskTypes = append(taskTypes, sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTProveReplicaUpdate1, sealtasks.TTFinalize, sealtasks.TTFinalizeUnsealed, sealtasks.TTFinalizeReplicaUpdate)

			if !cctx.Bool("no-default") {
				workerType = sealtasks.WorkerSealing
			}
		}

		// Each task flag below is honoured when this is a sealing worker
		// (its default then applies) or when the flag was set explicitly.
		ttDataCidDefault := false
		if (workerType == sealtasks.WorkerSealing || cctx.IsSet("addpiece")) && cctx.Bool("addpiece") {
			taskTypes = append(taskTypes, sealtasks.TTAddPiece)
			ttDataCidDefault = true
		}
		if workerType == sealtasks.WorkerSealing {
			// An explicit --data-cid wins; otherwise it follows addpiece.
			if cctx.IsSet("data-cid") {
				if cctx.Bool("data-cid") {
					taskTypes = append(taskTypes, sealtasks.TTDataCid)
				}
			} else if ttDataCidDefault {
				taskTypes = append(taskTypes, sealtasks.TTDataCid)
			}
		}
		if (workerType == sealtasks.WorkerSealing || cctx.IsSet("sector-download")) && cctx.Bool("sector-download") {
			taskTypes = append(taskTypes, sealtasks.TTDownloadSector)
		}
		if (workerType == sealtasks.WorkerSealing || cctx.IsSet("precommit1")) && cctx.Bool("precommit1") {
			taskTypes = append(taskTypes, sealtasks.TTPreCommit1)
		}
		if (workerType == sealtasks.WorkerSealing || cctx.IsSet("unseal")) && cctx.Bool("unseal") {
			taskTypes = append(taskTypes, sealtasks.TTUnseal)
		}
		if (workerType == sealtasks.WorkerSealing || cctx.IsSet("precommit2")) && cctx.Bool("precommit2") {
			taskTypes = append(taskTypes, sealtasks.TTPreCommit2)
		}
		if (workerType == sealtasks.WorkerSealing || cctx.IsSet("commit")) && cctx.Bool("commit") {
			needParams = true
			taskTypes = append(taskTypes, sealtasks.TTCommit2)
		}
		if (workerType == sealtasks.WorkerSealing || cctx.IsSet("replica-update")) && cctx.Bool("replica-update") {
			taskTypes = append(taskTypes, sealtasks.TTReplicaUpdate)
		}
		if (workerType == sealtasks.WorkerSealing || cctx.IsSet("prove-replica-update2")) && cctx.Bool("prove-replica-update2") {
			needParams = true
			taskTypes = append(taskTypes, sealtasks.TTProveReplicaUpdate2)
		}
		if (workerType == sealtasks.WorkerSealing || cctx.IsSet("regen-sector-key")) && cctx.Bool("regen-sector-key") {
			taskTypes = append(taskTypes, sealtasks.TTRegenSectorKey)
		}

		// With --no-default and no PoSt flag the worker type was left empty
		// above; set it now so the consistency check below can pass.
		if cctx.Bool("no-default") && workerType == "" {
			workerType = sealtasks.WorkerSealing
		}

		if len(taskTypes) == 0 {
			return xerrors.Errorf("no task types specified")
		}
		for _, taskType := range taskTypes {
			if taskType.WorkerType() != workerType {
				return xerrors.Errorf("expected all task types to be for %s worker, but task %s is for %s worker", workerType, taskType, taskType.WorkerType())
			}
		}

		if needParams {
			if err := paramfetch.GetParams(ctx, build.ParametersJSON(), build.SrsJSON(), uint64(ssize)); err != nil {
				return xerrors.Errorf("get params: %w", err)
			}
		}

		// Open repo

		repoPath := cctx.String(FlagWorkerRepo)
		r, err := repo.NewFS(repoPath)
		if err != nil {
			return err
		}

		ok, err := r.Exists()
		if err != nil {
			return err
		}
		if !ok {
			// First run: initialise the repo and write its initial storage
			// configuration before it is opened for normal use below.
			if err := r.Init(repo.Worker); err != nil {
				return err
			}

			lr, err := r.Lock(repo.Worker)
			if err != nil {
				return err
			}

			var localPaths []storiface.LocalPath

			if !cctx.Bool("no-local-storage") {
				b, err := json.MarshalIndent(&storiface.LocalStorageMeta{
					ID:       storiface.ID(uuid.New().String()),
					Weight:   10,
					CanSeal:  true,
					CanStore: false,
				}, "", "  ")
				if err != nil {
					return xerrors.Errorf("marshaling storage config: %w", err)
				}

				if err := os.WriteFile(filepath.Join(lr.Path(), "sectorstore.json"), b, 0644); err != nil {
					return xerrors.Errorf("persisting storage metadata (%s): %w", filepath.Join(lr.Path(), "sectorstore.json"), err)
				}

				localPaths = append(localPaths, storiface.LocalPath{
					Path: lr.Path(),
				})
			}

			if err := lr.SetStorage(func(sc *storiface.StorageConfig) {
				sc.StoragePaths = append(sc.StoragePaths, localPaths...)
			}); err != nil {
				return xerrors.Errorf("set storage config: %w", err)
			}

			{
				// init datastore for r.Exists
				_, err := lr.Datastore(context.Background(), "/metadata")
				if err != nil {
					return err
				}
			}
			if err := lr.Close(); err != nil {
				return xerrors.Errorf("close repo: %w", err)
			}
		}

		lr, err := r.Lock(repo.Worker)
		if err != nil {
			return err
		}
		defer func() {
			if err := lr.Close(); err != nil {
				log.Errorf("closing repo: %s", err)
			}
		}()
		ds, err := lr.Datastore(context.Background(), "/metadata")
		if err != nil {
			return err
		}

		log.Info("Opening local storage; connecting to master")
		const unspecifiedAddress = "0.0.0.0"

		address := cctx.String("listen")
		host, port, err := net.SplitHostPort(address)
		if err != nil {
			return err
		}

		// When listening on the unspecified address, find a concrete local
		// IP to advertise by dialling the miner (see extractRoutableIP).
		if ip := net.ParseIP(host); ip != nil {
			if ip.String() == unspecifiedAddress {
				timeout, err := time.ParseDuration(cctx.String("timeout"))
				if err != nil {
					return err
				}
				rip, err := extractRoutableIP(timeout)
				if err != nil {
					return err
				}
				host = rip
			}
		}

		// newAddress is the host:port advertised to the miner; address
		// remains the value the server binds to.
		var newAddress string

		// Check if the IP address is IPv6
		ip := net.ParseIP(host)
		if ip.To4() == nil && ip.To16() != nil {
			newAddress = "[" + host + "]:" + port
		} else {
			newAddress = host + ":" + port
		}

		localStore, err := paths.NewLocal(ctx, lr, nodeApi, []string{"http://" + newAddress + "/remote"})
		if err != nil {
			return err
		}

		// Setup remote sector store
		sminfo, err := lcli.GetAPIInfo(cctx, repo.StorageMiner)
		if err != nil {
			return xerrors.Errorf("could not get api info: %w", err)
		}

		remote := paths.NewRemote(localStore, nodeApi, sminfo.AuthHeader(), cctx.Int("parallel-fetch-limit"),
			&paths.DefaultPartialFileHandler{})

		fh := &paths.FetchHandler{Local: localStore, PfHandler: &paths.DefaultPartialFileHandler{}}
		// remoteHandler serves fetch requests, but only to callers holding
		// admin permission.
		remoteHandler := func(w http.ResponseWriter, r *http.Request) {
			if !auth.HasPerm(r.Context(), nil, api.PermAdmin) {
				w.WriteHeader(401)
				_ = json.NewEncoder(w).Encode(struct{ Error string }{"unauthorized: missing admin permission"})
				return
			}

			fh.ServeHTTP(w, r)
		}

		// Create / expose the worker

		wsts := statestore.New(namespace.Wrap(ds, modules.WorkerCallsPrefix))

		workerApi := &sealworker.Worker{
			LocalWorker: sealer.NewLocalWorker(sealer.WorkerConfig{
				TaskTypes:                 taskTypes,
				NoSwap:                    cctx.Bool("no-swap"),
				MaxParallelChallengeReads: cctx.Int("post-parallel-reads"),
				ChallengeReadTimeout:      cctx.Duration("post-read-timeout"),
				Name:                      cctx.String("name"),
			}, remote, localStore, nodeApi, nodeApi, wsts),
			LocalStore: localStore,
			Storage:    lr,
		}

		log.Info("Setting up control endpoint at " + address)

		timeout, err := time.ParseDuration(cctx.String("http-server-timeout"))
		if err != nil {
			// %w keeps the parse error readable and unwrappable.
			return xerrors.Errorf("invalid time string %s: %w", cctx.String("http-server-timeout"), err)
		}

		srv := &http.Server{
			Handler:           sealworker.WorkerHandler(nodeApi.AuthVerify, remoteHandler, workerApi, true),
			ReadHeaderTimeout: timeout,
			BaseContext: func(listener net.Listener) context.Context {
				ctx, _ := tag.New(context.Background(), tag.Upsert(metrics.APIInterface, "lotus-worker"))
				return ctx
			},
		}

		// Shut the HTTP server down once the worker context is cancelled.
		go func() {
			<-ctx.Done()
			log.Warn("Shutting down...")
			if err := srv.Shutdown(context.TODO()); err != nil {
				log.Errorf("shutting down RPC server failed: %s", err)
			}
			log.Warn("Graceful shutdown successful")
		}()

		nl, err := net.Listen("tcp", address)
		if err != nil {
			return err
		}

		// Record the API endpoint and token in the worker repo.
		{
			a, err := net.ResolveTCPAddr("tcp", address)
			if err != nil {
				return xerrors.Errorf("parsing address: %w", err)
			}

			ma, err := manet.FromNetAddr(a)
			if err != nil {
				return xerrors.Errorf("creating api multiaddress: %w", err)
			}

			if err := lr.SetAPIEndpoint(ma); err != nil {
				return xerrors.Errorf("setting api endpoint: %w", err)
			}

			ainfo, err := lcli.GetAPIInfo(cctx, repo.StorageMiner)
			if err != nil {
				return xerrors.Errorf("could not get miner API info: %w", err)
			}

			// TODO: ideally this would be a token with some permissions dropped
			if err := lr.SetAPIToken(ainfo.Token); err != nil {
				return xerrors.Errorf("setting api token: %w", err)
			}
		}

		minerSession, err := nodeApi.Session(ctx)
		if err != nil {
			return xerrors.Errorf("getting miner session: %w", err)
		}

		// waitQuietCh returns a channel that is closed once
		// LocalWorker.WaitQuiet returns.
		waitQuietCh := func() chan struct{} {
			out := make(chan struct{})
			go func() {
				workerApi.LocalWorker.WaitQuiet()
				close(out)
			}()
			return out
		}

		// Heartbeat loop: registers the worker with the miner and watches
		// the miner session; a changed session is treated as a lost
		// connection, after which storage is redeclared and the worker
		// registers again.
		go func() {
			heartbeats := time.NewTicker(paths.HeartbeatInterval)
			defer heartbeats.Stop()

			var redeclareStorage bool
			var readyCh chan struct{}
			for {
				// If we're reconnecting, redeclare storage first
				if redeclareStorage {
					log.Info("Redeclaring local storage")

					if err := localStore.Redeclare(ctx, nil, false); err != nil {
						log.Errorf("Redeclaring local storage failed: %+v", err)

						select {
						case <-ctx.Done():
							return // graceful shutdown
						case <-heartbeats.C:
						}
						continue
					}
				}

				// TODO: we could get rid of this, but that requires tracking resources for restarted tasks correctly
				if readyCh == nil {
					log.Info("Making sure no local tasks are running")
					readyCh = waitQuietCh()
				}

				for {
					curSession, err := nodeApi.Session(ctx)
					if err != nil {
						log.Errorf("heartbeat: checking remote session failed: %+v", err)
					} else {
						if curSession != minerSession {
							minerSession = curSession
							break
						}
					}

					select {
					case <-readyCh:
						// Advertise newAddress rather than the raw listen
						// address: with the 0.0.0.0 default the latter is not
						// an address the miner can dial back, and newAddress
						// is what the storage URL above already uses.
						if err := nodeApi.WorkerConnect(ctx, "http://"+newAddress+"/rpc/v0"); err != nil {
							log.Errorf("Registering worker failed: %+v", err)
							cancel()
							return
						}

						log.Info("Worker registered successfully, waiting for tasks")

						// A nil channel blocks forever in select, so this
						// case does not fire again until readyCh is re-armed.
						readyCh = nil
					case <-heartbeats.C:
					case <-ctx.Done():
						return // graceful shutdown
					}
				}

				log.Errorf("LOTUS-MINER CONNECTION LOST")

				redeclareStorage = true
			}
		}()

		// Shut the HTTP server down after the worker API signals Done.
		go func() {
			<-workerApi.Done()
			// Wait 20s to allow the miner to unregister the worker on next heartbeat
			time.Sleep(20 * time.Second)
			log.Warn("Shutting down...")
			if err := srv.Shutdown(context.TODO()); err != nil {
				log.Errorf("shutting down RPC server failed: %s", err)
			}
			log.Warn("Graceful shutdown successful")
		}()

		return srv.Serve(nl)
	},
}

func extractRoutableIP(timeout time.Duration) (string, error) {
	minerMultiAddrKey := "MINER_API_INFO"
	deprecatedMinerMultiAddrKey := "STORAGE_API_INFO"
	env, ok := os.LookupEnv(minerMultiAddrKey)
	if !ok {
		_, ok = os.LookupEnv(deprecatedMinerMultiAddrKey)
		if ok {
			log.Warnf("Using a deprecated env(%s) value, please use env(%s) instead.", deprecatedMinerMultiAddrKey, minerMultiAddrKey)
		}
		return "", xerrors.New("MINER_API_INFO environment variable required to extract IP")
	}

	// Splitting the env to separate the JWT from the multiaddress
	splitEnv := strings.SplitN(env, ":", 2)
	if len(splitEnv) < 2 {
		return "", xerrors.Errorf("invalid MINER_API_INFO format")
	}
	// Only take the multiaddress part
	maddrStr := splitEnv[1]

	maddr, err := multiaddr.NewMultiaddr(maddrStr)
	if err != nil {
		return "", err
	}

	minerIP, _ := maddr.ValueForProtocol(multiaddr.P_IP6)
	minerPort, _ := maddr.ValueForProtocol(multiaddr.P_TCP)

	// Check if the IP is IPv6 and format the address appropriately
	var addressToDial string
	if ip := net.ParseIP(minerIP); ip.To4() == nil && ip.To16() != nil {
		addressToDial = "[" + minerIP + "]:" + minerPort
	} else {
		addressToDial = minerIP + ":" + minerPort
	}

	conn, err := net.DialTimeout("tcp", addressToDial, timeout)
	if err != nil {
		return "", err
	}

	defer func() {
		if cerr := conn.Close(); cerr != nil {
			log.Errorf("Error closing connection: %v", cerr)
		}
	}()

	localAddr := conn.LocalAddr().(*net.TCPAddr)
	return localAddr.IP.String(), nil
}