harmonytask - final review comments

This commit is contained in:
Andrew Jackson (Ajax) 2023-08-25 16:11:31 -05:00
parent ec8fd28834
commit 72917c19cd
7 changed files with 42 additions and 31 deletions

View File

@ -609,7 +609,6 @@ var runCmd = &cli.Command{
if err := srv.Shutdown(context.TODO()); err != nil {
log.Errorf("shutting down RPC server failed: %s", err)
}
//taskManager.GracefullyTerminate(5*time.Hour)
log.Warn("Graceful shutdown successful")
}()

View File

@ -21,7 +21,7 @@ import (
type task1 struct {
toAdd []int
myPersonalTableLock sync.Mutex
myPersonalTable map[harmonytask.TaskID]int // This would typicallyb be a DB table
myPersonalTable map[harmonytask.TaskID]int // This would typically be a DB table
WorkCompleted []string
}

View File

@ -23,8 +23,9 @@ Mental Model:
- Async Listener task (for chain, etc)
- Followers: Tasks get added because another task completed
When Follower collectors run:
- If both sides are process-local, then
- Otherwise, at the listen interval during db scrape
- If both sides are process-local, then this process will pick it up.
- If properly registered already, the http endpoint will be tried to start it.
- Otherwise, at the listen interval during db scrape it will be found.
How duplicate tasks are avoided:
- that's up to the task definition, but probably a unique key

View File

@ -56,7 +56,7 @@ type TaskInterface interface {
// CanAccept should return if the task can run on this machine. It should
// return nil if the task type is not allowed on this machine.
// It should select the task it most wants to accomplish.
// It is also responsible for determining disk space (including scratch).
// It is also responsible for determining & reserving disk space (including scratch).
CanAccept([]TaskID) (*TaskID, error)
// TypeDetails() returns static details about how this task behaves and
@ -181,7 +181,7 @@ func New(
if h == nil {
_, err := db.Exec(e.ctx, `UPDATE harmony_task SET owner=NULL WHERE id=$1`, w.ID)
if err != nil {
log.Error("Cannot remove self from owner field: ", err)
log.Errorw("Cannot remove self from owner field", "error", err)
continue // not really fatal, but not great
}
}
@ -206,12 +206,14 @@ func (e *TaskEngine) GracefullyTerminate(deadline time.Duration) {
e.reg.Shutdown()
deadlineChan := time.NewTimer(deadline).C
ctx := context.TODO()
// block bumps & follows by unreg from DBs.
_, err := e.db.Exec(context.TODO(), `DELETE FROM harmony_task_impl WHERE owner_id=$1`, e.ownerID)
_, err := e.db.Exec(ctx, `DELETE FROM harmony_task_impl WHERE owner_id=$1`, e.ownerID)
if err != nil {
log.Warn("Could not clean-up impl table: %w", err)
}
_, err = e.db.Exec(context.Background(), `DELETE FROM harmony_task_follow WHERE owner_id=$1`, e.ownerID)
_, err = e.db.Exec(ctx, `DELETE FROM harmony_task_follow WHERE owner_id=$1`, e.ownerID)
if err != nil {
log.Warn("Could not clean-up follow table: %w", err)
}
@ -271,7 +273,7 @@ func (e *TaskEngine) followWorkInDB() {
// we need to create this task
if !src.h.Follows[fromName](TaskID(workAlreadyDone), src.h.AddTask) {
// But someone may have beaten us to it.
log.Infof("Unable to add task %s following Task(%d, %s)", src.h.Name, workAlreadyDone, fromName)
log.Debugf("Unable to add task %s following Task(%d, %s)", src.h.Name, workAlreadyDone, fromName)
}
}
}
@ -386,11 +388,21 @@ func (e *TaskEngine) bump(taskType string) {
// resourcesInUse requires workListsMutex to be already locked.
func (e *TaskEngine) resourcesInUse() resources.Resources {
tmp := e.reg.Resources
copy(tmp.GpuRam, e.reg.Resources.GpuRam)
for _, t := range e.handlers {
ct := t.Count.Load()
tmp.Cpu -= int(ct) * t.Cost.Cpu
tmp.Gpu -= float64(ct) * t.Cost.Gpu
tmp.Ram -= uint64(ct) * t.Cost.Ram
for i := int32(0); i < ct; i++ {
for grIdx, j := range tmp.GpuRam {
if j > t.Cost.GpuRam[0] {
tmp.GpuRam[grIdx] = j - t.Cost.GpuRam[0]
break
}
}
log.Warn("We should never get out of gpuram for what's consumed.")
}
}
return tmp
}

View File

@ -223,6 +223,13 @@ func (h *taskTypeHandler) AssertMachineHasCapacity() error {
if r.Gpu-h.Cost.Gpu < 0 {
return errors.New("Did not accept " + h.Name + " task: out of available GPU")
}
for _, u := range r.GpuRam {
if u > h.Cost.GpuRam[0] {
goto enoughGpuRam
}
}
return errors.New("Did not accept " + h.Name + " task: out of GPURam")
enoughGpuRam:
return nil
}

View File

@ -1,3 +1,5 @@
// Package cl was borrowed from the go-opencl library which is more complex and
// doesn't compile well for our needs.
package cl
// #include "cl.h"

View File

@ -27,7 +27,7 @@ var LOOKS_DEAD_TIMEOUT = 10 * time.Minute // Time w/o minute heartbeats
type Resources struct {
Cpu int
Gpu float64
GpuRam uint64
GpuRam []uint64
Ram uint64
MachineID int
}
@ -72,7 +72,8 @@ func Register(db *harmonydb.DB, hostnameAndPort string) (*Reg, error) {
return nil, err
}
}
CleanupMachines(context.Background(), db)
cleaned := CleanupMachines(context.Background(), db)
logger.Infow("Cleaned up machines", "count", cleaned)
}
go func() {
for {
@ -138,21 +139,24 @@ func getResources() (res Resources, err error) {
return res, nil
}
func getGpuRam() uint64 {
func getGpuRam() (res []uint64) {
platforms, err := cl.GetPlatforms()
if err != nil {
logger.Error(err)
return 0
return res
}
return uint64(lo.SumBy(platforms, func(p *cl.Platform) int64 {
lo.ForEach(platforms, func(p *cl.Platform, i int) {
d, err := p.GetAllDevices()
if err != nil {
logger.Error(err)
return 0
return
}
return lo.SumBy(d, func(d *cl.Device) int64 { return d.GlobalMemSize() })
}))
lo.ForEach(d, func(d *cl.Device, i int) {
res = append(res, uint64(d.GlobalMemSize()))
})
})
return res
}
func DiskFree(path string) (uint64, error) {
@ -164,17 +168,3 @@ func DiskFree(path string) (uint64, error) {
return s.Bfree * uint64(s.Bsize), nil
}
/* NOT for Darwin.
func GetMemFree() uint64 {
in := unix.Sysinfo_t{}
err := unix.Sysinfo(&in)
if err != nil {
return 0
}
// If this is a 32-bit system, then these fields are
// uint32 instead of uint64.
// So we always convert to uint64 to match signature.
return uint64(in.Freeram) * uint64(in.Unit)
}
*/