Merge pull request #56 from filecoin-project/feat/shed-sel-cmp-timeout

sched: Also handle timeout on selector Cmp
This commit is contained in:
Łukasz Magiera 2020-06-24 00:46:36 +02:00 committed by GitHub
commit de544b5316
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -205,6 +205,8 @@ func (sh *scheduler) onWorkerFreed(wid WorkerID) {
} }
} }
var selectorTimeout = 5 * time.Second
func (sh *scheduler) maybeSchedRequest(req *workerRequest) (bool, error) { func (sh *scheduler) maybeSchedRequest(req *workerRequest) (bool, error) {
sh.workersLk.Lock() sh.workersLk.Lock()
defer sh.workersLk.Unlock() defer sh.workersLk.Unlock()
@ -215,7 +217,7 @@ func (sh *scheduler) maybeSchedRequest(req *workerRequest) (bool, error) {
needRes := ResourceTable[req.taskType][sh.spt] needRes := ResourceTable[req.taskType][sh.spt]
for wid, worker := range sh.workers { for wid, worker := range sh.workers {
rpcCtx, cancel := context.WithTimeout(req.ctx, 5*time.Second) rpcCtx, cancel := context.WithTimeout(req.ctx, selectorTimeout)
ok, err := req.sel.Ok(rpcCtx, req.taskType, sh.spt, worker) ok, err := req.sel.Ok(rpcCtx, req.taskType, sh.spt, worker)
cancel() cancel()
@ -240,7 +242,10 @@ func (sh *scheduler) maybeSchedRequest(req *workerRequest) (bool, error) {
var serr error var serr error
sort.SliceStable(acceptable, func(i, j int) bool { sort.SliceStable(acceptable, func(i, j int) bool {
r, err := req.sel.Cmp(req.ctx, req.taskType, sh.workers[acceptable[i]], sh.workers[acceptable[j]]) rpcCtx, cancel := context.WithTimeout(req.ctx, selectorTimeout)
defer cancel()
r, err := req.sel.Cmp(rpcCtx, req.taskType, sh.workers[acceptable[i]], sh.workers[acceptable[j]])
if err != nil { if err != nil {
serr = multierror.Append(serr, err) serr = multierror.Append(serr, err)
} }