diff --git a/cmd/lotus-seal-worker/main.go b/cmd/lotus-seal-worker/main.go
index 36c9d5eff..0bb33db14 100644
--- a/cmd/lotus-seal-worker/main.go
+++ b/cmd/lotus-seal-worker/main.go
@@ -451,14 +451,24 @@ var runCmd = &cli.Command{
 			return xerrors.Errorf("getting miner session: %w", err)
 		}
 
+		waitQuietCh := func() chan struct{} {
+			out := make(chan struct{})
+			go func() {
+				workerApi.LocalWorker.WaitQuiet()
+				close(out)
+			}()
+			return out
+		}
+
 		go func() {
 			heartbeats := time.NewTicker(stores.HeartbeatInterval)
 			defer heartbeats.Stop()
 
-			var connected, reconnect bool
+			var redeclareStorage bool
+			var readyCh chan struct{}
 			for {
 				// If we're reconnecting, redeclare storage first
-				if reconnect {
+				if redeclareStorage {
 					log.Info("Redeclaring local storage")
 
 					if err := localStore.Redeclare(ctx); err != nil {
@@ -471,14 +481,13 @@ var runCmd = &cli.Command{
 						}
 						continue
 					}
-
-					connected = false
 				}
 
-				log.Info("Making sure no local tasks are running")
-
-				// TODO: we could get rid of this, but that requires tracking resources for restarted tasks correctly
-				workerApi.LocalWorker.WaitQuiet()
+				if readyCh == nil {
+					log.Info("Making sure no local tasks are running")
+					readyCh = waitQuietCh()
+				}
 
 				for {
 					curSession, err := nodeApi.Session(ctx)
@@ -489,29 +498,28 @@ var runCmd = &cli.Command{
 							minerSession = curSession
 							break
 						}
-
-						if !connected {
-							if err := nodeApi.WorkerConnect(ctx, "http://"+address+"/rpc/v0"); err != nil {
-								log.Errorf("Registering worker failed: %+v", err)
-								cancel()
-								return
-							}
-
-							log.Info("Worker registered successfully, waiting for tasks")
-							connected = true
-						}
 					}
 
 					select {
+					case <-readyCh:
+						if err := nodeApi.WorkerConnect(ctx, "http://"+address+"/rpc/v0"); err != nil {
+							log.Errorf("Registering worker failed: %+v", err)
+							cancel()
+							return
+						}
+
+						log.Info("Worker registered successfully, waiting for tasks")
+
+						readyCh = nil
+					case <-heartbeats.C:
 					case <-ctx.Done():
 						return // graceful shutdown
-					case <-heartbeats.C:
 					}
 				}
 
 				log.Errorf("LOTUS-MINER CONNECTION LOST")
 
-				reconnect = true
+				redeclareStorage = true
 			}
 		}()
 
diff --git a/extern/sector-storage/manager_test.go b/extern/sector-storage/manager_test.go
index 0e3e7bc9d..f69d62b17 100644
--- a/extern/sector-storage/manager_test.go
+++ b/extern/sector-storage/manager_test.go
@@ -10,6 +10,7 @@ import (
 	"path/filepath"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -376,3 +377,77 @@ func TestRestartWorker(t *testing.T) {
 	require.NoError(t, err)
 	require.Empty(t, uf)
 }
+
+func TestReenableWorker(t *testing.T) {
+	logging.SetAllLoggers(logging.LevelDebug)
+	stores.HeartbeatInterval = 5 * time.Millisecond
+
+	ctx, done := context.WithCancel(context.Background())
+	defer done()
+
+	ds := datastore.NewMapDatastore()
+
+	m, lstor, stor, idx, cleanup := newTestMgr(ctx, t, ds)
+	defer cleanup()
+
+	localTasks := []sealtasks.TaskType{
+		sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTFetch,
+	}
+
+	wds := datastore.NewMapDatastore()
+
+	arch := make(chan chan apres)
+	w := newLocalWorker(func() (ffiwrapper.Storage, error) {
+		return &testExec{apch: arch}, nil
+	}, WorkerConfig{
+		SealProof: 0,
+		TaskTypes: localTasks,
+	}, stor, lstor, idx, m, statestore.New(wds))
+
+	err := m.AddWorker(ctx, w)
+	require.NoError(t, err)
+
+	time.Sleep(time.Millisecond * 100)
+
+	i, _ := m.sched.Info(ctx)
+	require.Len(t, i.(SchedDiagInfo).OpenWindows, 2)
+
+	// disable
+	atomic.StoreInt64(&w.testDisable, 1)
+
+	for i := 0; i < 100; i++ {
+		if !m.WorkerStats()[w.session].Enabled {
+			break
+		}
+
+		time.Sleep(time.Millisecond * 3)
+	}
+	require.False(t, m.WorkerStats()[w.session].Enabled)
+
+	i, _ = m.sched.Info(ctx)
+	require.Len(t, i.(SchedDiagInfo).OpenWindows, 0)
+
+	// reenable
+	atomic.StoreInt64(&w.testDisable, 0)
+
+	for i := 0; i < 100; i++ {
+		if m.WorkerStats()[w.session].Enabled {
+			break
+		}
+
+		time.Sleep(time.Millisecond * 3)
+	}
+	require.True(t, m.WorkerStats()[w.session].Enabled)
+
+	for i := 0; i < 100; i++ {
+		info, _ := m.sched.Info(ctx)
+		if len(info.(SchedDiagInfo).OpenWindows) != 0 {
+			break
+		}
+
+		time.Sleep(time.Millisecond * 3)
+	}
+
+	i, _ = m.sched.Info(ctx)
+	require.Len(t, i.(SchedDiagInfo).OpenWindows, 2)
+}
diff --git a/extern/sector-storage/sched.go b/extern/sector-storage/sched.go
index 426658c41..549a16a96 100644
--- a/extern/sector-storage/sched.go
+++ b/extern/sector-storage/sched.go
@@ -7,6 +7,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/google/uuid"
 	"golang.org/x/xerrors"
 
 	"github.com/filecoin-project/go-state-types/abi"
@@ -217,7 +218,7 @@ type SchedDiagRequestInfo struct {
 
 type SchedDiagInfo struct {
 	Requests    []SchedDiagRequestInfo
-	OpenWindows []WorkerID
+	OpenWindows []string
 }
 
 func (sh *scheduler) runSched() {
@@ -324,7 +325,7 @@ func (sh *scheduler) diag() SchedDiagInfo {
 	defer sh.workersLk.RUnlock()
 
 	for _, window := range sh.openWindows {
-		out.OpenWindows = append(out.OpenWindows, window.worker)
+		out.OpenWindows = append(out.OpenWindows, uuid.UUID(window.worker).String())
 	}
 
 	return out
diff --git a/extern/sector-storage/sched_worker.go b/extern/sector-storage/sched_worker.go
index ff43009d3..e56e9056d 100644
--- a/extern/sector-storage/sched_worker.go
+++ b/extern/sector-storage/sched_worker.go
@@ -104,14 +104,16 @@ func (sw *schedWorker) handleWorker() {
 	defer sw.heartbeatTimer.Stop()
 
 	for {
-		sched.workersLk.Lock()
-		enabled := worker.enabled
-		sched.workersLk.Unlock()
+		{
+			sched.workersLk.Lock()
+			enabled := worker.enabled
+			sched.workersLk.Unlock()
 
-		// ask for more windows if we need them (non-blocking)
-		if enabled {
-			if !sw.requestWindows() {
-				return // graceful shutdown
+			// ask for more windows if we need them (non-blocking)
+			if enabled {
+				if !sw.requestWindows() {
+					return // graceful shutdown
+				}
 			}
 		}
 
@@ -123,12 +125,16 @@ func (sw *schedWorker) handleWorker() {
 		}
 
 		// session looks good
-		if !enabled {
+		{
 			sched.workersLk.Lock()
+			enabled := worker.enabled
 			worker.enabled = true
 			sched.workersLk.Unlock()
 
-			// we'll send window requests on the next loop
+			if !enabled {
+				// go send window requests
+				break
+			}
 		}
 
 		// wait for more tasks to be assigned by the main scheduler or for the worker
diff --git a/extern/sector-storage/worker_local.go b/extern/sector-storage/worker_local.go
index cb1a43c53..ae2b325ca 100644
--- a/extern/sector-storage/worker_local.go
+++ b/extern/sector-storage/worker_local.go
@@ -8,6 +8,7 @@ import (
 	"reflect"
 	"runtime"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/elastic/go-sysinfo"
@@ -51,8 +52,9 @@ type LocalWorker struct {
 	acceptTasks map[sealtasks.TaskType]struct{}
 	running     sync.WaitGroup
 
-	session uuid.UUID
-	closing chan struct{}
+	session     uuid.UUID
+	testDisable int64
+	closing     chan struct{}
 }
 
 func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, store stores.Store, local *stores.Local, sindex stores.SectorIndex, ret storiface.WorkerReturn, cst *statestore.StateStore) *LocalWorker {
@@ -501,6 +503,10 @@ func (l *LocalWorker) Info(context.Context) (storiface.WorkerInfo, error) {
 }
 
 func (l *LocalWorker) Session(ctx context.Context) (uuid.UUID, error) {
+	if atomic.LoadInt64(&l.testDisable) == 1 {
+		return uuid.UUID{}, xerrors.Errorf("disabled")
+	}
+
 	select {
 	case <-l.closing:
 		return ClosedWorkerID, nil
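
Note (illustration, not part of the patch): the core pattern the main.go change introduces is wrapping the blocking LocalWorker.WaitQuiet call in a goroutine that closes a channel, so "no local tasks are running" can sit in the same select as the heartbeat ticker and context cancellation instead of blocking the whole loop. Below is a minimal standalone sketch of that pattern under those assumptions; all names are made up and a sleep stands in for WaitQuiet.

package main

import (
	"context"
	"fmt"
	"time"
)

// waitCh runs a blocking call in a goroutine and returns a channel that is
// closed once the call returns, making the wait usable in a select.
func waitCh(blocking func()) chan struct{} {
	out := make(chan struct{})
	go func() {
		blocking()
		close(out)
	}()
	return out
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	heartbeats := time.NewTicker(100 * time.Millisecond)
	defer heartbeats.Stop()

	// Stand-in for workerApi.LocalWorker.WaitQuiet: pretend local tasks drain after 300ms.
	readyCh := waitCh(func() { time.Sleep(300 * time.Millisecond) })

	for {
		select {
		case <-readyCh:
			fmt.Println("no local tasks running; this is where the worker would register")
			readyCh = nil // a nil channel blocks forever, so this case fires only once
		case <-heartbeats.C:
			fmt.Println("heartbeat")
		case <-ctx.Done():
			return // graceful shutdown
		}
	}
}

Setting the channel to nil after it fires mirrors readyCh = nil in the patch: heartbeats and shutdown keep being serviced while registration happens at most once per reconnect cycle.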