Łukasz Magiera 2022-05-27 16:15:52 +02:00
parent 26a0b43116
commit 7117a8d80d
2 changed files with 13 additions and 13 deletions

View File

@@ -85,8 +85,8 @@ type WorkerHandle struct {
 	Info storiface.WorkerInfo
-	preparing *activeResources // use with WorkerHandle.lk
-	active    *activeResources // use with WorkerHandle.lk
+	preparing *ActiveResources // use with WorkerHandle.lk
+	active    *ActiveResources // use with WorkerHandle.lk
 	lk sync.Mutex // can be taken inside sched.workersLk.RLock
@@ -108,7 +108,7 @@ type SchedWindowRequest struct {
 }
 type SchedWindow struct {
-	Allocated activeResources
+	Allocated ActiveResources
 	Todo      []*WorkerRequest
 }

View File

@@ -9,7 +9,7 @@ import (
 	"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
 )
-type activeResources struct {
+type ActiveResources struct {
 	memUsedMin uint64
 	memUsedMax uint64
 	gpuUsed    float64
@@ -21,13 +21,13 @@ type activeResources struct {
 	waiting int
 }
-func NewActiveResources() *activeResources {
-	return &activeResources{
+func NewActiveResources() *ActiveResources {
+	return &ActiveResources{
 		taskCounters: map[sealtasks.SealTaskType]int{},
 	}
 }
-func (a *activeResources) withResources(id storiface.WorkerID, wr storiface.WorkerInfo, tt sealtasks.SealTaskType, r storiface.Resources, locker sync.Locker, cb func() error) error {
+func (a *ActiveResources) withResources(id storiface.WorkerID, wr storiface.WorkerInfo, tt sealtasks.SealTaskType, r storiface.Resources, locker sync.Locker, cb func() error) error {
 	for !a.CanHandleRequest(tt, r, id, "withResources", wr) {
 		if a.cond == nil {
 			a.cond = sync.NewCond(locker)
@@ -47,12 +47,12 @@ func (a *activeResources) withResources(id storiface.WorkerID, wr storiface.Work
 }
 // must be called with the same lock as the one passed to withResources
-func (a *activeResources) hasWorkWaiting() bool {
+func (a *ActiveResources) hasWorkWaiting() bool {
 	return a.waiting > 0
 }
-// add task resources to activeResources and return utilization difference
-func (a *activeResources) Add(tt sealtasks.SealTaskType, wr storiface.WorkerResources, r storiface.Resources) float64 {
+// add task resources to ActiveResources and return utilization difference
+func (a *ActiveResources) Add(tt sealtasks.SealTaskType, wr storiface.WorkerResources, r storiface.Resources) float64 {
 	startUtil := a.utilization(wr)
 	if r.GPUUtilization > 0 {
@@ -66,7 +66,7 @@ func (a *activeResources) Add(tt sealtasks.SealTaskType, wr storiface.WorkerReso
 	return a.utilization(wr) - startUtil
 }
-func (a *activeResources) Free(tt sealtasks.SealTaskType, wr storiface.WorkerResources, r storiface.Resources) {
+func (a *ActiveResources) Free(tt sealtasks.SealTaskType, wr storiface.WorkerResources, r storiface.Resources) {
 	if r.GPUUtilization > 0 {
 		a.gpuUsed -= r.GPUUtilization
 	}
@@ -82,7 +82,7 @@ func (a *activeResources) Free(tt sealtasks.SealTaskType, wr storiface.WorkerRes
 // CanHandleRequest evaluates if the worker has enough available resources to
 // handle the request.
-func (a *activeResources) CanHandleRequest(tt sealtasks.SealTaskType, needRes storiface.Resources, wid storiface.WorkerID, caller string, info storiface.WorkerInfo) bool {
+func (a *ActiveResources) CanHandleRequest(tt sealtasks.SealTaskType, needRes storiface.Resources, wid storiface.WorkerID, caller string, info storiface.WorkerInfo) bool {
 	if needRes.MaxConcurrent > 0 {
 		if a.taskCounters[tt] >= needRes.MaxConcurrent {
 			log.Debugf("sched: not scheduling on worker %s for %s; at task limit tt=%s, curcount=%d", wid, caller, tt, a.taskCounters[tt])
@@ -137,7 +137,7 @@ func (a *activeResources) CanHandleRequest(tt sealtasks.SealTaskType, needRes st
 }
 // utilization returns a number in 0..1 range indicating fraction of used resources
-func (a *activeResources) utilization(wr storiface.WorkerResources) float64 { // todo task type
+func (a *ActiveResources) utilization(wr storiface.WorkerResources) float64 { // todo task type
 	var max float64
 	cpu := float64(a.cpuUse) / float64(wr.CPUs)