diff --git a/manager.go b/manager.go index 0cd081d92..fc3be18c1 100644 --- a/manager.go +++ b/manager.go @@ -489,8 +489,8 @@ func (m *Manager) FsStat(ctx context.Context, id stores.ID) (fsutil.FsStat, erro return m.storage.FsStat(ctx, id) } -func (m *Manager) Close() error { - return m.sched.Close() +func (m *Manager) Close(ctx context.Context) error { + return m.sched.Close(ctx) } var _ SectorManager = &Manager{} diff --git a/sched.go b/sched.go index caf67c678..bec5ee0c5 100644 --- a/sched.go +++ b/sched.go @@ -3,6 +3,7 @@ package sectorstorage import ( "container/heap" "context" + "fmt" "math/rand" "sort" "sync" @@ -69,6 +70,7 @@ type scheduler struct { openWindows []*schedWindowRequest closing chan struct{} + closed chan struct{} testSync chan struct{} // used for testing } @@ -79,6 +81,11 @@ type workerHandle struct { preparing *activeResources active *activeResources + + // for sync manager goroutine closing + cleanupStarted bool + closedMgr chan struct{} + closingMgr chan struct{} } type schedWindowRequest struct { @@ -138,6 +145,7 @@ func newScheduler(spt abi.RegisteredSealProof) *scheduler { schedQueue: &requestQueue{}, closing: make(chan struct{}), + closed: make(chan struct{}), } } @@ -182,6 +190,8 @@ func (r *workerRequest) respond(err error) { } func (sh *scheduler) runSched() { + defer close(sh.closed) + go sh.runWorkerWatcher() for { @@ -366,11 +376,23 @@ func (sh *scheduler) trySched() { } func (sh *scheduler) runWorker(wid WorkerID) { + var ready sync.WaitGroup + ready.Add(1) + defer ready.Wait() + go func() { sh.workersLk.Lock() - worker := sh.workers[wid] + worker, found := sh.workers[wid] sh.workersLk.Unlock() + ready.Done() + + if !found { + panic(fmt.Sprintf("worker %d not found", wid)) + } + + defer close(worker.closedMgr) + scheduledWindows := make(chan *schedWindow, SchedWindows) taskDone := make(chan struct{}, 1) windowsRequested := 0 @@ -403,6 +425,8 @@ func (sh *scheduler) runWorker(wid WorkerID) { return case <-workerClosing: return + 
case <-worker.closingMgr:
+				return
 			}
 		}
 
@@ -415,6 +439,8 @@
 			return
 		case <-workerClosing:
 			return
+		case <-worker.closingMgr:
+			return
 		}
 
 	assignLoop:
@@ -518,6 +544,9 @@
 }
 
 func (sh *scheduler) newWorker(w *workerHandle) {
+	w.closedMgr = make(chan struct{})
+	w.closingMgr = make(chan struct{})
+
 	sh.workersLk.Lock()
 
 	id := sh.nextWorker
@@ -526,13 +555,13 @@
 
 	sh.workersLk.Unlock()
 
+	sh.runWorker(id)
+
 	select {
 	case sh.watchClosing <- id:
 	case <-sh.closing:
 		return
 	}
-
-	sh.runWorker(id)
 }
 
 func (sh *scheduler) dropWorker(wid WorkerID) {
@@ -540,37 +569,59 @@
 	defer sh.workersLk.Unlock()
 
 	w := sh.workers[wid]
+
+	sh.workerCleanup(wid, w)
+
 	delete(sh.workers, wid)
+}
 
-	newWindows := make([]*schedWindowRequest, 0, len(sh.openWindows))
-	for _, window := range sh.openWindows {
-		if window.worker != wid {
-			newWindows = append(newWindows, window)
-		}
+func (sh *scheduler) workerCleanup(wid WorkerID, w *workerHandle) {
+	if !w.cleanupStarted {
+		close(w.closingMgr)
+	}
+	select {
+	case <-w.closedMgr:
+	case <-time.After(time.Second):
+		log.Errorf("timeout closing worker manager goroutine %d", wid)
 	}
-	sh.openWindows = newWindows
 
-	// TODO: sync close worker goroutine
+	if !w.cleanupStarted {
+		w.cleanupStarted = true
 
-	go func() {
-		if err := w.w.Close(); err != nil {
-			log.Warnf("closing worker %d: %+v", err)
+		newWindows := make([]*schedWindowRequest, 0, len(sh.openWindows))
+		for _, window := range sh.openWindows {
+			if window.worker != wid {
+				newWindows = append(newWindows, window)
+			}
 		}
-	}()
+		sh.openWindows = newWindows
+
+		log.Debugf("dropWorker %d", wid)
+
+		go func() {
+			if err := w.w.Close(); err != nil {
+				log.Warnf("closing worker %d: %+v", wid, err)
+			}
+		}()
+	}
 }
 
 func (sh *scheduler) schedClose() {
 	sh.workersLk.Lock()
 	defer sh.workersLk.Unlock()
+	
log.Debugf("closing scheduler") for i, w := range sh.workers { - if err := w.w.Close(); err != nil { - log.Errorf("closing worker %d: %+v", i, err) - } + sh.workerCleanup(i, w) } } -func (sh *scheduler) Close() error { +func (sh *scheduler) Close(ctx context.Context) error { close(sh.closing) + select { + case <-sh.closed: + case <-ctx.Done(): + return ctx.Err() + } return nil } diff --git a/sched_test.go b/sched_test.go index e6bd8d220..67a5eeed3 100644 --- a/sched_test.go +++ b/sched_test.go @@ -119,6 +119,7 @@ func (s *schedTestWorker) Closing(ctx context.Context) (<-chan struct{}, error) func (s *schedTestWorker) Close() error { if !s.closed { + log.Info("close schedTestWorker") s.closed = true close(s.closing) } @@ -169,11 +170,11 @@ func TestSchedStartStop(t *testing.T) { addTestWorker(t, sched, stores.NewIndex(), "fred", nil) - sched.schedClose() + require.NoError(t, sched.Close(context.TODO())) } func TestSched(t *testing.T) { - ctx, done := context.WithTimeout(context.Background(), 20*time.Second) + ctx, done := context.WithTimeout(context.Background(), 30*time.Second) defer done() spt := abi.RegisteredSealProof_StackedDrg32GiBV1 @@ -301,7 +302,7 @@ func TestSched(t *testing.T) { log.Info("wait for async stuff") rm.wg.Wait() - sched.schedClose() + require.NoError(t, sched.Close(context.TODO())) } } diff --git a/sched_watch.go b/sched_watch.go index 214489083..d93cf1af3 100644 --- a/sched_watch.go +++ b/sched_watch.go @@ -74,7 +74,11 @@ func (sh *scheduler) runWorkerWatcher() { caseToWorker[toSet] = wid default: - wid := caseToWorker[n] + wid, found := caseToWorker[n] + if !found { + log.Errorf("worker ID not found for case %d", n) + continue + } delete(caseToWorker, n) cases[n] = reflect.SelectCase{