diff --git a/cmd/lotus-seal-worker/main.go b/cmd/lotus-seal-worker/main.go
index 9739acb68..292590c2e 100644
--- a/cmd/lotus-seal-worker/main.go
+++ b/cmd/lotus-seal-worker/main.go
@@ -10,7 +10,6 @@ import (
 	"os"
 	"path/filepath"
 	"strings"
-	"syscall"
 	"time"
 
 	"github.com/google/uuid"
@@ -198,8 +197,6 @@ var runCmd = &cli.Command{
 	}
 	log.Infof("Remote version %s", v)
 
-	watchMinerConn(ctx, cctx, nodeApi)
-
 	// Check params
 
 	act, err := nodeApi.ActorAddress(ctx)
@@ -422,13 +419,36 @@ var runCmd = &cli.Command{
 		}
 	}
 
-	log.Info("Waiting for tasks")
-
 	go func() {
-		if err := nodeApi.WorkerConnect(ctx, "ws://"+address+"/rpc/v0"); err != nil {
-			log.Errorf("Registering worker failed: %+v", err)
-			cancel()
-			return
+		for {
+			log.Info("Making sure no local tasks are running")
+
+			// TODO: we could get rid of this, but that requires tracking resources for restarted tasks correctly
+			workerApi.LocalWorker.WaitQuiet()
+
+			if err := nodeApi.WorkerConnect(ctx, "ws://"+address+"/rpc/v0"); err != nil {
+				log.Errorf("Registering worker failed: %+v", err)
+				cancel()
+				return
+			}
+
+			log.Info("Worker registered successfully, waiting for tasks")
+
+			closing, err := nodeApi.Closing(ctx)
+			if err != nil {
+				log.Errorf("failed to get remote closing channel: %+v", err)
+			}
+
+			select {
+			case <-closing:
+			case <-ctx.Done():
+			}
+
+			if ctx.Err() != nil {
+				return // graceful shutdown
+			}
+
+			log.Errorf("LOTUS-MINER CONNECTION LOST")
 		}
 	}()
 
@@ -436,54 +456,6 @@ var runCmd = &cli.Command{
 	},
 }
 
-func watchMinerConn(ctx context.Context, cctx *cli.Context, nodeApi api.StorageMiner) {
-	go func() {
-		closing, err := nodeApi.Closing(ctx)
-		if err != nil {
-			log.Errorf("failed to get remote closing channel: %+v", err)
-		}
-
-		select {
-		case <-closing:
-		case <-ctx.Done():
-		}
-
-		if ctx.Err() != nil {
-			return // graceful shutdown
-		}
-
-		log.Warnf("Connection with miner node lost, restarting")
-
-		exe, err := os.Executable()
-		if err != nil {
-			log.Errorf("getting executable for auto-restart: %+v", err)
-		}
-
-		_ = log.Sync()
-
-		// TODO: there are probably cleaner/more graceful ways to restart,
-		// but this is good enough for now (FSM can recover from the mess this creates)
-		//nolint:gosec
-		if err := syscall.Exec(exe, []string{exe,
-			fmt.Sprintf("--worker-repo=%s", cctx.String("worker-repo")),
-			fmt.Sprintf("--miner-repo=%s", cctx.String("miner-repo")),
-			fmt.Sprintf("--enable-gpu-proving=%t", cctx.Bool("enable-gpu-proving")),
-			"run",
-			fmt.Sprintf("--listen=%s", cctx.String("listen")),
-			fmt.Sprintf("--no-local-storage=%t", cctx.Bool("no-local-storage")),
-			fmt.Sprintf("--addpiece=%t", cctx.Bool("addpiece")),
-			fmt.Sprintf("--precommit1=%t", cctx.Bool("precommit1")),
-			fmt.Sprintf("--unseal=%t", cctx.Bool("unseal")),
-			fmt.Sprintf("--precommit2=%t", cctx.Bool("precommit2")),
-			fmt.Sprintf("--commit=%t", cctx.Bool("commit")),
-			fmt.Sprintf("--parallel-fetch-limit=%d", cctx.Int("parallel-fetch-limit")),
-			fmt.Sprintf("--timeout=%s", cctx.String("timeout")),
-		}, os.Environ()); err != nil {
-			fmt.Println(err)
-		}
-	}()
-}
-
 func extractRoutableIP(timeout time.Duration) (string, error) {
 	minerMultiAddrKey := "MINER_API_INFO"
 	deprecatedMinerMultiAddrKey := "STORAGE_API_INFO"
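The hunks above replace the old auto-restart behaviour (re-exec'ing the worker binary via syscall.Exec when the miner connection drops) with a reconnect loop: wait until no local tasks are running, register with the miner via WorkerConnect, block on the miner's Closing channel, and start over when it fires. The sketch below is not part of the patch; it is a minimal, self-contained illustration of that loop, with hypothetical waitQuiet/connect/watchClosed stand-ins for LocalWorker.WaitQuiet, StorageMiner.WorkerConnect and StorageMiner.Closing.

// reconnect_sketch.go — illustrative only, not part of the lotus patch.
package main

import (
	"context"
	"log"
	"time"
)

// reconnectLoop keeps a worker registered with its miner until ctx is cancelled.
// waitQuiet blocks while local tasks are running, connect registers the worker,
// and watchClosed returns a channel that is closed when the miner connection drops.
func reconnectLoop(
	ctx context.Context,
	waitQuiet func(),
	connect func(context.Context) error,
	watchClosed func(context.Context) (<-chan struct{}, error),
) error {
	for {
		log.Println("making sure no local tasks are running")
		waitQuiet()

		if err := connect(ctx); err != nil {
			return err // registration failed; caller decides whether to shut down
		}
		log.Println("worker registered, waiting for tasks")

		closed, err := watchClosed(ctx)
		if err != nil {
			log.Printf("failed to get closing channel: %v", err)
		}

		// A nil 'closed' channel blocks forever, so ctx.Done() still ends the loop.
		select {
		case <-closed:
		case <-ctx.Done():
		}
		if ctx.Err() != nil {
			return nil // graceful shutdown
		}
		log.Println("miner connection lost, reconnecting")
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	// Toy stand-ins: the "connection" drops after one second, forcing a few reconnects.
	_ = reconnectLoop(ctx,
		func() {},
		func(context.Context) error { return nil },
		func(context.Context) (<-chan struct{}, error) {
			ch := make(chan struct{})
			time.AfterFunc(time.Second, func() { close(ch) })
			return ch, nil
		},
	)
}

One behavioural difference from the removed watchMinerConn path: nothing is re-exec'ed, so in-flight state stays in-process and the sealing FSM no longer has to recover from a forced restart.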
diff --git a/extern/sector-storage/worker_local.go b/extern/sector-storage/worker_local.go
index 38b41ceb4..46f0d65e2 100644
--- a/extern/sector-storage/worker_local.go
+++ b/extern/sector-storage/worker_local.go
@@ -7,6 +7,7 @@ import (
 	"os"
 	"reflect"
 	"runtime"
+	"sync"
 
 	"github.com/elastic/go-sysinfo"
 	"github.com/google/uuid"
@@ -42,6 +43,7 @@ type LocalWorker struct {
 	ct          *workerCallTracker
 	acceptTasks map[sealtasks.TaskType]struct{}
+	running     sync.WaitGroup
 
 	closing chan struct{}
 }
 
@@ -202,7 +204,11 @@ func (l *LocalWorker) asyncCall(ctx context.Context, sector abi.SectorID, rt Ret
 		log.Errorf("tracking call (start): %+v", err)
 	}
 
+	l.running.Add(1)
+
 	go func() {
+		defer l.running.Done()
+
 		res, err := work(ci)
 
 		{
@@ -455,4 +461,9 @@ func (l *LocalWorker) Close() error {
 	return nil
 }
 
+// WaitQuiet blocks as long as there are tasks running
+func (l *LocalWorker) WaitQuiet() {
+	l.running.Wait()
+}
+
 var _ Worker = &LocalWorker{}
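The worker_local.go side of the change is the standard sync.WaitGroup bookkeeping that makes WaitQuiet possible: Add(1) before each asynchronous call is spawned, a deferred Done() inside the goroutine, and Wait() behind WaitQuiet. The sketch below is a minimal stand-alone version of that pattern; the tracker type and asyncCall signature are simplifications for illustration, not the actual LocalWorker API.

// waitquiet_sketch.go — illustrative only, not part of the lotus patch.
package main

import (
	"fmt"
	"sync"
	"time"
)

type tracker struct {
	running sync.WaitGroup
}

// asyncCall runs work in a goroutine. Add is called before the goroutine is
// spawned so WaitQuiet can never observe a zero counter while a task is about
// to start; Done is deferred so the counter is decremented even on panic.
func (t *tracker) asyncCall(work func()) {
	t.running.Add(1)
	go func() {
		defer t.running.Done()
		work()
	}()
}

// WaitQuiet blocks as long as there are tasks running.
func (t *tracker) WaitQuiet() {
	t.running.Wait()
}

func main() {
	var t tracker
	for i := 0; i < 3; i++ {
		i := i
		t.asyncCall(func() {
			time.Sleep(time.Duration(100*i) * time.Millisecond)
			fmt.Println("task", i, "done")
		})
	}
	t.WaitQuiet() // returns only once all three tasks have finished
	fmt.Println("quiet")
}

This counter is what lets the reconnect loop in main.go call WaitQuiet before re-registering: the WaitGroup is the worker's notion of "no local tasks are running".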