diff --git a/cmd/lotus-worker/main.go b/cmd/lotus-worker/main.go index 995a3cbe0..944791275 100644 --- a/cmd/lotus-worker/main.go +++ b/cmd/lotus-worker/main.go @@ -609,7 +609,6 @@ var runCmd = &cli.Command{ if err := srv.Shutdown(context.TODO()); err != nil { log.Errorf("shutting down RPC server failed: %s", err) } - //taskManager.GracefullyTerminate(5*time.Hour) log.Warn("Graceful shutdown successful") }() diff --git a/itests/harmonytask_test.go b/itests/harmonytask_test.go index 80923b1d5..b36e9ab11 100644 --- a/itests/harmonytask_test.go +++ b/itests/harmonytask_test.go @@ -21,7 +21,7 @@ import ( type task1 struct { toAdd []int myPersonalTableLock sync.Mutex - myPersonalTable map[harmonytask.TaskID]int // This would typicallyb be a DB table + myPersonalTable map[harmonytask.TaskID]int // This would typically be a DB table WorkCompleted []string } diff --git a/lib/harmony/harmonytask/doc.go b/lib/harmony/harmonytask/doc.go index 44fccb644..772b674cd 100644 --- a/lib/harmony/harmonytask/doc.go +++ b/lib/harmony/harmonytask/doc.go @@ -23,8 +23,9 @@ Mental Model: - Async Listener task (for chain, etc) - Followers: Tasks get added because another task completed When Follower collectors run: - - If both sides are process-local, then - - Otherwise, at the listen interval during db scrape + - If both sides are process-local, then this process will pick it up. + - If properly registered already, the http endpoint will be tried to start it. + - Otherwise, at the listen interval during db scrape it will be found. How duplicate tasks are avoided: - that's up to the task definition, but probably a unique key diff --git a/lib/harmony/harmonytask/harmonytask.go b/lib/harmony/harmonytask/harmonytask.go index 8e5c89e26..cd401f6d2 100644 --- a/lib/harmony/harmonytask/harmonytask.go +++ b/lib/harmony/harmonytask/harmonytask.go @@ -56,7 +56,7 @@ type TaskInterface interface { // CanAccept should return if the task can run on this machine. 
It should // return null if the task type is not allowed on this machine. // It should select the task it most wants to accomplish. - // It is also responsible for determining disk space (including scratch). + // It is also responsible for determining & reserving disk space (including scratch). CanAccept([]TaskID) (*TaskID, error) // TypeDetails() returns static details about how this task behaves and @@ -181,7 +181,7 @@ func New( if h == nil { _, err := db.Exec(e.ctx, `UPDATE harmony_task SET owner=NULL WHERE id=$1`, w.ID) if err != nil { - log.Error("Cannot remove self from owner field: ", err) + log.Errorw("Cannot remove self from owner field", "error", err) continue // not really fatal, but not great } } @@ -206,12 +206,14 @@ func (e *TaskEngine) GracefullyTerminate(deadline time.Duration) { e.reg.Shutdown() deadlineChan := time.NewTimer(deadline).C + ctx := context.TODO() + // block bumps & follows by unreg from DBs. - _, err := e.db.Exec(context.TODO(), `DELETE FROM harmony_task_impl WHERE owner_id=$1`, e.ownerID) + _, err := e.db.Exec(ctx, `DELETE FROM harmony_task_impl WHERE owner_id=$1`, e.ownerID) if err != nil { log.Warn("Could not clean-up impl table: %w", err) } - _, err = e.db.Exec(context.Background(), `DELETE FROM harmony_task_follow WHERE owner_id=$1`, e.ownerID) + _, err = e.db.Exec(ctx, `DELETE FROM harmony_task_follow WHERE owner_id=$1`, e.ownerID) if err != nil { log.Warn("Could not clean-up impl table: %w", err) } @@ -271,7 +273,7 @@ func (e *TaskEngine) followWorkInDB() { // we need to create this task if !src.h.Follows[fromName](TaskID(workAlreadyDone), src.h.AddTask) { // But someone may have beaten us to it. - log.Infof("Unable to add task %s following Task(%d, %s)", src.h.Name, workAlreadyDone, fromName) + log.Debugf("Unable to add task %s following Task(%d, %s)", src.h.Name, workAlreadyDone, fromName) } } } @@ -386,11 +388,21 @@ func (e *TaskEngine) bump(taskType string) { // resourcesInUse requires workListsMutex to be already locked. 
func (e *TaskEngine) resourcesInUse() resources.Resources { tmp := e.reg.Resources + tmp.GpuRam = append([]uint64(nil), e.reg.Resources.GpuRam...) for _, t := range e.handlers { ct := t.Count.Load() tmp.Cpu -= int(ct) * t.Cost.Cpu tmp.Gpu -= float64(ct) * t.Cost.Gpu tmp.Ram -= uint64(ct) * t.Cost.Ram + consume: for i := int32(0); i < ct; i++ { + for grIdx, j := range tmp.GpuRam { + if j >= t.Cost.GpuRam[0] { + tmp.GpuRam[grIdx] = j - t.Cost.GpuRam[0] + continue consume + } + } + log.Warn("We should never get out of gpuram for what's consumed.") + } } return tmp } diff --git a/lib/harmony/harmonytask/taskTypeHandler.go b/lib/harmony/harmonytask/task_type_handler.go similarity index 97% rename from lib/harmony/harmonytask/taskTypeHandler.go rename to lib/harmony/harmonytask/task_type_handler.go index 7d11a957f..932cfc297 100644 --- a/lib/harmony/harmonytask/taskTypeHandler.go +++ b/lib/harmony/harmonytask/task_type_handler.go @@ -223,6 +223,13 @@ func (h *taskTypeHandler) AssertMachineHasCapacity() error { if r.Gpu-h.Cost.Gpu < 0 { return errors.New("Did not accept " + h.Name + " task: out of available GPU") } + for _, u := range r.GpuRam { + if u >= h.Cost.GpuRam[0] { + goto enoughGpuRam + } + } + return errors.New("Did not accept " + h.Name + " task: out of GPURam") +enoughGpuRam: return nil } diff --git a/lib/harmony/resources/miniopencl/miniopencl.go b/lib/harmony/resources/miniopencl/mini_opencl.go similarity index 94% rename from lib/harmony/resources/miniopencl/miniopencl.go rename to lib/harmony/resources/miniopencl/mini_opencl.go index 6b07e1cba..a6bac9582 100644 --- a/lib/harmony/resources/miniopencl/miniopencl.go +++ b/lib/harmony/resources/miniopencl/mini_opencl.go @@ -1,3 +1,5 @@ +// Package cl was borrowed from the go-opencl library which is more complex and +// doesn't compile well for our needs. 
package cl // #include "cl.h" diff --git a/lib/harmony/resources/resources.go b/lib/harmony/resources/resources.go index 94df047d8..115859d75 100644 --- a/lib/harmony/resources/resources.go +++ b/lib/harmony/resources/resources.go @@ -27,7 +27,7 @@ var LOOKS_DEAD_TIMEOUT = 10 * time.Minute // Time w/o minute heartbeats type Resources struct { Cpu int Gpu float64 - GpuRam uint64 + GpuRam []uint64 Ram uint64 MachineID int } @@ -72,7 +72,8 @@ func Register(db *harmonydb.DB, hostnameAndPort string) (*Reg, error) { return nil, err } } - CleanupMachines(context.Background(), db) + cleaned := CleanupMachines(context.Background(), db) + logger.Infow("Cleaned up machines", "count", cleaned) } go func() { for { @@ -138,21 +139,24 @@ func getResources() (res Resources, err error) { return res, nil } -func getGpuRam() uint64 { +func getGpuRam() (res []uint64) { platforms, err := cl.GetPlatforms() if err != nil { logger.Error(err) - return 0 + return res } - return uint64(lo.SumBy(platforms, func(p *cl.Platform) int64 { + lo.ForEach(platforms, func(p *cl.Platform, i int) { d, err := p.GetAllDevices() if err != nil { logger.Error(err) - return 0 + return } - return lo.SumBy(d, func(d *cl.Device) int64 { return d.GlobalMemSize() }) - })) + lo.ForEach(d, func(d *cl.Device, i int) { + res = append(res, uint64(d.GlobalMemSize())) + }) + }) + return res } func DiskFree(path string) (uint64, error) { @@ -164,17 +168,3 @@ func DiskFree(path string) (uint64, error) { return s.Bfree * uint64(s.Bsize), nil } - -/* NOT for Darwin. -func GetMemFree() uint64 { - in := unix.Sysinfo_t{} - err := unix.Sysinfo(&in) - if err != nil { - return 0 - } - // If this is a 32-bit system, then these fields are - // uint32 instead of uint64. - // So we always convert to uint64 to match signature. - return uint64(in.Freeram) * uint64(in.Unit) -} -*/