fix worker setup/cleanup raciness

This commit is contained in:
Łukasz Magiera 2020-07-17 12:59:12 +02:00
parent 3c9fa8609a
commit f1b3837186
4 changed files with 80 additions and 24 deletions

View File

@ -489,8 +489,8 @@ func (m *Manager) FsStat(ctx context.Context, id stores.ID) (fsutil.FsStat, erro
return m.storage.FsStat(ctx, id) return m.storage.FsStat(ctx, id)
} }
func (m *Manager) Close() error { func (m *Manager) Close(ctx context.Context) error {
return m.sched.Close() return m.sched.Close(ctx)
} }
var _ SectorManager = &Manager{} var _ SectorManager = &Manager{}

View File

@ -3,6 +3,7 @@ package sectorstorage
import ( import (
"container/heap" "container/heap"
"context" "context"
"fmt"
"math/rand" "math/rand"
"sort" "sort"
"sync" "sync"
@ -69,6 +70,7 @@ type scheduler struct {
openWindows []*schedWindowRequest openWindows []*schedWindowRequest
closing chan struct{} closing chan struct{}
closed chan struct{}
testSync chan struct{} // used for testing testSync chan struct{} // used for testing
} }
@ -79,6 +81,11 @@ type workerHandle struct {
preparing *activeResources preparing *activeResources
active *activeResources active *activeResources
// for sync manager goroutine closing
cleanupStarted bool
closedMgr chan struct{}
closingMgr chan struct{}
} }
type schedWindowRequest struct { type schedWindowRequest struct {
@ -138,6 +145,7 @@ func newScheduler(spt abi.RegisteredSealProof) *scheduler {
schedQueue: &requestQueue{}, schedQueue: &requestQueue{},
closing: make(chan struct{}), closing: make(chan struct{}),
closed: make(chan struct{}),
} }
} }
@ -182,6 +190,8 @@ func (r *workerRequest) respond(err error) {
} }
func (sh *scheduler) runSched() { func (sh *scheduler) runSched() {
defer close(sh.closed)
go sh.runWorkerWatcher() go sh.runWorkerWatcher()
for { for {
@ -366,11 +376,23 @@ func (sh *scheduler) trySched() {
} }
func (sh *scheduler) runWorker(wid WorkerID) { func (sh *scheduler) runWorker(wid WorkerID) {
var ready sync.WaitGroup
ready.Add(1)
defer ready.Wait()
go func() { go func() {
sh.workersLk.Lock() sh.workersLk.Lock()
worker := sh.workers[wid] worker, found := sh.workers[wid]
sh.workersLk.Unlock() sh.workersLk.Unlock()
ready.Done()
if !found {
panic(fmt.Sprintf("worker %d not found", wid))
}
defer close(worker.closedMgr)
scheduledWindows := make(chan *schedWindow, SchedWindows) scheduledWindows := make(chan *schedWindow, SchedWindows)
taskDone := make(chan struct{}, 1) taskDone := make(chan struct{}, 1)
windowsRequested := 0 windowsRequested := 0
@ -403,6 +425,8 @@ func (sh *scheduler) runWorker(wid WorkerID) {
return return
case <-workerClosing: case <-workerClosing:
return return
case <-worker.closingMgr:
return
} }
} }
@ -415,6 +439,8 @@ func (sh *scheduler) runWorker(wid WorkerID) {
return return
case <-workerClosing: case <-workerClosing:
return return
case <-worker.closingMgr:
return
} }
assignLoop: assignLoop:
@ -518,6 +544,9 @@ func (sh *scheduler) assignWorker(taskDone chan struct{}, wid WorkerID, w *worke
} }
func (sh *scheduler) newWorker(w *workerHandle) { func (sh *scheduler) newWorker(w *workerHandle) {
w.closedMgr = make(chan struct{})
w.closingMgr = make(chan struct{})
sh.workersLk.Lock() sh.workersLk.Lock()
id := sh.nextWorker id := sh.nextWorker
@ -526,13 +555,13 @@ func (sh *scheduler) newWorker(w *workerHandle) {
sh.workersLk.Unlock() sh.workersLk.Unlock()
sh.runWorker(id)
select { select {
case sh.watchClosing <- id: case sh.watchClosing <- id:
case <-sh.closing: case <-sh.closing:
return return
} }
sh.runWorker(id)
} }
func (sh *scheduler) dropWorker(wid WorkerID) { func (sh *scheduler) dropWorker(wid WorkerID) {
@ -540,37 +569,59 @@ func (sh *scheduler) dropWorker(wid WorkerID) {
defer sh.workersLk.Unlock() defer sh.workersLk.Unlock()
w := sh.workers[wid] w := sh.workers[wid]
sh.workerCleanup(wid, w)
delete(sh.workers, wid) delete(sh.workers, wid)
}
newWindows := make([]*schedWindowRequest, 0, len(sh.openWindows)) func (sh *scheduler) workerCleanup(wid WorkerID, w *workerHandle) {
for _, window := range sh.openWindows { if !w.cleanupStarted {
if window.worker != wid { close(w.closingMgr)
newWindows = append(newWindows, window) }
} select {
case <-w.closedMgr:
case <-time.After(time.Second):
log.Errorf("timeout closing worker manager goroutine %d", wid)
} }
sh.openWindows = newWindows
// TODO: sync close worker goroutine if !w.cleanupStarted {
w.cleanupStarted = true
go func() { newWindows := make([]*schedWindowRequest, 0, len(sh.openWindows))
if err := w.w.Close(); err != nil { for _, window := range sh.openWindows {
log.Warnf("closing worker %d: %+v", err) if window.worker != wid {
newWindows = append(newWindows, window)
}
} }
}() sh.openWindows = newWindows
log.Debugf("dropWorker %d", wid)
go func() {
if err := w.w.Close(); err != nil {
log.Warnf("closing worker %d: %+v", err)
}
}()
}
} }
func (sh *scheduler) schedClose() { func (sh *scheduler) schedClose() {
sh.workersLk.Lock() sh.workersLk.Lock()
defer sh.workersLk.Unlock() defer sh.workersLk.Unlock()
log.Debugf("closing scheduler")
for i, w := range sh.workers { for i, w := range sh.workers {
if err := w.w.Close(); err != nil { sh.workerCleanup(i, w)
log.Errorf("closing worker %d: %+v", i, err)
}
} }
} }
func (sh *scheduler) Close() error { func (sh *scheduler) Close(ctx context.Context) error {
close(sh.closing) close(sh.closing)
select {
case <-sh.closed:
case <-ctx.Done():
return ctx.Err()
}
return nil return nil
} }

View File

@ -119,6 +119,7 @@ func (s *schedTestWorker) Closing(ctx context.Context) (<-chan struct{}, error)
func (s *schedTestWorker) Close() error { func (s *schedTestWorker) Close() error {
if !s.closed { if !s.closed {
log.Info("close schedTestWorker")
s.closed = true s.closed = true
close(s.closing) close(s.closing)
} }
@ -169,11 +170,11 @@ func TestSchedStartStop(t *testing.T) {
addTestWorker(t, sched, stores.NewIndex(), "fred", nil) addTestWorker(t, sched, stores.NewIndex(), "fred", nil)
sched.schedClose() require.NoError(t, sched.Close(context.TODO()))
} }
func TestSched(t *testing.T) { func TestSched(t *testing.T) {
ctx, done := context.WithTimeout(context.Background(), 20*time.Second) ctx, done := context.WithTimeout(context.Background(), 30*time.Second)
defer done() defer done()
spt := abi.RegisteredSealProof_StackedDrg32GiBV1 spt := abi.RegisteredSealProof_StackedDrg32GiBV1
@ -301,7 +302,7 @@ func TestSched(t *testing.T) {
log.Info("wait for async stuff") log.Info("wait for async stuff")
rm.wg.Wait() rm.wg.Wait()
sched.schedClose() require.NoError(t, sched.Close(context.TODO()))
} }
} }

View File

@ -74,7 +74,11 @@ func (sh *scheduler) runWorkerWatcher() {
caseToWorker[toSet] = wid caseToWorker[toSet] = wid
default: default:
wid := caseToWorker[n] wid, found := caseToWorker[n]
if !found {
log.Errorf("worker ID not found for case %d", n)
continue
}
delete(caseToWorker, n) delete(caseToWorker, n)
cases[n] = reflect.SelectCase{ cases[n] = reflect.SelectCase{