- Added a goroutine that listens for interrupt signals (os.Interrupt) and cancels the current context when one is received, so that ongoing operations can shut down gracefully.
838 lines
23 KiB
package main
import (
logging "github.com/ipfs/go-log/v2"
manet "github.com/multiformats/go-multiaddr/net"
lcli "github.com/filecoin-project/lotus/cli"
cliutil "github.com/filecoin-project/lotus/cli/util"
// log is this package's logger, registered under the name "main".
var log = logging.Logger("main")
// Names of the global flag that selects the worker repository path.
const (
	// FlagWorkerRepo is the current name of the worker repo path flag.
	FlagWorkerRepo = "worker-repo"

	// FlagWorkerRepoDeprecation is the legacy alias still accepted for
	// FlagWorkerRepo.
	// TODO remove after deprecation period
	FlagWorkerRepoDeprecation = "workerrepo"
)
// main is the lotus-worker entry point. It marks the process as a worker node,
// builds the CLI app (global repo / panic-report / miner-repo / GPU flags plus
// the commands in local) and runs it, logging any error at warn level.
func main() {
api.RunningNodeType = api.NodeWorker
local := []*cli.Command{
app := &cli.App{
Name: "lotus-worker",
Usage: "Remote miner worker",
Version: build.UserVersion(),
EnableBashCompletion: true,
Flags: []cli.Flag{
// Worker repo path; --workerrepo and WORKER_PATH are deprecated spellings.
Name: FlagWorkerRepo,
Aliases: []string{FlagWorkerRepoDeprecation},
EnvVars: []string{"LOTUS_WORKER_PATH", "WORKER_PATH"},
Value: "~/.lotusworker", // TODO: Consider XDG_DATA_HOME
Usage: fmt.Sprintf("Specify worker repo path. flag %s and env WORKER_PATH are DEPRECATION, will REMOVE SOON", FlagWorkerRepoDeprecation),
// Directory that panic reports are written to (hidden flag).
Name: "panic-reports",
EnvVars: []string{"LOTUS_PANIC_REPORT_PATH"},
Hidden: true,
Value: "~/.lotusworker", // should follow --repo default
// Miner repo path; --storagerepo is a deprecated alias.
Name: "miner-repo",
Aliases: []string{"storagerepo"},
Value: "~/.lotusminer", // TODO: Consider XDG_DATA_HOME
// NOTE(review): fmt.Sprintf with no arguments; a plain string literal is
// equivalent (staticcheck S1039).
Usage: fmt.Sprintf("Specify miner repo path. flag storagerepo and env LOTUS_STORAGE_PATH are DEPRECATION, will REMOVE SOON"),
// When false, the run command sets BELLMAN_NO_GPU before starting.
Name: "enable-gpu-proving",
Usage: "enable use of GPU for mining operations",
Value: true,
// After attempts to catch a panic and write a panic report for it.
// NOTE(review): recover() only stops a panic when it is called directly by a
// deferred function; confirm urfave/cli invokes After that way, otherwise r
// is always nil here. Also confirm the panic is actually re-raised as the
// comment below says.
After: func(c *cli.Context) error {
if r := recover(); r != nil {
// Generate report in LOTUS_PANIC_REPORT_PATH and re-raise panic
build.GeneratePanicReport(c.String("panic-reports"), c.String(FlagWorkerRepo), c.App.Name)
return nil
Commands: local,
// Record the repo type in the app metadata.
app.Metadata["repoType"] = repo.Worker
// NOTE(review): a failing command is only logged at warn level here; confirm
// the process still exits with a non-zero status.
if err := app.Run(os.Args); err != nil {
log.Warnf("%+v", err)
// stopCmd connects to a running worker's API, detaches all storage attached
// to that worker, and then asks it to shut down.
var stopCmd = &cli.Command{
Name: "stop",
Usage: "Stop a running lotus worker",
Flags: []cli.Flag{},
Action: func(cctx *cli.Context) error {
api, closer, err := lcli.GetWorkerAPI(cctx)
if err != nil {
return err
defer closer()
ctx := lcli.ReqContext(cctx)
// Detach any storage associated with this worker
err = api.StorageDetachAll(ctx)
if err != nil {
return err
// Storage is detached first; only then is the worker told to shut down.
err = api.Shutdown(ctx)
if err != nil {
return err
return nil
// runCmd starts a lotus worker. It connects to the miner API, opens (or
// initialises) the worker repo, serves the worker RPC and remote-storage
// endpoints, and keeps registering itself with the miner from a heartbeat loop.
var runCmd = &cli.Command{
Name: "run",
Usage: "Start lotus worker",
Flags: []cli.Flag{
// host:port to serve the worker API on.
// NOTE(review): with the empty default, net.SplitHostPort in the Action
// fails unless --listen (or --address) is set; confirm the intended default.
Name: "listen",
Usage: "host address and port the worker api will listen on",
Value: "",
EnvVars: []string{"LOTUS_WORKER_LISTEN"},
// Deprecated spelling of --listen; copied over in Before.
Name: "address",
Hidden: true,
Name: "no-local-storage",
Usage: "don't use storageminer repo for sector storage",
Name: "no-swap",
Usage: "don't use swap",
Value: false,
EnvVars: []string{"LOTUS_WORKER_NO_SWAP"},
Name: "name",
Usage: "custom worker name",
EnvVars: []string{"LOTUS_WORKER_NAME"},
DefaultText: "hostname",
// Per-task enable switches. For a default sealing worker their values
// (defaults included) are honoured; otherwise a task is only enabled when
// its flag is explicitly set to true (see the Action).
Name: "addpiece",
Usage: "enable addpiece",
Value: true,
EnvVars: []string{"LOTUS_WORKER_ADDPIECE"},
Name: "precommit1",
Usage: "enable precommit1",
Value: true,
EnvVars: []string{"LOTUS_WORKER_PRECOMMIT1"},
Name: "unseal",
Usage: "enable unsealing",
Value: true,
EnvVars: []string{"LOTUS_WORKER_UNSEAL"},
Name: "precommit2",
Usage: "enable precommit2",
Value: true,
EnvVars: []string{"LOTUS_WORKER_PRECOMMIT2"},
Name: "commit",
Usage: "enable commit",
Value: true,
EnvVars: []string{"LOTUS_WORKER_COMMIT"},
Name: "replica-update",
Usage: "enable replica update",
Value: true,
Name: "prove-replica-update2",
Usage: "enable prove replica update 2",
Value: true,
Name: "regen-sector-key",
Usage: "enable regen sector key",
Value: true,
Name: "sector-download",
Usage: "enable external sector data download",
Value: false,
// Enabling either PoSt flag turns this into a PoSt-only worker.
Name: "windowpost",
Usage: "enable window post",
Value: false,
Name: "winningpost",
Usage: "enable winning post",
Value: false,
Name: "no-default",
Usage: "disable all default compute tasks, use the worker for storage/fetching only",
Value: false,
EnvVars: []string{"LOTUS_WORKER_NO_DEFAULT"},
Name: "parallel-fetch-limit",
Usage: "maximum fetch operations to run in parallel",
Value: 5,
Name: "post-parallel-reads",
Usage: "maximum number of parallel challenge reads (0 = no limit)",
Value: 32,
Name: "post-read-timeout",
Usage: "time limit for reading PoSt challenges (0 = no limit)",
Value: 0,
// Dial timeout used to reach the miner when discovering a routable local IP.
Name: "timeout",
Usage: "used when 'listen' is unspecified. must be a valid duration recognized by golang's time.ParseDuration function",
Value: "30m",
EnvVars: []string{"LOTUS_WORKER_TIMEOUT"},
// Used as the HTTP server's ReadHeaderTimeout.
Name: "http-server-timeout",
Value: "30s",
// When not set explicitly, data-cid follows whether addpiece was enabled.
Name: "data-cid",
Usage: "Run the data-cid task. true|false",
Value: true,
DefaultText: "inherits --addpiece",
// Before maps the deprecated --address flag onto --listen, warning about it.
Before: func(cctx *cli.Context) error {
if cctx.IsSet("address") {
log.Warnf("The '--address' flag is deprecated, it has been replaced by '--listen'")
if err := cctx.Set("listen", cctx.String("address")); err != nil {
return err
return nil
// Action runs the worker until its HTTP server stops serving.
Action: func(cctx *cli.Context) error {
log.Info("Starting lotus worker")
// Disable GPU proving by exporting BELLMAN_NO_GPU (presumably read by the
// proofs backend — assumption based on the variable name).
// NOTE(review): %+v does not wrap err; %w would keep it inspectable.
if !cctx.Bool("enable-gpu-proving") {
if err := os.Setenv("BELLMAN_NO_GPU", "true"); err != nil {
return xerrors.Errorf("could not set no-gpu env: %+v", err)
// ensure tmpdir exists
td := os.TempDir()
if err := os.MkdirAll(td, 0755); err != nil {
return xerrors.Errorf("ensuring temp dir %s exists: %w", td, err)
// Check file descriptor limit: an unsupported platform is only logged, any
// other error aborts, and a soft limit below build.MinerFDLimit is rejected.
limit, _, err := ulimit.GetLimit()
switch {
case err == ulimit.ErrUnsupported:
log.Errorw("checking file descriptor limit failed", "error", err)
case err != nil:
return xerrors.Errorf("checking fd limit: %w", err)
if limit < build.MinerFDLimit {
return xerrors.Errorf("soft file descriptor limit (ulimit -n) too low, want %d, current %d", build.MinerFDLimit, limit)
// Check DC-environment variable.
// DC (data-cid) resource limits do not depend on sector size, so any
// size-specific override DC_<size>_<envname> (envname taken from the
// `envname` struct tags of storiface.Resources) is copied to DC_<envname>.
// If several sizes set the same variable, the last size in sectorSizes wins.
sectorSizes := []string{"2KiB", "8MiB", "512MiB", "32GiB", "64GiB"}
resourcesType := reflect.TypeOf(storiface.Resources{})
for _, sectorSize := range sectorSizes {
for i := 0; i < resourcesType.NumField(); i++ {
field := resourcesType.Field(i)
envName := field.Tag.Get("envname")
if envName != "" {
// Check if DC_[SectorSize]_[ResourceRestriction] is set
envVar, ok := os.LookupEnv("DC_" + sectorSize + "_" + envName)
if ok {
// If it is set, convert it to DC_[ResourceRestriction]
// NOTE(review): log.Fatalf presumably terminates the process instead of
// returning an error from the Action.
err := os.Setenv("DC_"+envName, envVar)
if err != nil {
log.Fatalf("Error setting environment variable: %v", err)
log.Warnf("Converted DC_%s_%s to DC_%s, because DC is a sector-size independent job", sectorSize, envName, envName)
// Connect to storage-miner.
ctx := lcli.ReqContext(cctx)
// Derive a cancellable context; everything below that watches ctx is
// released by the deferred cancel() when the Action returns.
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Listen for interrupt signals and cancel ctx when one arrives, so that code
// selecting on ctx.Done() (the miner-connect loop and the heartbeat loop) can
// wind down. The goroutine also exits when ctx is cancelled for any other
// reason, e.g. the deferred cancel() when the Action returns; signal.Stop then
// unregisters the channel so default SIGINT handling is restored and the
// goroutine does not leak.
go func() {
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
	defer signal.Stop(c)
	select {
	case <-c:
		cancel()
	case <-ctx.Done():
	}
}()
// Poll the miner API once per second until a connection succeeds and answers
// Version(), printing progress on a single terminal line. The loop presumably
// exits once Version() succeeds. Cancelling ctx aborts with "Interrupted by user".
var nodeApi api.StorageMiner
var closer func()
for {
nodeApi, closer, err = lcli.GetStorageMinerAPI(cctx, cliutil.StorageMinerUseHttp)
if err == nil {
_, err = nodeApi.Version(ctx)
if err == nil {
// "\r\x1b[0K" returns to column 0 and clears the line before reprinting.
fmt.Printf("\r\x1b[0KConnecting to miner API... (%s)", err)
select {
case <-ctx.Done():
return xerrors.New("Interrupted by user")
case <-time.After(time.Second):
// Release the miner API client when the Action returns.
defer closer()
// Register all metric views.
// NOTE(review): log.Fatalf presumably terminates the process rather than
// returning an error from the Action.
if err := view.Register(
); err != nil {
log.Fatalf("Cannot register the view: %v", err)
// Refuse to run against a miner whose API version is not MinerAPIVersion0.
// NOTE(review): the error message reports only the expected version, not v.
v, err := nodeApi.Version(ctx)
if err != nil {
return err
if v.APIVersion != api.MinerAPIVersion0 {
return xerrors.Errorf("lotus-miner API version doesn't match: expected: %s", api.APIVersion{APIVersion: api.MinerAPIVersion0})
log.Infof("Remote version %s", v)
// Check params: the miner actor's sector size decides which proof
// parameters are fetched below.
act, err := nodeApi.ActorAddress(ctx)
if err != nil {
return err
ssize, err := nodeApi.ActorSectorSize(ctx, act)
if err != nil {
return err
// Work out which task types this worker accepts and the single worker type
// they must all belong to. needParams records whether any selected task
// requires proof parameters to be fetched.
var taskTypes []sealtasks.TaskType
var workerType string
var needParams bool
// PoSt flags select a PoSt worker type.
if cctx.Bool("windowpost") {
needParams = true
workerType = sealtasks.WorkerWindowPoSt
taskTypes = append(taskTypes, sealtasks.TTGenerateWindowPoSt)
if cctx.Bool("winningpost") {
needParams = true
workerType = sealtasks.WorkerWinningPoSt
taskTypes = append(taskTypes, sealtasks.TTGenerateWinningPoSt)
// Without PoSt flags, start from the base set of tasks every sealing worker runs.
if workerType == "" {
taskTypes = append(taskTypes, sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTProveReplicaUpdate1, sealtasks.TTFinalize, sealtasks.TTFinalizeUnsealed, sealtasks.TTFinalizeReplicaUpdate)
if !cctx.Bool("no-default") {
workerType = sealtasks.WorkerSealing
// Each optional task is enabled when its flag is true AND either this is a
// default sealing worker (so flag defaults apply) or the flag was set explicitly.
ttDataCidDefault := false
if (workerType == sealtasks.WorkerSealing || cctx.IsSet("addpiece")) && cctx.Bool("addpiece") {
taskTypes = append(taskTypes, sealtasks.TTAddPiece)
ttDataCidDefault = true
// data-cid: an explicit flag wins; otherwise it follows addpiece.
if workerType == sealtasks.WorkerSealing {
if cctx.IsSet("data-cid") {
if cctx.Bool("data-cid") {
taskTypes = append(taskTypes, sealtasks.TTDataCid)
} else if ttDataCidDefault {
taskTypes = append(taskTypes, sealtasks.TTDataCid)
if (workerType == sealtasks.WorkerSealing || cctx.IsSet("sector-download")) && cctx.Bool("sector-download") {
taskTypes = append(taskTypes, sealtasks.TTDownloadSector)
if (workerType == sealtasks.WorkerSealing || cctx.IsSet("precommit1")) && cctx.Bool("precommit1") {
taskTypes = append(taskTypes, sealtasks.TTPreCommit1)
if (workerType == sealtasks.WorkerSealing || cctx.IsSet("unseal")) && cctx.Bool("unseal") {
taskTypes = append(taskTypes, sealtasks.TTUnseal)
if (workerType == sealtasks.WorkerSealing || cctx.IsSet("precommit2")) && cctx.Bool("precommit2") {
taskTypes = append(taskTypes, sealtasks.TTPreCommit2)
if (workerType == sealtasks.WorkerSealing || cctx.IsSet("commit")) && cctx.Bool("commit") {
needParams = true
taskTypes = append(taskTypes, sealtasks.TTCommit2)
if (workerType == sealtasks.WorkerSealing || cctx.IsSet("replica-update")) && cctx.Bool("replica-update") {
taskTypes = append(taskTypes, sealtasks.TTReplicaUpdate)
if (workerType == sealtasks.WorkerSealing || cctx.IsSet("prove-replica-update2")) && cctx.Bool("prove-replica-update2") {
needParams = true
taskTypes = append(taskTypes, sealtasks.TTProveReplicaUpdate2)
if (workerType == sealtasks.WorkerSealing || cctx.IsSet("regen-sector-key")) && cctx.Bool("regen-sector-key") {
taskTypes = append(taskTypes, sealtasks.TTRegenSectorKey)
// A --no-default worker still reports itself as a sealing worker.
if cctx.Bool("no-default") && workerType == "" {
workerType = sealtasks.WorkerSealing
if len(taskTypes) == 0 {
return xerrors.Errorf("no task types specified")
// Mixing worker types (e.g. PoSt and sealing tasks) is rejected.
for _, taskType := range taskTypes {
if taskType.WorkerType() != workerType {
return xerrors.Errorf("expected all task types to be for %s worker, but task %s is for %s worker", workerType, taskType, taskType.WorkerType())
// Fetch proof parameters for the miner's sector size when a selected task needs them.
if needParams {
if err := paramfetch.GetParams(ctx, build.ParametersJSON(), build.SrsJSON(), uint64(ssize)); err != nil {
return xerrors.Errorf("get params: %w", err)
// Open repo. On first run the repo is initialised. Unless --no-local-storage
// is set, the repo directory itself is registered as a seal-capable,
// non-store storage path (sectorstore.json with a fresh UUID, weight 10).
// The metadata datastore is then created and the repo is closed again before
// being re-locked for normal use below.
repoPath := cctx.String(FlagWorkerRepo)
r, err := repo.NewFS(repoPath)
if err != nil {
return err
ok, err := r.Exists()
if err != nil {
return err
if !ok {
if err := r.Init(repo.Worker); err != nil {
return err
lr, err := r.Lock(repo.Worker)
if err != nil {
return err
var localPaths []storiface.LocalPath
if !cctx.Bool("no-local-storage") {
b, err := json.MarshalIndent(&storiface.LocalStorageMeta{
ID: storiface.ID(uuid.New().String()),
Weight: 10,
CanSeal: true,
CanStore: false,
}, "", " ")
if err != nil {
return xerrors.Errorf("marshaling storage config: %w", err)
if err := os.WriteFile(filepath.Join(lr.Path(), "sectorstore.json"), b, 0644); err != nil {
return xerrors.Errorf("persisting storage metadata (%s): %w", filepath.Join(lr.Path(), "sectorstore.json"), err)
localPaths = append(localPaths, storiface.LocalPath{
Path: lr.Path(),
if err := lr.SetStorage(func(sc *storiface.StorageConfig) {
sc.StoragePaths = append(sc.StoragePaths, localPaths...)
}); err != nil {
return xerrors.Errorf("set storage config: %w", err)
// init datastore for r.Exists
_, err := lr.Datastore(context.Background(), "/metadata")
if err != nil {
return err
if err := lr.Close(); err != nil {
return xerrors.Errorf("close repo: %w", err)
// Lock the repo for the lifetime of the Action; it is closed on return.
lr, err := r.Lock(repo.Worker)
if err != nil {
return err
defer func() {
if err := lr.Close(); err != nil {
log.Error("closing repo", err)
ds, err := lr.Datastore(context.Background(), "/metadata")
if err != nil {
return err
log.Info("Opening local storage; connecting to master")
const unspecifiedAddress = ""
address := cctx.String("listen")
host, port, err := net.SplitHostPort(address)
if err != nil {
return err
if ip := net.ParseIP(host); ip != nil {
if ip.String() == unspecifiedAddress {
timeout, err := time.ParseDuration(cctx.String("timeout"))
if err != nil {
return err
rip, err := extractRoutableIP(timeout)
if err != nil {
return err
host = rip
var newAddress string
// Check if the IP address is IPv6
ip := net.ParseIP(host)
if ip.To4() == nil && ip.To16() != nil {
newAddress = "[" + host + "]:" + port
} else {
newAddress = host + ":" + port
// Local sector store; its remote access URL is this worker's /remote endpoint.
localStore, err := paths.NewLocal(ctx, lr, nodeApi, []string{"http://" + newAddress + "/remote"})
if err != nil {
return err
// Setup remote sector store, authenticating with the miner's API auth header
// and bounding concurrent fetches by --parallel-fetch-limit.
sminfo, err := lcli.GetAPIInfo(cctx, repo.StorageMiner)
if err != nil {
return xerrors.Errorf("could not get api info: %w", err)
remote := paths.NewRemote(localStore, nodeApi, sminfo.AuthHeader(), cctx.Int("parallel-fetch-limit"),
// HTTP handler serving sector data out of the local store.
fh := &paths.FetchHandler{Local: localStore, PfHandler: &paths.DefaultPartialFileHandler{}}
// remoteHandler serves local sector data to callers holding admin permission.
// Unauthorized requests get a 401 with a JSON error body and must stop there;
// as previously written they received no status code and fell through to
// fh.ServeHTTP, exposing sector data without admin permission.
remoteHandler := func(w http.ResponseWriter, r *http.Request) {
	if !auth.HasPerm(r.Context(), nil, api.PermAdmin) {
		w.WriteHeader(http.StatusUnauthorized)
		// Best effort: the status is already sent, so an encode failure has
		// no better way to be reported to the client.
		_ = json.NewEncoder(w).Encode(struct{ Error string }{"unauthorized: missing admin permission"})
		return
	}
	fh.ServeHTTP(w, r)
}
// Create / expose the worker. Worker call state is persisted in the metadata
// datastore under modules.WorkerCallsPrefix; the local worker is configured
// from the selected task types and the no-swap / PoSt-read / name flags.
wsts := statestore.New(namespace.Wrap(ds, modules.WorkerCallsPrefix))
workerApi := &sealworker.Worker{
LocalWorker: sealer.NewLocalWorker(sealer.WorkerConfig{
TaskTypes: taskTypes,
NoSwap: cctx.Bool("no-swap"),
MaxParallelChallengeReads: cctx.Int("post-parallel-reads"),
ChallengeReadTimeout: cctx.Duration("post-read-timeout"),
Name: cctx.String("name"),
}, remote, localStore, nodeApi, nodeApi, wsts),
LocalStore: localStore,
Storage: lr,
log.Info("Setting up control endpoint at " + newAddress)
// --http-server-timeout bounds how long the server waits for request headers.
timeout, err := time.ParseDuration(cctx.String("http-server-timeout"))
if err != nil {
	// %w keeps the parse error inspectable; %x hex-encoded its message.
	return xerrors.Errorf("invalid time string %s: %w", cctx.String("http-server-timeout"), err)
}
srv := &http.Server{
	Handler:           sealworker.WorkerHandler(nodeApi.AuthVerify, remoteHandler, workerApi, true),
	ReadHeaderTimeout: timeout,
	// Tag every request context with the API interface name for metrics.
	BaseContext: func(listener net.Listener) context.Context {
		ctx, _ := tag.New(context.Background(), tag.Upsert(metrics.APIInterface, "lotus-worker"))
		return ctx
	},
}
// Shut the server down once ctx is cancelled (interrupt or Action exit),
// which makes srv.Serve return. Without the wait the server was shut down
// immediately, before it could serve anything.
go func() {
	<-ctx.Done()
	log.Warn("Shutting down...")
	if err := srv.Shutdown(context.TODO()); err != nil {
		log.Errorf("shutting down RPC server failed: %s", err)
	}
	log.Warn("Graceful shutdown successful")
}()
// Bind the listener, then record its address (as a multiaddr) and the miner's
// API token in the worker repo.
nl, err := net.Listen("tcp", newAddress)
if err != nil {
return err
a, err := net.ResolveTCPAddr("tcp", newAddress)
if err != nil {
return xerrors.Errorf("parsing address: %w", err)
ma, err := manet.FromNetAddr(a)
if err != nil {
return xerrors.Errorf("creating api multiaddress: %w", err)
if err := lr.SetAPIEndpoint(ma); err != nil {
return xerrors.Errorf("setting api endpoint: %w", err)
ainfo, err := lcli.GetAPIInfo(cctx, repo.StorageMiner)
if err != nil {
return xerrors.Errorf("could not get miner API info: %w", err)
// TODO: ideally this would be a token with some permissions dropped
if err := lr.SetAPIToken(ainfo.Token); err != nil {
return xerrors.Errorf("setting api token: %w", err)
// Remember the current miner session; the heartbeat loop compares against it
// to notice when the miner session changes.
minerSession, err := nodeApi.Session(ctx)
if err != nil {
return xerrors.Errorf("getting miner session: %w", err)
// waitQuietCh returns a channel used by the heartbeat loop as "ready to
// (re)register" (it logs "Making sure no local tasks are running" first).
// NOTE(review): the goroutine body is empty as written, so out is never closed
// and readyCh never fires; confirm it waits for the local worker to go quiet
// and then closes out.
waitQuietCh := func() chan struct{} {
out := make(chan struct{})
go func() {
return out
// Heartbeat loop: on every tick, once no local tasks are running, (re)register
// with the miner via WorkerConnect. It then checks the miner session on each
// tick and falls back to redeclaring storage and re-registering when the
// session changes. The loop returns when ctx is cancelled.
go func() {
heartbeats := time.NewTicker(paths.HeartbeatInterval)
defer heartbeats.Stop()
var redeclareStorage bool
var readyCh chan struct{}
for {
// If we're reconnecting, redeclare storage first
if redeclareStorage {
log.Info("Redeclaring local storage")
if err := localStore.Redeclare(ctx, nil, false); err != nil {
log.Errorf("Redeclaring local storage failed: %+v", err)
select {
case <-ctx.Done():
return // graceful shutdown
case <-heartbeats.C:
// TODO: we could get rid of this, but that requires tracking resources for restarted tasks correctly
if readyCh == nil {
log.Info("Making sure no local tasks are running")
readyCh = waitQuietCh()
for {
// A changed session is recorded so the next registration targets it.
curSession, err := nodeApi.Session(ctx)
if err != nil {
log.Errorf("heartbeat: checking remote session failed: %+v", err)
} else {
if curSession != minerSession {
minerSession = curSession
select {
case <-readyCh:
if err := nodeApi.WorkerConnect(ctx, "http://"+newAddress+"/rpc/v0"); err != nil {
log.Errorf("Registering worker failed: %+v", err)
log.Info("Worker registered successfully, waiting for tasks")
readyCh = nil
case <-heartbeats.C:
case <-ctx.Done():
return // graceful shutdown
redeclareStorage = true
// Delayed shutdown of the RPC server, then serve until the server is shut down.
// NOTE(review): as written, nothing here waits on an event before the 20s
// sleep, so the server is shut down 20s after startup; confirm the intended
// trigger (e.g. the worker reporting it is done) precedes the sleep.
go func() {
// Wait 20s to allow the miner to unregister the worker on next heartbeat
time.Sleep(20 * time.Second)
log.Warn("Shutting down...")
if err := srv.Shutdown(context.TODO()); err != nil {
log.Errorf("shutting down RPC server failed: %s", err)
log.Warn("Graceful shutdown successful")
// Serve blocks until the server is shut down or fails.
return srv.Serve(nl)
func extractRoutableIP(timeout time.Duration) (string, error) {
minerMultiAddrKey := "MINER_API_INFO"
deprecatedMinerMultiAddrKey := "STORAGE_API_INFO"
env, ok := os.LookupEnv(minerMultiAddrKey)
if !ok {
_, ok = os.LookupEnv(deprecatedMinerMultiAddrKey)
if ok {
log.Warnf("Using a deprecated env(%s) value, please use env(%s) instead.", deprecatedMinerMultiAddrKey, minerMultiAddrKey)
return "", xerrors.New("MINER_API_INFO environment variable required to extract IP")
// Splitting the env to separate the JWT from the multiaddress
splitEnv := strings.SplitN(env, ":", 2)
if len(splitEnv) < 2 {
return "", xerrors.Errorf("invalid MINER_API_INFO format")
// Only take the multiaddress part
maddrStr := splitEnv[1]
maddr, err := multiaddr.NewMultiaddr(maddrStr)
if err != nil {
return "", err
minerIP, _ := maddr.ValueForProtocol(multiaddr.P_IP6)
if minerIP == "" {
minerIP, _ = maddr.ValueForProtocol(multiaddr.P_IP4)
minerPort, _ := maddr.ValueForProtocol(multiaddr.P_TCP)
// Format the address appropriately
addressToDial := net.JoinHostPort(minerIP, minerPort)
conn, err := net.DialTimeout("tcp", addressToDial, timeout)
if err != nil {
return "", err
defer func() {
if cerr := conn.Close(); cerr != nil {
log.Errorf("Error closing connection: %v", cerr)
localAddr := conn.LocalAddr().(*net.TCPAddr)
return localAddr.IP.String(), nil