harmony: Don't leak tickers, faster multiple bulk scheduling

This commit is contained in:
Łukasz Magiera 2024-01-12 16:30:22 +01:00
parent 0bd4d7dd8b
commit e06cfa2d0a

View File

@ -12,9 +12,10 @@ import (
) )
// Consts (except for unit test) // Consts (except for unit test)
var POLL_DURATION = time.Second * 3 // Poll for Work this frequently var POLL_DURATION = time.Second * 3 // Poll for Work this frequently
var CLEANUP_FREQUENCY = 5 * time.Minute // Check for dead workers this often * everyone var POLL_NEXT_DURATION = 100 * time.Millisecond // After scheduling a task, wait this long before scheduling another
var FOLLOW_FREQUENCY = 1 * time.Minute // Check for work to follow this often var CLEANUP_FREQUENCY = 5 * time.Minute // Check for dead workers this often * everyone
var FOLLOW_FREQUENCY = 1 * time.Minute // Check for work to follow this often
type TaskTypeDetails struct { type TaskTypeDetails struct {
// Max returns how many tasks this machine can run of this type. // Max returns how many tasks this machine can run of this type.
@ -211,13 +212,19 @@ top:
} }
func (e *TaskEngine) poller() { func (e *TaskEngine) poller() {
nextWait := POLL_NEXT_DURATION
for { for {
select { select {
case <-time.NewTicker(POLL_DURATION).C: // Find work periodically case <-time.After(nextWait): // Find work periodically
case <-e.ctx.Done(): ///////////////////// Graceful exit case <-e.ctx.Done(): ///////////////////// Graceful exit
return return
} }
e.pollerTryAllWork() nextWait = POLL_DURATION
accepted := e.pollerTryAllWork()
if accepted {
nextWait = POLL_NEXT_DURATION
}
if time.Since(e.lastFollowTime) > FOLLOW_FREQUENCY { if time.Since(e.lastFollowTime) > FOLLOW_FREQUENCY {
e.followWorkInDB() e.followWorkInDB()
} }
@ -266,7 +273,7 @@ func (e *TaskEngine) followWorkInDB() {
} }
// pollerTryAllWork starts the next 1 task // pollerTryAllWork starts the next 1 task
func (e *TaskEngine) pollerTryAllWork() { func (e *TaskEngine) pollerTryAllWork() bool {
if time.Since(e.lastCleanup.Load().(time.Time)) > CLEANUP_FREQUENCY { if time.Since(e.lastCleanup.Load().(time.Time)) > CLEANUP_FREQUENCY {
e.lastCleanup.Store(time.Now()) e.lastCleanup.Store(time.Now())
resources.CleanupMachines(e.ctx, e.db) resources.CleanupMachines(e.ctx, e.db)
@ -287,11 +294,13 @@ func (e *TaskEngine) pollerTryAllWork() {
if len(unownedTasks) > 0 { if len(unownedTasks) > 0 {
accepted := v.considerWork(workSourcePoller, unownedTasks) accepted := v.considerWork(workSourcePoller, unownedTasks)
if accepted { if accepted {
return // accept new work slowly and in priority order return true // accept new work slowly and in priority order
} }
log.Warn("Work not accepted for " + strconv.Itoa(len(unownedTasks)) + " " + v.Name + " task(s)") log.Warn("Work not accepted for " + strconv.Itoa(len(unownedTasks)) + " " + v.Name + " task(s)")
} }
} }
return false
} }
// ResourcesAvailable determines what resources are still unassigned. // ResourcesAvailable determines what resources are still unassigned.