worker: Don't die with the connection
This commit is contained in:
parent b8865fb182
commit 706f4f2ef5
@@ -10,7 +10,6 @@ import (
 	"os"
 	"path/filepath"
 	"strings"
-	"syscall"
 	"time"
 
 	"github.com/google/uuid"
@@ -198,8 +197,6 @@ var runCmd = &cli.Command{
 		}
 		log.Infof("Remote version %s", v)
 
-		watchMinerConn(ctx, cctx, nodeApi)
-
 		// Check params
 
 		act, err := nodeApi.ActorAddress(ctx)
@@ -422,13 +419,36 @@ var runCmd = &cli.Command{
 			}
 		}
 
 		log.Info("Waiting for tasks")
 
 		go func() {
-			if err := nodeApi.WorkerConnect(ctx, "ws://"+address+"/rpc/v0"); err != nil {
-				log.Errorf("Registering worker failed: %+v", err)
-				cancel()
-				return
+			for {
+				log.Info("Making sure no local tasks are running")
+
+				// TODO: we could get rid of this, but that requires tracking resources for restarted tasks correctly
+				workerApi.LocalWorker.WaitQuiet()
+
+				if err := nodeApi.WorkerConnect(ctx, "ws://"+address+"/rpc/v0"); err != nil {
+					log.Errorf("Registering worker failed: %+v", err)
+					cancel()
+					return
+				}
+
+				log.Info("Worker registered successfully, waiting for tasks")
+
+				closing, err := nodeApi.Closing(ctx)
+				if err != nil {
+					log.Errorf("failed to get remote closing channel: %+v", err)
+				}
+
+				select {
+				case <-closing:
+				case <-ctx.Done():
+				}
+
+				if ctx.Err() != nil {
+					return // graceful shutdown
+				}
+
+				log.Errorf("LOTUS-MINER CONNECTION LOST")
 			}
 		}()
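The hunk above is the heart of the change: instead of registering once and dying when the session drops, the worker now loops, waiting for local tasks to settle, re-registering, and then blocking on the miner's closing channel until it has to start over. Below is a minimal, self-contained sketch of that pattern; reconnectLoop and its waitQuiet/connect/closed parameters are hypothetical stand-ins for LocalWorker.WaitQuiet, nodeApi.WorkerConnect, and nodeApi.Closing, not the lotus API.

package main

import (
	"context"
	"log"
	"time"
)

// reconnectLoop mirrors the shape of the worker's new registration logic:
// wait for local tasks to settle, (re)register with the miner, then block
// until the session closes or the context is cancelled, and loop.
func reconnectLoop(
	ctx context.Context,
	waitQuiet func(), // stand-in for LocalWorker.WaitQuiet
	connect func(context.Context) error, // stand-in for nodeApi.WorkerConnect
	closed func(context.Context) (<-chan struct{}, error), // stand-in for nodeApi.Closing
) {
	for {
		// Don't re-register while restarted tasks may still hold resources.
		waitQuiet()

		if err := connect(ctx); err != nil {
			log.Printf("registering worker failed: %+v", err)
			return
		}

		ch, err := closed(ctx)
		if err != nil {
			log.Printf("getting closing channel: %+v", err)
		}

		select {
		case <-ch: // session dropped: fall through and re-register
		case <-ctx.Done():
		}

		if ctx.Err() != nil {
			return // graceful shutdown
		}

		log.Println("connection lost, re-registering")
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	dropped := false
	reconnectLoop(ctx,
		func() {}, // nothing running locally in this demo
		func(context.Context) error { return nil },
		func(context.Context) (<-chan struct{}, error) {
			ch := make(chan struct{})
			if !dropped {
				dropped = true
				close(ch) // first session drops immediately...
			}
			return ch, nil // ...later ones stay open until the context ends
		},
	)
}

The select on both the closing channel and ctx.Done() is what separates a lost connection (loop and re-register) from a deliberate shutdown (return).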
@@ -436,54 +456,6 @@ var runCmd = &cli.Command{
 	},
 }
 
-func watchMinerConn(ctx context.Context, cctx *cli.Context, nodeApi api.StorageMiner) {
-	go func() {
-		closing, err := nodeApi.Closing(ctx)
-		if err != nil {
-			log.Errorf("failed to get remote closing channel: %+v", err)
-		}
-
-		select {
-		case <-closing:
-		case <-ctx.Done():
-		}
-
-		if ctx.Err() != nil {
-			return // graceful shutdown
-		}
-
-		log.Warnf("Connection with miner node lost, restarting")
-
-		exe, err := os.Executable()
-		if err != nil {
-			log.Errorf("getting executable for auto-restart: %+v", err)
-		}
-
-		_ = log.Sync()
-
-		// TODO: there are probably cleaner/more graceful ways to restart,
-		// but this is good enough for now (FSM can recover from the mess this creates)
-		//nolint:gosec
-		if err := syscall.Exec(exe, []string{exe,
-			fmt.Sprintf("--worker-repo=%s", cctx.String("worker-repo")),
-			fmt.Sprintf("--miner-repo=%s", cctx.String("miner-repo")),
-			fmt.Sprintf("--enable-gpu-proving=%t", cctx.Bool("enable-gpu-proving")),
-			"run",
-			fmt.Sprintf("--listen=%s", cctx.String("listen")),
-			fmt.Sprintf("--no-local-storage=%t", cctx.Bool("no-local-storage")),
-			fmt.Sprintf("--addpiece=%t", cctx.Bool("addpiece")),
-			fmt.Sprintf("--precommit1=%t", cctx.Bool("precommit1")),
-			fmt.Sprintf("--unseal=%t", cctx.Bool("unseal")),
-			fmt.Sprintf("--precommit2=%t", cctx.Bool("precommit2")),
-			fmt.Sprintf("--commit=%t", cctx.Bool("commit")),
-			fmt.Sprintf("--parallel-fetch-limit=%d", cctx.Int("parallel-fetch-limit")),
-			fmt.Sprintf("--timeout=%s", cctx.String("timeout")),
-		}, os.Environ()); err != nil {
-			fmt.Println(err)
-		}
-	}()
-}
-
 func extractRoutableIP(timeout time.Duration) (string, error) {
 	minerMultiAddrKey := "MINER_API_INFO"
 	deprecatedMinerMultiAddrKey := "STORAGE_API_INFO"
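For contrast, the watchMinerConn deleted above handled a lost connection by re-exec-ing the entire worker binary with its original flags, leaving the sealing FSM to recover from whatever state that interrupted. A rough sketch of that restart-by-exec idea, assuming a Unix-like platform (syscall.Exec wraps execve(2) and does not exist on Windows); restartSelf and the RESTART_GUARD variable are hypothetical names for illustration:

package main

import (
	"fmt"
	"os"
	"syscall"
)

// restartSelf replaces the current process image with a fresh copy of the
// same binary, preserving arguments and environment. On success it never
// returns.
func restartSelf() error {
	exe, err := os.Executable()
	if err != nil {
		return err
	}
	//nolint:gosec
	return syscall.Exec(exe, os.Args, os.Environ())
}

func main() {
	// RESTART_GUARD is a hypothetical guard so this demo restarts only once.
	if os.Getenv("RESTART_GUARD") == "" {
		os.Setenv("RESTART_GUARD", "1")
		fmt.Println("simulating connection loss, restarting...")
		if err := restartSelf(); err != nil {
			fmt.Println("exec failed:", err)
		}
		return
	}
	fmt.Println("fresh process came back up")
}

The new in-process loop makes this machinery unnecessary: running tasks keep their resources, and only the registration is redone.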
extern/sector-storage/worker_local.go (vendored, 11 lines changed)
@@ -7,6 +7,7 @@ import (
 	"os"
 	"reflect"
 	"runtime"
+	"sync"
 
 	"github.com/elastic/go-sysinfo"
 	"github.com/google/uuid"
@@ -42,6 +43,7 @@ type LocalWorker struct {
 
 	ct          *workerCallTracker
 	acceptTasks map[sealtasks.TaskType]struct{}
+	running     sync.WaitGroup
 
 	closing chan struct{}
 }
@@ -202,7 +204,11 @@ func (l *LocalWorker) asyncCall(ctx context.Context, sector abi.SectorID, rt Ret
 		log.Errorf("tracking call (start): %+v", err)
 	}
 
+	l.running.Add(1)
+
 	go func() {
+		defer l.running.Done()
+
 		res, err := work(ci)
 
 		{
@@ -455,4 +461,9 @@ func (l *LocalWorker) Close() error {
 	return nil
 }
 
+// WaitQuiet blocks as long as there are tasks running
+func (l *LocalWorker) WaitQuiet() {
+	l.running.Wait()
+}
+
 var _ Worker = &LocalWorker{}
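The worker_local.go side of the change is a standard sync.WaitGroup: asyncCall increments it before spawning the goroutine and decrements it on completion, so WaitQuiet can block until the worker is idle. A stripped-down sketch of the same pattern, with the hypothetical tracker type standing in for LocalWorker:

package main

import (
	"fmt"
	"sync"
	"time"
)

// tracker counts in-flight async tasks, the same way LocalWorker's new
// running WaitGroup backs WaitQuiet; the names here are illustrative,
// not the lotus API.
type tracker struct {
	running sync.WaitGroup
}

// asyncCall runs work in a goroutine tracked by the WaitGroup.
func (t *tracker) asyncCall(work func()) {
	t.running.Add(1) // before the goroutine starts, so waitQuiet can't race past it
	go func() {
		defer t.running.Done()
		work()
	}()
}

// waitQuiet blocks as long as there are tasks running.
func (t *tracker) waitQuiet() {
	t.running.Wait()
}

func main() {
	var t tracker
	for i := 0; i < 3; i++ {
		i := i
		t.asyncCall(func() {
			time.Sleep(10 * time.Millisecond)
			fmt.Println("task", i, "done")
		})
	}
	t.waitQuiet()
	fmt.Println("worker quiet")
}

Calling Add before the goroutine starts (rather than inside it) matters: otherwise waitQuiet could observe a zero counter between spawn and Add and return while work is still pending.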