task- fix deadlock and mac gpu ct

Andrew Jackson (Ajax) 2023-10-31 17:13:16 -05:00
parent 1ff0d61adb
commit e37c874004
5 changed files with 45 additions and 84 deletions

View File

@@ -4,7 +4,6 @@ import (
 	"context"
 	"fmt"
 	"strconv"
-	"sync"
 	"sync/atomic"
 	"time"
@@ -100,7 +99,6 @@ type TaskEngine struct {
 	ctx            context.Context
 	handlers       []*taskTypeHandler
 	db             *harmonydb.DB
-	workAdderMutex sync.Mutex
 	reg            *resources.Reg
 	grace          context.CancelFunc
 	taskMap        map[string]*taskTypeHandler
@@ -289,33 +287,14 @@ func (e *TaskEngine) pollerTryAllWork() {
 	}
 }

+// ResourcesAvailable determines what resources are still unassigned.
 func (e *TaskEngine) ResourcesAvailable() resources.Resources {
-	e.workAdderMutex.Lock()
-	defer e.workAdderMutex.Unlock()
-	return e.resoourcesAvailable()
-}
-
-// resoourcesAvailable requires workAdderMutex to be already locked.
-func (e *TaskEngine) resoourcesAvailable() resources.Resources {
 	tmp := e.reg.Resources
-	copy(tmp.GpuRam, e.reg.Resources.GpuRam)
 	for _, t := range e.handlers {
 		ct := t.Count.Load()
 		tmp.Cpu -= int(ct) * t.Cost.Cpu
 		tmp.Gpu -= float64(ct) * t.Cost.Gpu
 		tmp.Ram -= uint64(ct) * t.Cost.Ram
-		if len(t.Cost.GpuRam) == 0 {
-			continue
-		}
-		for i := int32(0); i < ct; i++ {
-			for grIdx, j := range tmp.GpuRam {
-				if j > t.Cost.GpuRam[0] {
-					tmp.GpuRam[grIdx] = 0 // Only 1 per GPU. j - t.Cost.GpuRam[0]
-					break
-				}
-			}
-			log.Warn("We should never get out of gpuram for what's consumed.")
-		}
 	}
 	return tmp
 }
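
Removing workAdderMutex is safe because everything ResourcesAvailable reads is either fixed after registration (e.reg.Resources) or a per-handler atomic counter (t.Count), so the poller can take a lock-free snapshot; a completion racing with the snapshot only makes one poll cycle slightly conservative. A minimal standalone sketch of that accounting pattern (types and names here are illustrative, not the lotus API):

package main

import (
	"fmt"
	"sync/atomic"
)

// Illustrative stand-ins for the engine's bookkeeping.
type cost struct {
	cpu int
	ram uint64
}

type handler struct {
	cost  cost
	count atomic.Int32 // bumped when a task starts, decremented on completion
}

// available mirrors the lock-free snapshot: subtract every handler's
// in-flight consumption from the machine totals. Count is atomic, so no
// mutex is needed; a stale read is corrected on the next poll.
func available(total cost, hs []*handler) cost {
	free := total
	for _, h := range hs {
		ct := int(h.count.Load())
		free.cpu -= ct * h.cost.cpu
		free.ram -= uint64(ct) * h.cost.ram
	}
	return free
}

func main() {
	h := &handler{cost: cost{cpu: 2, ram: 1 << 30}}
	h.count.Store(3) // three tasks of this type in flight
	fmt.Printf("%+v\n", available(cost{cpu: 16, ram: 8 << 30}, []*handler{h}))
	// prints {cpu:10 ram:5368709120}
}

The per-device GpuRam bookkeeping disappears entirely: tracking memory per GPU needed the mutable tmp.GpuRam slice (and the mutex), while plain scalar subtraction does not.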

View File

@@ -10,7 +10,6 @@ import (
 	"time"

 	logging "github.com/ipfs/go-log/v2"
-	"github.com/samber/lo"

 	"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
 )
@@ -50,6 +49,11 @@ func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) (bool, error
 	}
 }

+// considerWork is called to attempt to start work on a task-id of this task type.
+// It presumes single-threaded calling, so there should not be multi-threaded re-entry.
+// The only caller should be the one work-poller thread. considerWork does spin off other
+// threads, but those should not call considerWork. Work completing may lower the resource
+// numbers unexpectedly, but that will not invalidate work that was already found to fit.
 func (h *taskTypeHandler) considerWork(from string, ids []TaskID) (workAccepted bool) {
top:
 	if len(ids) == 0 {
@@ -64,10 +68,8 @@ top:
 		return false
 	}

-	h.TaskEngine.workAdderMutex.Lock()
-	defer h.TaskEngine.workAdderMutex.Unlock()
-
-	// 2. Can we do any more work?
+	// 2. Can we do any more work? From here onward, we presume the resource
+	// story will not change, so single-threaded calling is best.
 	err := h.AssertMachineHasCapacity()
 	if err != nil {
 		log.Debugw("did not accept task", "name", h.Name, "reason", "at capacity already: "+err.Error())
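
The new comment encodes the replacement for the lock: admission is single-threaded by construction. A toy model of that single-admitter contract (all names invented) — the capacity check and the decision to start work happen on one goroutine, while spawned workers only ever release capacity:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// One goroutine (the "poller") makes every admission decision; workers it
// spawns only release capacity. Because check-and-admit never runs on two
// threads at once, no mutex guards the capacity math.
func main() {
	var running atomic.Int32
	const maxParallel = 4

	for tick := 0; tick < 3; tick++ {
		for running.Load() < maxParallel { // capacity check: poller thread only
			running.Add(1) // admit: same thread, so it cannot race another admit
			go func() {
				defer running.Add(-1) // completion frees capacity at any time
				time.Sleep(10 * time.Millisecond)
			}()
		}
		fmt.Println("in flight:", running.Load())
		time.Sleep(50 * time.Millisecond)
	}
}
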
@@ -213,7 +215,7 @@ func (h *taskTypeHandler) recordCompletion(tID TaskID, workStart time.Time, done
 	}
 }

 func (h *taskTypeHandler) AssertMachineHasCapacity() error {
-	r := h.TaskEngine.resoourcesAvailable()
+	r := h.TaskEngine.ResourcesAvailable()
 	if r.Cpu-h.Cost.Cpu < 0 {
 		return errors.New("Did not accept " + h.Name + " task: out of cpu")
@@ -224,16 +226,5 @@ func (h *taskTypeHandler) AssertMachineHasCapacity() error {
 	if r.Gpu-h.Cost.Gpu < 0 {
 		return errors.New("Did not accept " + h.Name + " task: out of available GPU")
 	}
-	gpuRamSum := lo.Sum(h.Cost.GpuRam)
-	if gpuRamSum == 0 {
-		goto enoughGpuRam
-	}
-	for _, u := range r.GpuRam {
-		if u >= gpuRamSum {
-			goto enoughGpuRam
-		}
-	}
-	return errors.New("Did not accept " + h.Name + " task: out of GPURam")
-enoughGpuRam:
 	return nil
 }
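
AssertMachineHasCapacity now checks only scalar dimensions (CPU, RAM, GPU count), dropping the goto-based per-device GPURam walk. A self-contained sketch of the same gate shape (field names mirror the diff; the helper and its RAM check are this sketch's own — RAM is unsigned, so it is compared with < rather than subtracted, since uint64 subtraction cannot go negative):

package main

import (
	"errors"
	"fmt"
)

// Illustrative mirror of the simplified gate: every dimension of a task's
// cost must fit in what is still unassigned.
type res struct {
	cpu int
	gpu float64
	ram uint64
}

func assertFits(free, cost res, name string) error {
	if free.cpu-cost.cpu < 0 {
		return errors.New("Did not accept " + name + " task: out of cpu")
	}
	if free.ram < cost.ram {
		return errors.New("Did not accept " + name + " task: out of ram")
	}
	if free.gpu-cost.gpu < 0 {
		return errors.New("Did not accept " + name + " task: out of available GPU")
	}
	return nil
}

func main() {
	free := res{cpu: 2, gpu: 1, ram: 4 << 30}
	fmt.Println(assertFits(free, res{cpu: 1, gpu: 1, ram: 1 << 30}, "PC1")) // <nil>
	fmt.Println(assertFits(free, res{cpu: 4}, "PC2"))                      // out of cpu
}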

View File

@@ -0,0 +1,21 @@
+//go:build !darwin
+// +build !darwin
+
+package resources
+
+import (
+	"strings"
+	ffi "github.com/filecoin-project/filecoin-ffi"
+)
+
+func getGPUDevices() float64 { // GPU boolean
+	gpus, err := ffi.GetGPUDevices()
+	if err != nil {
+		logger.Errorf("getting gpu devices failed: %+v", err)
+	}
+	all := strings.ToLower(strings.Join(gpus, ","))
+	if len(gpus) > 1 || strings.Contains(all, "ati") || strings.Contains(all, "nvidia") {
+		return float64(len(gpus))
+	}
+	return 0
+}
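
The detection heuristic restated as a pure function so it can be exercised without filecoin-ffi (the helper name is invented): a device list counts when it has several entries, or when any entry names a discrete ATI/AMD or NVIDIA part; otherwise the machine reports 0 GPUs.

package main

import (
	"fmt"
	"strings"
)

// countUsableGPUs reproduces the branch logic of getGPUDevices above.
func countUsableGPUs(gpus []string) float64 {
	all := strings.ToLower(strings.Join(gpus, ","))
	if len(gpus) > 1 || strings.Contains(all, "ati") || strings.Contains(all, "nvidia") {
		return float64(len(gpus))
	}
	return 0
}

func main() {
	fmt.Println(countUsableGPUs([]string{"NVIDIA GeForce RTX 3090"})) // 1
	fmt.Println(countUsableGPUs([]string{"Intel UHD Graphics 630"})) // 0 (iGPU only)
	fmt.Println(countUsableGPUs(nil))                                // 0
}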

View File

@@ -0,0 +1,8 @@
+//go:build darwin
+// +build darwin
+
+package resources
+
+func getGPUDevices() float64 {
+	return 10000.0 // unserious value intended for non-production use.
+}
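
Together with the file above, the package gets exactly one getGPUDevices whose body is chosen at compile time by Go build constraints; callers need no runtime platform switch, and the oversized darwin value means Mac dev machines never refuse work for lack of GPUs. The same pattern in miniature (package and file names invented; each section below is its own file):

// gpucount_other.go
//go:build !darwin

package gpucount

// Count would run the real device probe on supported platforms.
func Count() float64 { return 1 }

// gpucount_darwin.go
//go:build darwin

package gpucount

// Count returns an oversized dev-only stand-in, like the 10000.0 above.
func Count() float64 { return 10000 }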

View File

@@ -7,19 +7,14 @@ import (
 	"os/exec"
 	"regexp"
 	"runtime"
-	"strings"
 	"sync/atomic"
 	"time"

 	logging "github.com/ipfs/go-log/v2"
 	"github.com/pbnjay/memory"
-	"github.com/samber/lo"
 	"golang.org/x/sys/unix"

-	ffi "github.com/filecoin-project/filecoin-ffi"
 	"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
-	cl "github.com/filecoin-project/lotus/lib/harmony/resources/miniopencl"
 )

 var LOOKS_DEAD_TIMEOUT = 10 * time.Minute // Time w/o minute heartbeats
@@ -27,7 +22,6 @@ var LOOKS_DEAD_TIMEOUT = 10 * time.Minute // Time w/o minute heartbeats
 type Resources struct {
 	Cpu       int
 	Gpu       float64
-	GpuRam    []uint64
 	Ram       uint64
 	MachineID int
 }
@@ -54,12 +48,11 @@ func Register(db *harmonydb.DB, hostnameAndPort string) (*Reg, error) {
 	if err != nil {
 		return nil, fmt.Errorf("could not read from harmony_machines: %w", err)
 	}
-	gpuram := uint64(lo.Sum(reg.GpuRam))
 	if len(ownerID) == 0 {
 		err = db.QueryRow(ctx, `INSERT INTO harmony_machines
 		(host_and_port, cpu, ram, gpu, gpuram) VALUES
-		($1,$2,$3,$4,$5) RETURNING id`,
-			hostnameAndPort, reg.Cpu, reg.Ram, reg.Gpu, gpuram).Scan(&reg.Resources.MachineID)
+		($1,$2,$3,$4,0) RETURNING id`,
+			hostnameAndPort, reg.Cpu, reg.Ram, reg.Gpu).Scan(&reg.Resources.MachineID)
 		if err != nil {
 			return nil, err
 		}
@ -67,8 +60,8 @@ func Register(db *harmonydb.DB, hostnameAndPort string) (*Reg, error) {
} else { } else {
reg.MachineID = ownerID[0] reg.MachineID = ownerID[0]
_, err := db.Exec(ctx, `UPDATE harmony_machines SET _, err := db.Exec(ctx, `UPDATE harmony_machines SET
cpu=$1, ram=$2, gpu=$3, gpuram=$4 WHERE id=$5`, cpu=$1, ram=$2, gpu=$3 WHERE id=$5`,
reg.Cpu, reg.Ram, reg.Gpu, gpuram, reg.Resources.MachineID) reg.Cpu, reg.Ram, reg.Gpu, reg.Resources.MachineID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
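
Registration keeps its select-then-insert-or-update shape; only the gpuram measurement goes away, with the surviving column pinned to 0 on insert. A sketch of the same flow using database/sql placeholders (the real code goes through harmonydb; table and column names follow the diff, everything else here is illustrative):

package machinereg

import (
	"database/sql"
	"errors"
	"fmt"
)

// registerMachine inserts a machine row on first boot and refreshes its
// advertised resources on re-registration, returning the row id.
func registerMachine(db *sql.DB, hostAndPort string, cpu int, ram uint64, gpu float64) (int, error) {
	var id int
	err := db.QueryRow(`SELECT id FROM harmony_machines WHERE host_and_port=$1`, hostAndPort).Scan(&id)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		// First registration: gpuram is pinned to 0 because the column
		// survives even though the code no longer measures it.
		err = db.QueryRow(`INSERT INTO harmony_machines
			(host_and_port, cpu, ram, gpu, gpuram) VALUES
			($1,$2,$3,$4,0) RETURNING id`,
			hostAndPort, cpu, ram, gpu).Scan(&id)
		return id, err
	case err == nil:
		// Known machine: refresh the advertised resources.
		_, err = db.Exec(`UPDATE harmony_machines SET
			cpu=$1, ram=$2, gpu=$3 WHERE id=$4`, cpu, ram, gpu, id)
		return id, err
	default:
		return 0, fmt.Errorf("could not read from harmony_machines: %w", err)
	}
}
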
@@ -121,45 +114,14 @@ func getResources() (res Resources, err error) {
 	}

 	res = Resources{
 		Cpu: runtime.NumCPU(),
 		Ram: memory.FreeMemory(),
-		GpuRam: getGpuRam(),
+		Gpu: getGPUDevices(),
 	}
-
-	{ // GPU boolean
-		gpus, err := ffi.GetGPUDevices()
-		if err != nil {
-			logger.Errorf("getting gpu devices failed: %+v", err)
-		}
-		all := strings.ToLower(strings.Join(gpus, ","))
-		if len(gpus) > 1 || strings.Contains(all, "ati") || strings.Contains(all, "nvidia") {
-			res.Gpu = float64(len(gpus))
-		}
-	}
 	return res, nil
 }

-func getGpuRam() (res []uint64) {
-	platforms, err := cl.GetPlatforms()
-	if err != nil {
-		logger.Error(err)
-		return res
-	}
-	lo.ForEach(platforms, func(p *cl.Platform, i int) {
-		d, err := p.GetAllDevices()
-		if err != nil {
-			logger.Error(err)
-			return
-		}
-		lo.ForEach(d, func(d *cl.Device, i int) {
-			res = append(res, uint64(d.GlobalMemSize()))
-		})
-	})
-	return res
-}

 func DiskFree(path string) (uint64, error) {
 	s := unix.Statfs_t{}
 	err := unix.Statfs(path, &s)