sched: re-register worker windows after re-enabling correctly

This commit is contained in:
Łukasz Magiera 2020-10-30 18:32:59 +01:00
parent f90a387f96
commit 69e44ebf07
2 changed files with 29 additions and 4 deletions

View File

@ -409,6 +409,9 @@ func TestReenableWorker(t *testing.T) {
time.Sleep(time.Millisecond * 100) time.Sleep(time.Millisecond * 100)
i, _ := m.sched.Info(ctx)
require.Len(t, i.(SchedDiagInfo).OpenWindows, 2)
// disable // disable
atomic.StoreInt64(&w.testDisable, 1) atomic.StoreInt64(&w.testDisable, 1)
@ -421,6 +424,9 @@ func TestReenableWorker(t *testing.T) {
} }
require.False(t, m.WorkerStats()[w.session].Enabled) require.False(t, m.WorkerStats()[w.session].Enabled)
i, _ = m.sched.Info(ctx)
require.Len(t, i.(SchedDiagInfo).OpenWindows, 0)
// reenable // reenable
atomic.StoreInt64(&w.testDisable, 0) atomic.StoreInt64(&w.testDisable, 0)
@ -432,4 +438,16 @@ func TestReenableWorker(t *testing.T) {
time.Sleep(time.Millisecond * 3) time.Sleep(time.Millisecond * 3)
} }
require.True(t, m.WorkerStats()[w.session].Enabled) require.True(t, m.WorkerStats()[w.session].Enabled)
for i := 0; i < 100; i++ {
info, _ := m.sched.Info(ctx)
if len(info.(SchedDiagInfo).OpenWindows) != 0 {
break
}
time.Sleep(time.Millisecond * 3)
}
i, _ = m.sched.Info(ctx)
require.Len(t, i.(SchedDiagInfo).OpenWindows, 2)
} }

View File

@ -125,11 +125,18 @@ func (sw *schedWorker) handleWorker() {
} }
// session looks good // session looks good
{
sched.workersLk.Lock() sched.workersLk.Lock()
enabled := worker.enabled
worker.enabled = true worker.enabled = true
// we'll send window requests on the next loop
sched.workersLk.Unlock() sched.workersLk.Unlock()
if !enabled {
// go send window requests
break
}
}
// wait for more tasks to be assigned by the main scheduler or for the worker // wait for more tasks to be assigned by the main scheduler or for the worker
// to finish precessing a task // to finish precessing a task
update, ok := sw.waitForUpdates() update, ok := sw.waitForUpdates()