From e06cfa2d0a4f00460434dca458d1c90c065d5edc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 12 Jan 2024 16:30:22 +0100 Subject: [PATCH] harmony: Don't leak tickers, faster multiple bulk scheduling --- lib/harmony/harmonytask/harmonytask.go | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/lib/harmony/harmonytask/harmonytask.go b/lib/harmony/harmonytask/harmonytask.go index c56b83b95..01f2febba 100644 --- a/lib/harmony/harmonytask/harmonytask.go +++ b/lib/harmony/harmonytask/harmonytask.go @@ -12,9 +12,10 @@ import ( ) // Consts (except for unit test) -var POLL_DURATION = time.Second * 3 // Poll for Work this frequently -var CLEANUP_FREQUENCY = 5 * time.Minute // Check for dead workers this often * everyone -var FOLLOW_FREQUENCY = 1 * time.Minute // Check for work to follow this often +var POLL_DURATION = time.Second * 3 // Poll for Work this frequently when the previous poll accepted nothing +var POLL_NEXT_DURATION = 100 * time.Millisecond // After scheduling a task (and before the first poll), wait only this long before polling again +var CLEANUP_FREQUENCY = 5 * time.Minute // Check for dead workers this often * everyone +var FOLLOW_FREQUENCY = 1 * time.Minute // Check for work to follow this often type TaskTypeDetails struct { // Max returns how many tasks this machine can run of this type. 
@@ -211,13 +212,19 @@ top: } func (e *TaskEngine) poller() { + nextWait := POLL_NEXT_DURATION for { select { - case <-time.NewTicker(POLL_DURATION).C: // Find work periodically + case <-time.After(nextWait): // Find work periodically: short wait on the first pass and right after a task was accepted, POLL_DURATION otherwise case <-e.ctx.Done(): ///////////////////// Graceful exit return } - e.pollerTryAllWork() + nextWait = POLL_DURATION + + accepted := e.pollerTryAllWork() + if accepted { + nextWait = POLL_NEXT_DURATION + } if time.Since(e.lastFollowTime) > FOLLOW_FREQUENCY { e.followWorkInDB() } @@ -266,7 +273,7 @@ func (e *TaskEngine) followWorkInDB() { } // pollerTryAllWork starts the next 1 task -func (e *TaskEngine) pollerTryAllWork() { +func (e *TaskEngine) pollerTryAllWork() bool { if time.Since(e.lastCleanup.Load().(time.Time)) > CLEANUP_FREQUENCY { e.lastCleanup.Store(time.Now()) resources.CleanupMachines(e.ctx, e.db) @@ -287,11 +294,13 @@ func (e *TaskEngine) pollerTryAllWork() { if len(unownedTasks) > 0 { accepted := v.considerWork(workSourcePoller, unownedTasks) if accepted { - return // accept new work slowly and in priority order + return true // accept new work slowly and in priority order; true makes poller re-poll after POLL_NEXT_DURATION } log.Warn("Work not accepted for " + strconv.Itoa(len(unownedTasks)) + " " + v.Name + " task(s)") } } + + return false // nothing was accepted on this pass; poller falls back to POLL_DURATION } // ResourcesAvailable determines what resources are still unassigned.