Merge pull request #8725 from filecoin-project/feat/worker-task-count-limits

feat: sched: Per worker concurrent task count limits
Łukasz Magiera, 2022-05-27 16:57:38 +02:00, committed by GitHub
commit 5624b7eaad
17 changed files with 764 additions and 337 deletions


@ -105,13 +105,20 @@ func workersCmd(sealing bool) *cli.Command {
return st[i].id.String() < st[j].id.String()
})
/*
Example output:
Worker c4d65451-07f8-4230-98ad-4f33dea2a8cc, host myhostname
TASK: PC1(1/4) AP(15/15) GET(3)
CPU: [|||||||| ] 16/128 core(s) in use
RAM: [|||||||| ] 12% 125.8 GiB/1008 GiB
VMEM: [|||||||| ] 12% 125.8 GiB/1008 GiB
GPU: [ ] 0% 0.00/1 gpu(s) in use
GPU: NVIDIA GeForce RTX 3090, not used
*/
for _, stat := range st {
gpuUse := "not "
gpuCol := color.FgBlue
if stat.GpuUsed > 0 {
gpuCol = color.FgGreen
gpuUse = ""
}
// Worker uuid + name
var disabled string
if !stat.Enabled {
@ -120,9 +127,53 @@ func workersCmd(sealing bool) *cli.Command {
fmt.Printf("Worker %s, host %s%s\n", stat.id, color.MagentaString(stat.Info.Hostname), disabled)
// Task counts
tc := make([][]string, 0, len(stat.TaskCounts))
for st, c := range stat.TaskCounts {
if c == 0 {
continue
}
stt, err := sealtasks.SttFromString(st)
if err != nil {
return err
}
str := fmt.Sprint(c)
if max := stat.Info.Resources.ResourceSpec(stt.RegisteredSealProof, stt.TaskType).MaxConcurrent; max > 0 {
switch {
case c < max:
str = color.GreenString(str)
case c >= max:
str = color.YellowString(str)
}
str = fmt.Sprintf("%s/%d", str, max)
} else {
str = color.CyanString(str)
}
str = fmt.Sprintf("%s(%s)", color.BlueString(stt.Short()), str)
tc = append(tc, []string{string(stt.TaskType), str})
}
sort.Slice(tc, func(i, j int) bool {
return sealtasks.TaskType(tc[i][0]).Less(sealtasks.TaskType(tc[j][0]))
})
var taskStr string
for _, t := range tc {
taskStr += t[1] + " "
}
if taskStr != "" {
fmt.Printf("\tTASK: %s\n", taskStr)
}
// CPU use
fmt.Printf("\tCPU: [%s] %d/%d core(s) in use\n",
barString(float64(stat.Info.Resources.CPUs), 0, float64(stat.CpuUse)), stat.CpuUse, stat.Info.Resources.CPUs)
// RAM use
ramTotal := stat.Info.Resources.MemPhysical
ramTasks := stat.MemUsedMin
ramUsed := stat.Info.Resources.MemUsed
@ -137,6 +188,8 @@ func workersCmd(sealing bool) *cli.Command {
types.SizeStr(types.NewInt(ramTasks+ramUsed)),
types.SizeStr(types.NewInt(stat.Info.Resources.MemPhysical)))
// VMEM use (ram+swap)
vmemTotal := stat.Info.Resources.MemPhysical + stat.Info.Resources.MemSwap
vmemTasks := stat.MemUsedMax
vmemUsed := stat.Info.Resources.MemUsed + stat.Info.Resources.MemSwapUsed
@ -151,12 +204,21 @@ func workersCmd(sealing bool) *cli.Command {
types.SizeStr(types.NewInt(vmemTasks+vmemReserved)),
types.SizeStr(types.NewInt(vmemTotal)))
// GPU use
if len(stat.Info.Resources.GPUs) > 0 {
gpuBar := barString(float64(len(stat.Info.Resources.GPUs)), 0, stat.GpuUsed)
fmt.Printf("\tGPU: [%s] %.f%% %.2f/%d gpu(s) in use\n", color.GreenString(gpuBar),
stat.GpuUsed*100/float64(len(stat.Info.Resources.GPUs)),
stat.GpuUsed, len(stat.Info.Resources.GPUs))
}
gpuUse := "not "
gpuCol := color.FgBlue
if stat.GpuUsed > 0 {
gpuCol = color.FgGreen
gpuUse = ""
}
for _, gpu := range stat.Info.Resources.GPUs {
fmt.Printf("\tGPU: %s\n", color.New(gpuCol).Sprintf("%s, %sused", gpu, gpuUse))
}
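
A minimal sketch (not the PR code) of how one TaskCounts entry is turned into the PC1(1/4) form in the example output above: with a MaxConcurrent limit configured the count is printed as count/limit (green below the limit, yellow once it is hit), otherwise as a bare count in cyan. The key, count and limit values are illustrative; import path as of this commit.

package main

import (
	"fmt"

	"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
)

func main() {
	// One TaskCounts entry for this worker, keyed by SealTaskType.String().
	// "8" is assumed to be the 32GiB v1_1 seal proof enum value.
	stt, err := sealtasks.SttFromString("8:seal/v0/precommit/1")
	if err != nil {
		panic(err)
	}

	count, limit := 1, 4 // running tasks of this type vs its configured MaxConcurrent

	if limit > 0 {
		fmt.Printf("%s(%d/%d)\n", stt.Short(), count, limit) // PC1(1/4)
	} else {
		fmt.Printf("%s(%d)\n", stt.Short(), count) // no limit configured: bare count
	}
}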

File diff suppressed because it is too large.

File diff suppressed because it is too large.

@ -85,8 +85,8 @@ type WorkerHandle struct {
Info storiface.WorkerInfo
preparing *activeResources // use with WorkerHandle.lk
active *activeResources // use with WorkerHandle.lk
preparing *ActiveResources // use with WorkerHandle.lk
active *ActiveResources // use with WorkerHandle.lk
lk sync.Mutex // can be taken inside sched.workersLk.RLock
@ -108,7 +108,7 @@ type SchedWindowRequest struct {
}
type SchedWindow struct {
Allocated activeResources
Allocated ActiveResources
Todo []*WorkerRequest
}
@ -118,16 +118,6 @@ type workerDisableReq struct {
done func()
}
type activeResources struct {
memUsedMin uint64
memUsedMax uint64
gpuUsed float64
cpuUse uint64
cond *sync.Cond
waiting int
}
type WorkerRequest struct {
Sector storage.SectorRef
TaskType sealtasks.TaskType
@ -228,6 +218,13 @@ func (r *WorkerRequest) respond(err error) {
}
}
func (r *WorkerRequest) SealTask() sealtasks.SealTaskType {
return sealtasks.SealTaskType{
TaskType: r.TaskType,
RegisteredSealProof: r.Sector.ProofType,
}
}
type SchedDiagRequestInfo struct {
Sector abi.SectorID
TaskType sealtasks.TaskType


@ -44,6 +44,9 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) {
}
windows := make([]SchedWindow, windowsLen)
for i := range windows {
windows[i].Allocated = *NewActiveResources()
}
acceptableWindows := make([][]int, queueLen) // QueueIndex -> []OpenWindowIndex
// Step 1
@ -81,7 +84,7 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) {
needRes := worker.Info.Resources.ResourceSpec(task.Sector.ProofType, task.TaskType)
// TODO: allow bigger windows
if !windows[wnd].Allocated.CanHandleRequest(needRes, windowRequest.Worker, "schedAcceptable", worker.Info) {
if !windows[wnd].Allocated.CanHandleRequest(task.SealTask(), needRes, windowRequest.Worker, "schedAcceptable", worker.Info) {
continue
}
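
The windows are now seeded with NewActiveResources() instead of a zero value because the per-task counters map added to ActiveResources (see the scheduler resources hunk further down) must be allocated before Add can write to it. A short illustration of the underlying Go behaviour, independent of the lotus types:

package main

import "fmt"

func main() {
	var counters map[string]int  // zero value, like a zero-valued ActiveResources field
	fmt.Println(counters["PC1"]) // reading a nil map is fine: prints 0
	// counters["PC1"]++         // writing would panic: assignment to entry in nil map

	counters = map[string]int{} // what NewActiveResources allocates
	counters["PC1"]++
	fmt.Println(counters["PC1"]) // 1
}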


@ -34,7 +34,7 @@ func SpreadWS(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []
log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.Sector.ID.Number, wnd, i)
if !windows[wnd].Allocated.CanHandleRequest(res, wid, "schedAssign", w.Info) {
if !windows[wnd].Allocated.CanHandleRequest(task.SealTask(), res, wid, "schedAssign", w.Info) {
continue
}
@ -64,7 +64,7 @@ func SpreadWS(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []
"assigned", bestAssigned)
workerAssigned[bestWid]++
windows[selectedWindow].Allocated.Add(info.Resources, needRes)
windows[selectedWindow].Allocated.Add(task.SealTask(), info.Resources, needRes)
windows[selectedWindow].Todo = append(windows[selectedWindow].Todo, task)
rmQueue = append(rmQueue, sqi)


@ -35,7 +35,7 @@ func LowestUtilizationWS(sh *Scheduler, queueLen int, acceptableWindows [][]int,
log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.Sector.ID.Number, wnd, i)
// TODO: allow bigger windows
if !windows[wnd].Allocated.CanHandleRequest(res, wid, "schedAssign", w.Info) {
if !windows[wnd].Allocated.CanHandleRequest(task.SealTask(), res, wid, "schedAssign", w.Info) {
continue
}
@ -81,7 +81,7 @@ func LowestUtilizationWS(sh *Scheduler, queueLen int, acceptableWindows [][]int,
"worker", bestWid,
"utilization", bestUtilization)
workerUtil[bestWid] += windows[selectedWindow].Allocated.Add(info.Resources, needRes)
workerUtil[bestWid] += windows[selectedWindow].Allocated.Add(task.SealTask(), info.Resources, needRes)
windows[selectedWindow].Todo = append(windows[selectedWindow].Todo, task)
rmQueue = append(rmQueue, sqi)


@ -105,7 +105,7 @@ func (ps *poStScheduler) Schedule(ctx context.Context, primary bool, spt abi.Reg
selected := candidates[0]
worker := ps.workers[selected.id]
return worker.active.withResources(selected.id, worker.Info, selected.res, &ps.lk, func() error {
return worker.active.withResources(selected.id, worker.Info, ps.postType.SealTask(spt), selected.res, &ps.lk, func() error {
ps.lk.Unlock()
defer ps.lk.Lock()
@ -124,7 +124,7 @@ func (ps *poStScheduler) readyWorkers(spt abi.RegisteredSealProof) (bool, []cand
for wid, wr := range ps.workers {
needRes := wr.Info.Resources.ResourceSpec(spt, ps.postType)
if !wr.active.CanHandleRequest(needRes, wid, "post-readyWorkers", wr.Info) {
if !wr.active.CanHandleRequest(ps.postType.SealTask(spt), needRes, wid, "post-readyWorkers", wr.Info) {
continue
}


@ -9,8 +9,26 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
func (a *activeResources) withResources(id storiface.WorkerID, wr storiface.WorkerInfo, r storiface.Resources, locker sync.Locker, cb func() error) error {
for !a.CanHandleRequest(r, id, "withResources", wr) {
type ActiveResources struct {
memUsedMin uint64
memUsedMax uint64
gpuUsed float64
cpuUse uint64
taskCounters map[sealtasks.SealTaskType]int
cond *sync.Cond
waiting int
}
func NewActiveResources() *ActiveResources {
return &ActiveResources{
taskCounters: map[sealtasks.SealTaskType]int{},
}
}
func (a *ActiveResources) withResources(id storiface.WorkerID, wr storiface.WorkerInfo, tt sealtasks.SealTaskType, r storiface.Resources, locker sync.Locker, cb func() error) error {
for !a.CanHandleRequest(tt, r, id, "withResources", wr) {
if a.cond == nil {
a.cond = sync.NewCond(locker)
}
@ -19,22 +37,22 @@ func (a *activeResources) withResources(id storiface.WorkerID, wr storiface.Work
a.waiting--
}
a.Add(wr.Resources, r)
a.Add(tt, wr.Resources, r)
err := cb()
a.free(wr.Resources, r)
a.Free(tt, wr.Resources, r)
return err
}
// must be called with the same lock as the one passed to withResources
func (a *activeResources) hasWorkWaiting() bool {
func (a *ActiveResources) hasWorkWaiting() bool {
return a.waiting > 0
}
// add task resources to activeResources and return utilization difference
func (a *activeResources) Add(wr storiface.WorkerResources, r storiface.Resources) float64 {
// add task resources to ActiveResources and return utilization difference
func (a *ActiveResources) Add(tt sealtasks.SealTaskType, wr storiface.WorkerResources, r storiface.Resources) float64 {
startUtil := a.utilization(wr)
if r.GPUUtilization > 0 {
@ -43,17 +61,19 @@ func (a *activeResources) Add(wr storiface.WorkerResources, r storiface.Resource
a.cpuUse += r.Threads(wr.CPUs, len(wr.GPUs))
a.memUsedMin += r.MinMemory
a.memUsedMax += r.MaxMemory
a.taskCounters[tt]++
return a.utilization(wr) - startUtil
}
func (a *activeResources) free(wr storiface.WorkerResources, r storiface.Resources) {
func (a *ActiveResources) Free(tt sealtasks.SealTaskType, wr storiface.WorkerResources, r storiface.Resources) {
if r.GPUUtilization > 0 {
a.gpuUsed -= r.GPUUtilization
}
a.cpuUse -= r.Threads(wr.CPUs, len(wr.GPUs))
a.memUsedMin -= r.MinMemory
a.memUsedMax -= r.MaxMemory
a.taskCounters[tt]--
if a.cond != nil {
a.cond.Broadcast()
@ -62,7 +82,14 @@ func (a *activeResources) free(wr storiface.WorkerResources, r storiface.Resourc
// CanHandleRequest evaluates if the worker has enough available resources to
// handle the request.
func (a *activeResources) CanHandleRequest(needRes storiface.Resources, wid storiface.WorkerID, caller string, info storiface.WorkerInfo) bool {
func (a *ActiveResources) CanHandleRequest(tt sealtasks.SealTaskType, needRes storiface.Resources, wid storiface.WorkerID, caller string, info storiface.WorkerInfo) bool {
if needRes.MaxConcurrent > 0 {
if a.taskCounters[tt] >= needRes.MaxConcurrent {
log.Debugf("sched: not scheduling on worker %s for %s; at task limit tt=%s, curcount=%d", wid, caller, tt, a.taskCounters[tt])
return false
}
}
if info.IgnoreResources {
// shortcircuit; if this worker is ignoring resources, it can always handle the request.
return true
@ -110,7 +137,7 @@ func (a *activeResources) CanHandleRequest(needRes storiface.Resources, wid stor
}
// utilization returns a number in 0..1 range indicating fraction of used resources
func (a *activeResources) utilization(wr storiface.WorkerResources) float64 {
func (a *ActiveResources) utilization(wr storiface.WorkerResources) float64 { // todo task type
var max float64
cpu := float64(a.cpuUse) / float64(wr.CPUs)
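
Taken together, the MaxConcurrent check at the top of CanHandleRequest, the counter updates in Add/Free and the cond-based wait in withResources form a per seal-task-type counting limiter. A self-contained sketch of that pattern, using plain string keys instead of the lotus types (limiter and withTask are illustrative names, not the PR's API):

package main

import (
	"fmt"
	"sync"
)

// taskKey stands in for sealtasks.SealTaskType: the real code keys the
// counters by (task type, seal proof) pairs.
type taskKey string

type limiter struct {
	mu      sync.Mutex
	cond    *sync.Cond
	counts  map[taskKey]int
	maxConc map[taskKey]int // 0 means unlimited, mirroring Resources.MaxConcurrent
}

func newLimiter(limits map[taskKey]int) *limiter {
	l := &limiter{counts: map[taskKey]int{}, maxConc: limits}
	l.cond = sync.NewCond(&l.mu)
	return l
}

// canHandle mirrors the new gate in CanHandleRequest: reject a task type once
// its running count has reached the configured limit.
func (l *limiter) canHandle(tt taskKey) bool {
	max := l.maxConc[tt]
	return max == 0 || l.counts[tt] < max
}

// withTask mirrors withResources/Add/Free: wait for a slot, bump the counter,
// run the task, then release the slot and wake any waiters.
func (l *limiter) withTask(tt taskKey, cb func()) {
	l.mu.Lock()
	for !l.canHandle(tt) {
		l.cond.Wait()
	}
	l.counts[tt]++
	l.mu.Unlock()

	cb()

	l.mu.Lock()
	l.counts[tt]--
	l.cond.Broadcast()
	l.mu.Unlock()
}

func main() {
	l := newLimiter(map[taskKey]int{"PC1": 2}) // at most two PC1-type tasks at once

	l.counts["PC1"] = 2             // single-goroutine demo, so no locking here
	fmt.Println(l.canHandle("PC1")) // false: at the limit, the scheduler would skip this worker
	fmt.Println(l.canHandle("GET")) // true: no limit configured for GET

	l.counts["PC1"]--
	l.withTask("PC1", func() { fmt.Println("ran once a slot was free") })
}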


@ -614,8 +614,8 @@ func BenchmarkTrySched(b *testing.B) {
Hostname: "t",
Resources: decentWorkerResources,
},
preparing: &activeResources{},
active: &activeResources{},
preparing: NewActiveResources(),
active: NewActiveResources(),
}
for i := 0; i < windows; i++ {
@ -659,14 +659,16 @@ func TestWindowCompact(t *testing.T) {
}
for _, windowTasks := range start {
window := &SchedWindow{}
window := &SchedWindow{
Allocated: *NewActiveResources(),
}
for _, task := range windowTasks {
window.Todo = append(window.Todo, &WorkerRequest{
TaskType: task,
Sector: storage.SectorRef{ProofType: spt},
})
window.Allocated.Add(wh.Info.Resources, storiface.ResourceTable[task][spt])
window.Allocated.Add(task.SealTask(spt), wh.Info.Resources, storiface.ResourceTable[task][spt])
}
wh.activeWindows = append(wh.activeWindows, window)
@ -681,11 +683,11 @@ func TestWindowCompact(t *testing.T) {
require.Equal(t, len(start)-len(expect), -sw.windowsRequested)
for wi, tasks := range expect {
var expectRes activeResources
expectRes := NewActiveResources()
for ti, task := range tasks {
require.Equal(t, task, wh.activeWindows[wi].Todo[ti].TaskType, "%d, %d", wi, ti)
expectRes.Add(wh.Info.Resources, storiface.ResourceTable[task][spt])
expectRes.Add(task.SealTask(spt), wh.Info.Resources, storiface.ResourceTable[task][spt])
}
require.Equal(t, expectRes.cpuUse, wh.activeWindows[wi].Allocated.cpuUse, "%d", wi)


@ -34,8 +34,8 @@ func newWorkerHandle(ctx context.Context, w Worker) (*WorkerHandle, error) {
workerRpc: w,
Info: info,
preparing: &activeResources{},
active: &activeResources{},
preparing: NewActiveResources(),
active: NewActiveResources(),
Enabled: true,
closingMgr: make(chan struct{}),
@ -292,14 +292,14 @@ func (sw *schedWorker) workerCompactWindows() {
for ti, todo := range window.Todo {
needRes := worker.Info.Resources.ResourceSpec(todo.Sector.ProofType, todo.TaskType)
if !lower.Allocated.CanHandleRequest(needRes, sw.wid, "compactWindows", worker.Info) {
if !lower.Allocated.CanHandleRequest(todo.SealTask(), needRes, sw.wid, "compactWindows", worker.Info) {
continue
}
moved = append(moved, ti)
lower.Todo = append(lower.Todo, todo)
lower.Allocated.Add(worker.Info.Resources, needRes)
window.Allocated.free(worker.Info.Resources, needRes)
lower.Allocated.Add(todo.SealTask(), worker.Info.Resources, needRes)
window.Allocated.Free(todo.SealTask(), worker.Info.Resources, needRes)
}
if len(moved) > 0 {
@ -353,7 +353,7 @@ assignLoop:
worker.lk.Lock()
for t, todo := range firstWindow.Todo {
needRes := worker.Info.Resources.ResourceSpec(todo.Sector.ProofType, todo.TaskType)
if worker.preparing.CanHandleRequest(needRes, sw.wid, "startPreparing", worker.Info) {
if worker.preparing.CanHandleRequest(todo.SealTask(), needRes, sw.wid, "startPreparing", worker.Info) {
tidx = t
break
}
@ -413,8 +413,8 @@ assignLoop:
continue
}
needRes := storiface.ResourceTable[todo.TaskType][todo.Sector.ProofType]
if worker.active.CanHandleRequest(needRes, sw.wid, "startPreparing", worker.Info) {
needRes := worker.Info.Resources.ResourceSpec(todo.Sector.ProofType, todo.TaskType)
if worker.active.CanHandleRequest(todo.SealTask(), needRes, sw.wid, "startPreparing", worker.Info) {
tidx = t
break
}
@ -454,7 +454,7 @@ func (sw *schedWorker) startProcessingTask(req *WorkerRequest) error {
needRes := w.Info.Resources.ResourceSpec(req.Sector.ProofType, req.TaskType)
w.lk.Lock()
w.preparing.Add(w.Info.Resources, needRes)
w.preparing.Add(req.SealTask(), w.Info.Resources, needRes)
w.lk.Unlock()
go func() {
@ -465,7 +465,7 @@ func (sw *schedWorker) startProcessingTask(req *WorkerRequest) error {
w.lk.Lock()
if err != nil {
w.preparing.free(w.Info.Resources, needRes)
w.preparing.Free(req.SealTask(), w.Info.Resources, needRes)
w.lk.Unlock()
select {
@ -494,8 +494,8 @@ func (sw *schedWorker) startProcessingTask(req *WorkerRequest) error {
}()
// wait (if needed) for resources in the 'active' window
err = w.active.withResources(sw.wid, w.Info, needRes, &w.lk, func() error {
w.preparing.free(w.Info.Resources, needRes)
err = w.active.withResources(sw.wid, w.Info, req.SealTask(), needRes, &w.lk, func() error {
w.preparing.Free(req.SealTask(), w.Info.Resources, needRes)
w.lk.Unlock()
defer w.lk.Lock() // we MUST return locked from this function
@ -536,7 +536,7 @@ func (sw *schedWorker) startProcessingReadyTask(req *WorkerRequest) error {
needRes := w.Info.Resources.ResourceSpec(req.Sector.ProofType, req.TaskType)
w.active.Add(w.Info.Resources, needRes)
w.active.Add(req.SealTask(), w.Info.Resources, needRes)
go func() {
// Do the work!
@ -554,7 +554,7 @@ func (sw *schedWorker) startProcessingReadyTask(req *WorkerRequest) error {
w.lk.Lock()
w.active.free(w.Info.Resources, needRes)
w.active.Free(req.SealTask(), w.Info.Resources, needRes)
select {
case sw.taskDone <- struct{}{}:


@ -1,5 +1,15 @@
package sealtasks
import (
"fmt"
"strconv"
"strings"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
)
type TaskType string
const (
@ -104,3 +114,37 @@ func (a TaskType) Short() string {
return n
}
type SealTaskType struct {
TaskType
abi.RegisteredSealProof
}
func (a TaskType) SealTask(spt abi.RegisteredSealProof) SealTaskType {
return SealTaskType{
TaskType: a,
RegisteredSealProof: spt,
}
}
func SttFromString(s string) (SealTaskType, error) {
var res SealTaskType
sub := strings.SplitN(s, ":", 2)
if len(sub) != 2 {
return res, xerrors.Errorf("seal task type string invalid")
}
res.TaskType = TaskType(sub[1])
spt, err := strconv.ParseInt(sub[0], 10, 64)
if err != nil {
return SealTaskType{}, err
}
res.RegisteredSealProof = abi.RegisteredSealProof(spt)
return res, nil
}
func (a SealTaskType) String() string {
return fmt.Sprintf("%d:%s", a.RegisteredSealProof, a.TaskType)
}
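
SealTaskType pairs a task type with the seal proof it runs under, and its String() form ("<proof enum>:<task type>") is what SttFromString parses back; the same string is used as the TaskCounts key in the worker stats. A small round-trip sketch (assuming the 32GiB v1_1 proof enumerates to 8; import paths as of this commit):

package main

import (
	"fmt"

	"github.com/filecoin-project/go-state-types/abi"

	"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
)

func main() {
	stt := sealtasks.TTPreCommit1.SealTask(abi.RegisteredSealProof_StackedDrg32GiBV1_1)

	key := stt.String()
	fmt.Println(key) // "8:seal/v0/precommit/1": proof enum, colon, task type

	back, err := sealtasks.SttFromString(key)
	if err != nil {
		panic(err)
	}
	fmt.Println(back == stt) // true: the string form round-trips cleanly
}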


@ -39,7 +39,14 @@ func (m *Manager) WorkerStats(ctx context.Context) map[uuid.UUID]storiface.Worke
MemUsedMax: handle.active.memUsedMax,
GpuUsed: handle.active.gpuUsed,
CpuUse: handle.active.cpuUse,
TaskCounts: map[string]int{},
}
for tt, count := range handle.active.taskCounters {
out[uuid.UUID(id)].TaskCounts[tt.String()] = count
}
handle.lk.Unlock()
}
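
WorkerStats exposes the live counters keyed by SealTaskType.String() rather than by the struct itself, which keeps the map serialisable over the JSON-RPC miner API (encoding/json cannot marshal plain struct map keys) and is likely why the CLI parses the keys back with SttFromString. A small consumer-side sketch (printTaskCounts is a hypothetical helper; import paths as of this commit):

package main

import (
	"fmt"

	"github.com/google/uuid"

	"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
	"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)

// printTaskCounts decodes TaskCounts keys back into seal task types and prints
// a per-worker summary, skipping any key it cannot parse.
func printTaskCounts(stats map[uuid.UUID]storiface.WorkerStats) {
	for id, st := range stats {
		for key, n := range st.TaskCounts {
			stt, err := sealtasks.SttFromString(key)
			if err != nil {
				continue
			}
			fmt.Printf("worker %s: %s x%d\n", id, stt.Short(), n)
		}
	}
}

func main() {
	printTaskCounts(map[uuid.UUID]storiface.WorkerStats{}) // e.g. fed from Manager.WorkerStats(ctx)
}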


@ -26,6 +26,8 @@ type Resources struct {
MaxParallelismGPU int `envname:"MAX_PARALLELISM_GPU"` // when 0, inherits MaxParallelism
BaseMinMemory uint64 `envname:"BASE_MIN_MEMORY"` // What Must be in RAM for decent perf (shared between threads)
MaxConcurrent int `envname:"MAX_CONCURRENT"` // Maximum number of tasks of this type that can be scheduled on a worker (0=default, no limit)
}
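
MaxConcurrent extends the per-task resource spec: zero keeps the existing unlimited behaviour, while a positive value caps how many tasks of that seal task type the scheduler will run on one worker at a time (the envname tag suggests it can also be set through the worker's resource environment overrides). A minimal sketch of the field's semantics, with illustrative values and import path as of this commit:

package main

import (
	"fmt"

	"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)

func main() {
	// Illustrative spec: cap this task type at two concurrent instances.
	res := storiface.Resources{
		MinMemory:     32 << 30,
		MaxMemory:     64 << 30,
		MaxConcurrent: 2,
	}

	running := 2 // tasks of this type already active on the worker
	if res.MaxConcurrent > 0 && running >= res.MaxConcurrent {
		fmt.Println("at task limit; the scheduler skips this worker for this task type")
	}
}
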
/*


@ -75,6 +75,8 @@ type WorkerStats struct {
MemUsedMax uint64
GpuUsed float64 // nolint
CpuUse uint64 // nolint
TaskCounts map[string]int
}
const (