sealing sched: Factor worker queues into utilization calc
parent 98d51d3d80
commit e14c80360d
extern/sector-storage/sched.go (vendored) | 4 ++++

@@ -437,6 +437,10 @@ func (sh *scheduler) trySched() {
 			log.Debugf("SCHED ASSIGNED sqi:%d sector %d task %s to window %d", sqi, task.sector.Number, task.taskType, wnd)
 
 			windows[wnd].allocated.add(wr, needRes)
+			// TODO: We probably want to re-sort acceptableWindows here based on new
+			// workerHandle.utilization + windows[wnd].allocated.utilization (workerHandle.utilization is used in all
+			// task selectors, but not in the same way, so need to figure out how to do that in a non-O(n^2 way), and
+			// without additional network roundtrips (O(n^2) could be avoided by turning acceptableWindows.[] into heaps))
 
 			selectedWindow = wnd
 			break
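The TODO above sketches replacing a linear re-sort of acceptableWindows with heaps. Below is a minimal, self-contained Go sketch of that idea, not the commit's code: windowItem and windowHeap are hypothetical stand-ins, and the cached utilization score would in practice come from workerHandle.utilization plus windows[wnd].allocated.

package main

import (
	"container/heap"
	"fmt"
)

// windowItem is a hypothetical stand-in for one entry of acceptableWindows.
type windowItem struct {
	wnd         int     // index into the scheduler's open windows
	utilization float64 // cached utilization score used for ordering
	index       int     // position in the heap, maintained by Swap/Push
}

// windowHeap orders candidate windows by ascending utilization.
type windowHeap []*windowItem

func (h windowHeap) Len() int           { return len(h) }
func (h windowHeap) Less(i, j int) bool { return h[i].utilization < h[j].utilization }
func (h windowHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
	h[i].index = i
	h[j].index = j
}
func (h *windowHeap) Push(x interface{}) {
	it := x.(*windowItem)
	it.index = len(*h)
	*h = append(*h, it)
}
func (h *windowHeap) Pop() interface{} {
	old := *h
	it := old[len(old)-1]
	*h = old[:len(old)-1]
	return it
}

func main() {
	h := &windowHeap{
		{wnd: 0, utilization: 0.4, index: 0},
		{wnd: 1, utilization: 0.1, index: 1},
		{wnd: 2, utilization: 0.7, index: 2},
	}
	heap.Init(h)

	// Pick the least-utilized window, pretend allocated.add increased its
	// load, then fix up only that element instead of re-sorting everything.
	best := (*h)[0]
	best.utilization += 0.3
	heap.Fix(h, best.index)

	fmt.Printf("next candidate: window %d (utilization %.1f)\n", (*h)[0].wnd, (*h)[0].utilization)
}

With a heap, bumping one window's load after allocated.add costs an O(log n) heap.Fix rather than a full re-sort, which is the saving the comment's heap suggestion is after.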
extern/sector-storage/sched_resources.go (vendored) | 14 ++++++++++++++

@@ -108,3 +108,17 @@ func (a *activeResources) utilization(wr storiface.WorkerResources) float64 {
 
 	return max
 }
+
+func (wh *workerHandle) utilization() float64 {
+	wh.lk.Lock()
+	u := wh.active.utilization(wh.info.Resources)
+	u += wh.preparing.utilization(wh.info.Resources)
+	wh.lk.Unlock()
+	wh.wndLk.Lock()
+	for _, window := range wh.activeWindows {
+		u += window.allocated.utilization(wh.info.Resources)
+	}
+	wh.wndLk.Unlock()
+
+	return u
+}
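The new workerHandle.utilization is what the title means by factoring worker queues into the calculation: besides actively running and preparing work, every window of work already assigned to the worker (activeWindows) now adds to the score. A simplified stand-alone sketch, with hypothetical resources/worker types standing in for activeResources/workerHandle and without the wh.lk/wh.wndLk locking the real code does:

package main

import "fmt"

// resources is a stand-in for activeResources: it just carries a
// precomputed utilization fraction of the worker's capacity.
type resources struct{ used float64 }

func (r resources) utilization() float64 { return r.used }

// worker mirrors the shape of workerHandle for this example only.
type worker struct {
	active        resources
	preparing     resources
	activeWindows []resources // work queued in scheduling windows, not yet started
}

// utilization sums running, preparing and queued work, like the new
// workerHandle.utilization in this commit.
func (w worker) utilization() float64 {
	u := w.active.utilization() + w.preparing.utilization()
	for _, wnd := range w.activeWindows {
		u += wnd.utilization()
	}
	return u
}

func main() {
	// Both workers run the same amount of work right now, but b already has
	// two windows of queued tasks, so it now compares as more utilized.
	a := worker{active: resources{0.5}}
	b := worker{active: resources{0.5}, activeWindows: []resources{{0.3}, {0.2}}}
	fmt.Printf("a=%.1f b=%.1f, prefer a: %v\n", a.utilization(), b.utilization(), a.utilization() < b.utilization())
}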
extern/sector-storage/selector_alloc.go (vendored) | 2 +-

@@ -59,7 +59,7 @@ func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi
 }
 
 func (s *allocSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *workerHandle) (bool, error) {
-	return a.active.utilization(a.info.Resources) < b.active.utilization(b.info.Resources), nil
+	return a.utilization() < b.utilization(), nil
 }
 
 var _ WorkerSelector = &allocSelector{}
extern/sector-storage/selector_existing.go (vendored) | 2 +-

@@ -61,7 +61,7 @@ func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt
 }
 
 func (s *existingSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *workerHandle) (bool, error) {
-	return a.active.utilization(a.info.Resources) < b.active.utilization(b.info.Resources), nil
+	return a.utilization() < b.utilization(), nil
 }
 
 var _ WorkerSelector = &existingSelector{}
extern/sector-storage/selector_task.go (vendored) | 2 +-

@@ -42,7 +42,7 @@ func (s *taskSelector) Cmp(ctx context.Context, _ sealtasks.TaskType, a, b *work
 		return len(atasks) < len(btasks), nil // prefer workers which can do less
 	}
 
-	return a.active.utilization(a.info.Resources) < b.active.utilization(b.info.Resources), nil
+	return a.utilization() < b.utilization(), nil
 }
 
 var _ WorkerSelector = &allocSelector{}
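All three selectors now compare workers by this queue-inclusive score, so a worker with a backlog of scheduled windows no longer looks idle to Cmp. A hedged sketch of how such a comparator orders candidates; candidate and cmp are illustrative, not the scheduler's types:

package main

import (
	"fmt"
	"sort"
)

// candidate is an illustrative pairing of a worker with its
// workerHandle.utilization() result.
type candidate struct {
	name        string
	utilization float64
}

// cmp mirrors the selectors' Cmp after this commit: lower total utilization wins.
func cmp(a, b candidate) bool { return a.utilization < b.utilization }

func main() {
	candidates := []candidate{
		{"worker-a", 1.2}, // busy now and has queued windows
		{"worker-b", 0.4}, // lightly loaded
		{"worker-c", 0.9},
	}
	// Sort so the least-utilized worker is tried first.
	sort.SliceStable(candidates, func(i, j int) bool { return cmp(candidates[i], candidates[j]) })
	fmt.Println("try first:", candidates[0].name) // worker-b
}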