package sectorstorage import ( "sync" "github.com/filecoin-project/lotus/extern/sector-storage/storiface" ) func (a *activeResources) withResources(id WorkerID, wr storiface.WorkerResources, r Resources, locker sync.Locker, cb func() error) error { for !a.canHandleRequest(r, id, "withResources", wr) { if a.cond == nil { a.cond = sync.NewCond(locker) } a.cond.Wait() } a.add(wr, r) err := cb() a.free(wr, r) if a.cond != nil { a.cond.Broadcast() } return err } func (a *activeResources) add(wr storiface.WorkerResources, r Resources) { a.gpuUsed = r.CanGPU if r.MultiThread() { a.cpuUse += wr.CPUs } else { a.cpuUse += uint64(r.Threads) } a.memUsedMin += r.MinMemory a.memUsedMax += r.MaxMemory } func (a *activeResources) free(wr storiface.WorkerResources, r Resources) { if r.CanGPU { a.gpuUsed = false } if r.MultiThread() { a.cpuUse -= wr.CPUs } else { a.cpuUse -= uint64(r.Threads) } a.memUsedMin -= r.MinMemory a.memUsedMax -= r.MaxMemory } func (a *activeResources) canHandleRequest(needRes Resources, wid WorkerID, caller string, res storiface.WorkerResources) bool { // TODO: dedupe needRes.BaseMinMemory per task type (don't add if that task is already running) minNeedMem := res.MemReserved + a.memUsedMin + needRes.MinMemory + needRes.BaseMinMemory if minNeedMem > res.MemPhysical { log.Debugf("sched: not scheduling on worker %d for %s; not enough physical memory - need: %dM, have %dM", wid, caller, minNeedMem/mib, res.MemPhysical/mib) return false } maxNeedMem := res.MemReserved + a.memUsedMax + needRes.MaxMemory + needRes.BaseMinMemory if maxNeedMem > res.MemSwap+res.MemPhysical { log.Debugf("sched: not scheduling on worker %d for %s; not enough virtual memory - need: %dM, have %dM", wid, caller, maxNeedMem/mib, (res.MemSwap+res.MemPhysical)/mib) return false } if needRes.MultiThread() { if a.cpuUse > 0 { log.Debugf("sched: not scheduling on worker %d for %s; multicore process needs %d threads, %d in use, target %d", wid, caller, res.CPUs, a.cpuUse, res.CPUs) return false } } else { if a.cpuUse+uint64(needRes.Threads) > res.CPUs { log.Debugf("sched: not scheduling on worker %d for %s; not enough threads, need %d, %d in use, target %d", wid, caller, needRes.Threads, a.cpuUse, res.CPUs) return false } } if len(res.GPUs) > 0 && needRes.CanGPU { if a.gpuUsed { log.Debugf("sched: not scheduling on worker %d for %s; GPU in use", wid, caller) return false } } return true } func (a *activeResources) utilization(wr storiface.WorkerResources) float64 { var max float64 cpu := float64(a.cpuUse) / float64(wr.CPUs) max = cpu memMin := float64(a.memUsedMin+wr.MemReserved) / float64(wr.MemPhysical) if memMin > max { max = memMin } memMax := float64(a.memUsedMax+wr.MemReserved) / float64(wr.MemPhysical+wr.MemSwap) if memMax > max { max = memMax } return max }