From cea46d8c8cf724663903494c80eadeffb39e3263 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 24 Jun 2020 00:35:34 +0200 Subject: [PATCH] sched: Also handle timeout on selector Cmp --- sched.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/sched.go b/sched.go index 5b99f48ff..3b3e28d65 100644 --- a/sched.go +++ b/sched.go @@ -205,6 +205,8 @@ func (sh *scheduler) onWorkerFreed(wid WorkerID) { } } +var selectorTimeout = 5 * time.Second + func (sh *scheduler) maybeSchedRequest(req *workerRequest) (bool, error) { sh.workersLk.Lock() defer sh.workersLk.Unlock() @@ -215,7 +217,7 @@ func (sh *scheduler) maybeSchedRequest(req *workerRequest) (bool, error) { needRes := ResourceTable[req.taskType][sh.spt] for wid, worker := range sh.workers { - rpcCtx, cancel := context.WithTimeout(req.ctx, 5*time.Second) + rpcCtx, cancel := context.WithTimeout(req.ctx, selectorTimeout) ok, err := req.sel.Ok(rpcCtx, req.taskType, sh.spt, worker) cancel() @@ -240,7 +242,10 @@ func (sh *scheduler) maybeSchedRequest(req *workerRequest) (bool, error) { var serr error sort.SliceStable(acceptable, func(i, j int) bool { - r, err := req.sel.Cmp(req.ctx, req.taskType, sh.workers[acceptable[i]], sh.workers[acceptable[j]]) + rpcCtx, cancel := context.WithTimeout(req.ctx, selectorTimeout) + defer cancel() + r, err := req.sel.Cmp(rpcCtx, req.taskType, sh.workers[acceptable[i]], sh.workers[acceptable[j]]) + if err != nil { serr = multierror.Append(serr, err) }