sched: Share active/preparing task counters
parent 745476c9ab
commit 2316363f7a
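Previously each ActiveResources owned its own map[sealtasks.SealTaskType]int, so a worker's preparing and active trackers counted tasks independently and a Resources.MaxConcurrent limit was effectively checked against each tracker on its own. This commit moves the map into a taskCounter that both of a worker's ActiveResources share, so the limit applies to the combined count. The sketch below is not part of the commit; it is a minimal, self-contained illustration of the idea using simplified stand-in types (the real counter keys on sealtasks.SealTaskType and is wired up in newWorkerHandle further down):

package main

import (
	"fmt"
	"sync"
)

// taskCounter mirrors the counter added in this commit: a map guarded by a
// mutex, shared by reference instead of owned per resource tracker.
type taskCounter struct {
	lk     sync.Mutex
	counts map[string]int
}

func newTaskCounter() *taskCounter { return &taskCounter{counts: map[string]int{}} }

func (tc *taskCounter) Add(tt string) {
	tc.lk.Lock()
	defer tc.lk.Unlock()
	tc.counts[tt]++
}

func (tc *taskCounter) Get(tt string) int {
	tc.lk.Lock()
	defer tc.lk.Unlock()
	return tc.counts[tt]
}

// activeResources stands in for ActiveResources: it now holds a pointer to a
// counter that may be shared with another tracker.
type activeResources struct{ taskCounters *taskCounter }

func main() {
	tc := newTaskCounter() // one counter per worker, as in newWorkerHandle
	preparing := &activeResources{taskCounters: tc}
	active := &activeResources{taskCounters: tc}

	// A task counted while it is still preparing is visible through the
	// active tracker as well, so a MaxConcurrent check sees the combined total.
	preparing.taskCounters.Add("seal/v0/precommit/1")
	fmt.Println(active.taskCounters.Get("seal/v0/precommit/1")) // prints 1
}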
@@ -58,7 +58,7 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) {
 	windows := make([]SchedWindow, windowsLen)
 	for i := range windows {
-		windows[i].Allocated = *NewActiveResources()
+		windows[i].Allocated = *NewActiveResources(newTaskCounter())
 	}
 	acceptableWindows := make([][]int, queueLen) // QueueIndex -> []OpenWindowIndex
@@ -13,18 +13,68 @@ type ActiveResources struct {
 	gpuUsed    float64
 	cpuUse     uint64

-	taskCounters map[sealtasks.SealTaskType]int
+	taskCounters *taskCounter

 	cond    *sync.Cond
 	waiting int
 }

-func NewActiveResources() *ActiveResources {
-	return &ActiveResources{
+type taskCounter struct {
+	taskCounters map[sealtasks.SealTaskType]int
+
+	// this lock is technically redundant, as ActiveResources is always accessed
+	// with the worker lock, but let's not panic if we ever change that
+	lk sync.Mutex
+}
+
+func newTaskCounter() *taskCounter {
+	return &taskCounter{
 		taskCounters: map[sealtasks.SealTaskType]int{},
 	}
 }

+func (tc *taskCounter) Add(tt sealtasks.SealTaskType) {
+	tc.lk.Lock()
+	defer tc.lk.Unlock()
+	tc.taskCounters[tt]++
+}
+
+func (tc *taskCounter) Free(tt sealtasks.SealTaskType) {
+	tc.lk.Lock()
+	defer tc.lk.Unlock()
+	tc.taskCounters[tt]--
+}
+
+func (tc *taskCounter) Get(tt sealtasks.SealTaskType) int {
+	tc.lk.Lock()
+	defer tc.lk.Unlock()
+	return tc.taskCounters[tt]
+}
+
+func (tc *taskCounter) Sum() int {
+	tc.lk.Lock()
+	defer tc.lk.Unlock()
+	sum := 0
+	for _, v := range tc.taskCounters {
+		sum += v
+	}
+	return sum
+}
+
+func (tc *taskCounter) ForEach(cb func(tt sealtasks.SealTaskType, count int)) {
+	tc.lk.Lock()
+	defer tc.lk.Unlock()
+	for tt, count := range tc.taskCounters {
+		cb(tt, count)
+	}
+}
+
+func NewActiveResources(tc *taskCounter) *ActiveResources {
+	return &ActiveResources{
+		taskCounters: tc,
+	}
+}
+
 func (a *ActiveResources) withResources(id storiface.WorkerID, wr storiface.WorkerInfo, tt sealtasks.SealTaskType, r storiface.Resources, locker sync.Locker, cb func() error) error {
 	for !a.CanHandleRequest(tt, r, id, "withResources", wr) {
 		if a.cond == nil {
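The lock inside taskCounter is called out above as technically redundant today, since ActiveResources is only ever touched under the worker lock, but it keeps the counter safe on its own if that ever changes. A hypothetical test sketch (not part of this commit) that could sit next to the scheduler tests in the same package, using only identifiers introduced above plus the package's existing testing, sync, and sealtasks imports:

func TestSharedTaskCounterSketch(t *testing.T) {
	tc := newTaskCounter()
	preparing := NewActiveResources(tc)
	active := NewActiveResources(tc)

	// Concurrent updates are safe because taskCounter locks internally.
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			preparing.taskCounters.Add(sealtasks.TTPreCommit1)
		}()
	}
	wg.Wait()

	// Both trackers point at the same counter, so counts added through
	// "preparing" are observed through "active" (and vice versa).
	if got := active.taskCounters.Get(sealtasks.TTPreCommit1); got != 100 {
		t.Fatalf("expected shared count of 100, got %d", got)
	}
	if got := active.taskCounters.Sum(); got != 100 {
		t.Fatalf("expected total of 100, got %d", got)
	}
}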
@@ -59,7 +109,7 @@ func (a *ActiveResources) Add(tt sealtasks.SealTaskType, wr storiface.WorkerReso
 	a.cpuUse += r.Threads(wr.CPUs, len(wr.GPUs))
 	a.memUsedMin += r.MinMemory
 	a.memUsedMax += r.MaxMemory
-	a.taskCounters[tt]++
+	a.taskCounters.Add(tt)

 	return a.utilization(wr) - startUtil
 }
@@ -71,7 +121,7 @@ func (a *ActiveResources) Free(tt sealtasks.SealTaskType, wr storiface.WorkerRes
 	a.cpuUse -= r.Threads(wr.CPUs, len(wr.GPUs))
 	a.memUsedMin -= r.MinMemory
 	a.memUsedMax -= r.MaxMemory
-	a.taskCounters[tt]--
+	a.taskCounters.Free(tt)

 	if a.cond != nil {
 		a.cond.Broadcast()
@@ -82,8 +132,8 @@ func (a *ActiveResources) Free(tt sealtasks.SealTaskType, wr storiface.WorkerRes
 // handle the request.
 func (a *ActiveResources) CanHandleRequest(tt sealtasks.SealTaskType, needRes storiface.Resources, wid storiface.WorkerID, caller string, info storiface.WorkerInfo) bool {
 	if needRes.MaxConcurrent > 0 {
-		if a.taskCounters[tt] >= needRes.MaxConcurrent {
-			log.Debugf("sched: not scheduling on worker %s for %s; at task limit tt=%s, curcount=%d", wid, caller, tt, a.taskCounters[tt])
+		if a.taskCounters.Get(tt) >= needRes.MaxConcurrent {
+			log.Debugf("sched: not scheduling on worker %s for %s; at task limit tt=%s, curcount=%d", wid, caller, tt, a.taskCounters.Get(tt))
 			return false
 		}
 	}
@@ -173,14 +223,10 @@ func (a *ActiveResources) utilization(wr storiface.WorkerResources) float64 { //
 func (a *ActiveResources) taskCount(tt *sealtasks.SealTaskType) int {
 	// nil means all tasks
 	if tt == nil {
-		var count int
-		for _, c := range a.taskCounters {
-			count += c
-		}
-		return count
+		return a.taskCounters.Sum()
 	}

-	return a.taskCounters[*tt]
+	return a.taskCounters.Get(*tt)
 }

 func (wh *WorkerHandle) Utilization() float64 {
@@ -639,8 +639,8 @@ func BenchmarkTrySched(b *testing.B) {
 			Resources: decentWorkerResources,
 		},
 		Enabled:   true,
-		preparing: NewActiveResources(),
-		active:    NewActiveResources(),
+		preparing: NewActiveResources(newTaskCounter()),
+		active:    NewActiveResources(newTaskCounter()),
 	}

 	for i := 0; i < windows; i++ {
@@ -685,7 +685,7 @@ func TestWindowCompact(t *testing.T) {
 	for _, windowTasks := range start {
 		window := &SchedWindow{
-			Allocated: *NewActiveResources(),
+			Allocated: *NewActiveResources(newTaskCounter()),
 		}

 		for _, task := range windowTasks {
@@ -708,7 +708,7 @@ func TestWindowCompact(t *testing.T) {
 	require.Equal(t, len(start)-len(expect), -sw.windowsRequested)

 	for wi, tasks := range expect {
-		expectRes := NewActiveResources()
+		expectRes := NewActiveResources(newTaskCounter())

 		for ti, task := range tasks {
 			require.Equal(t, task, wh.activeWindows[wi].Todo[ti].TaskType, "%d, %d", wi, ti)
@@ -30,12 +30,14 @@ func newWorkerHandle(ctx context.Context, w Worker) (*WorkerHandle, error) {
 		return nil, xerrors.Errorf("getting worker info: %w", err)
 	}

+	tc := newTaskCounter()
+
 	worker := &WorkerHandle{
 		workerRpc: w,
 		Info:      info,

-		preparing: NewActiveResources(),
-		active:    NewActiveResources(),
+		preparing: NewActiveResources(tc),
+		active:    NewActiveResources(tc),
 		Enabled:   true,

 		closingMgr: make(chan struct{}),
@@ -43,9 +43,9 @@ func (m *Manager) WorkerStats(ctx context.Context) map[uuid.UUID]storiface.Worke
 			TaskCounts: map[string]int{},
 		}

-		for tt, count := range handle.active.taskCounters {
+		handle.active.taskCounters.ForEach(func(tt sealtasks.SealTaskType, count int) {
 			out[uuid.UUID(id)].TaskCounts[tt.String()] = count
-		}
+		})

 		handle.lk.Unlock()
 	}