diff --git a/lib/harmony/harmonytask/doc.go b/lib/harmony/harmonytask/doc.go index 357c3e15c..07641976a 100644 --- a/lib/harmony/harmonytask/doc.go +++ b/lib/harmony/harmonytask/doc.go @@ -11,23 +11,23 @@ machines to accept work (round robin) before trying again to accept. * Mental Model: - Things that block tasks: - - task not registered for any running server - - max was specified and reached - - resource exhaustion - - CanAccept() interface (per-task implmentation) does not accept it. - Ways tasks start: (slowest first) - - DB Read every 1 minute - - Bump via HTTP if registered in DB - - Task was added (to db) by this process - Ways tasks get added: - - Async Listener task (for chain, etc) - - Followers: Tasks get added because another task completed - When Follower collectors run: - - If both sides are process-local, then - - Otherwise, at the listen interval during db scrape - How duplicate tasks are avoided: - - that's up to the task definition, but probably a unique key + Things that block tasks: + - task not registered for any running server + - max was specified and reached + - resource exhaustion + - CanAccept() interface (per-task implmentation) does not accept it. + Ways tasks start: (slowest first) + - DB Read every 1 minute + - Bump via HTTP if registered in DB + - Task was added (to db) by this process + Ways tasks get added: + - Async Listener task (for chain, etc) + - Followers: Tasks get added because another task completed + When Follower collectors run: + - If both sides are process-local, then + - Otherwise, at the listen interval during db scrape + How duplicate tasks are avoided: + - that's up to the task definition, but probably a unique key * To use: diff --git a/lib/harmony/harmonytask/harmonytask.go b/lib/harmony/harmonytask/harmonytask.go index 1f5662959..3f1ede4f9 100644 --- a/lib/harmony/harmonytask/harmonytask.go +++ b/lib/harmony/harmonytask/harmonytask.go @@ -3,12 +3,14 @@ package harmonytask import ( "context" "fmt" + "net/http" "strconv" + "sync" "sync/atomic" "time" "github.com/filecoin-project/lotus/lib/harmony/resources" - "github.com/gin-gonic/gin" + "github.com/gorilla/mux" "github.com/filecoin-project/lotus/lib/harmony/harmonydb" ) @@ -94,7 +96,7 @@ type TaskEngine struct { ctx context.Context handlers []*taskTypeHandler db *harmonydb.DB - workAdderMutex *notifyingMx + workAdderMutex sync.Mutex reg *resources.Reg grace context.CancelFunc taskMap map[string]*taskTypeHandler @@ -125,15 +127,14 @@ func New( } ctx, grace := context.WithCancel(context.Background()) e := &TaskEngine{ - ctx: ctx, - grace: grace, - db: db, - reg: reg, - ownerID: reg.Resources.MachineID, // The current number representing "hostAndPort" - workAdderMutex: ¬ifyingMx{}, - taskMap: make(map[string]*taskTypeHandler, len(impls)), - tryAllWork: make(chan bool), - follows: make(map[string][]followStruct), + ctx: ctx, + grace: grace, + db: db, + reg: reg, + ownerID: reg.Resources.MachineID, // The current number representing "hostAndPort" + taskMap: make(map[string]*taskTypeHandler, len(impls)), + tryAllWork: make(chan bool), + follows: make(map[string][]followStruct), } e.lastCleanup.Store(time.Now()) for _, c := range impls { @@ -184,7 +185,7 @@ func New( continue // not really fatal, but not great } } - if !h.considerWork([]TaskID{TaskID(w.ID)}) { + if !h.considerWork("recovered", []TaskID{TaskID(w.ID)}) { log.Error("Strange: Unable to accept previously owned task: ", w.ID, w.Name) } } @@ -297,7 +298,7 @@ func (e *TaskEngine) pollerTryAllWork() { log.Error("Unable to read work ", err) continue } - accepted := v.considerWork(unownedTasks) + accepted := v.considerWork("poller", unownedTasks) if !accepted { log.Warn("Work not accepted") continue @@ -309,17 +310,20 @@ func (e *TaskEngine) pollerTryAllWork() { } } -// AddHttpHandlers TODO this needs to be called by the http server to register routes. +// GetHttpHandlers needs to be used by the http server to register routes. // This implements the receiver-side of "follows" and "bumps" the fast way. -func (e *TaskEngine) AddHttpHandlers(root gin.IRouter) { - s := root.Group("/scheduler/") - f := s.Group("/follows") +func (e *TaskEngine) GetHttpHandlers() http.Handler { + root := mux.NewRouter() + s := root.PathPrefix("/scheduler") + f := s.PathPrefix("/follows") + b := s.PathPrefix("/bump") for name, v := range e.follows { - f.GET("/"+name+"/:tID", func(c *gin.Context) { - tIDString := c.Param("tID") + f.Path("/" + name + "/{tID}").Methods("GET").HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + tIDString := mux.Vars(r)["tID"] tID, err := strconv.Atoi(tIDString) if err != nil { - c.AbortWithError(401, err) + w.WriteHeader(401) + fmt.Fprint(w, err.Error()) return } taskAdded := false @@ -328,28 +332,31 @@ func (e *TaskEngine) AddHttpHandlers(root gin.IRouter) { } if taskAdded { e.tryAllWork <- true - c.Status(200) + w.WriteHeader(200) + return } - c.Status(202) // NOTE: 202 for "accepted" but not worked. + w.WriteHeader(202) // NOTE: 202 for "accepted" but not worked. }) } - b := s.Group("/bump") for _, h := range e.handlers { - b.GET("/"+h.Name+"/:tID", func(c *gin.Context) { - tIDString := c.Param("tID") + b.Path("/" + h.Name + "/{tID}").Methods("GET").HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + tIDString := mux.Vars(r)["tID"] tID, err := strconv.Atoi(tIDString) if err != nil { - c.AbortWithError(401, err) + w.WriteHeader(401) + fmt.Fprint(w, err.Error()) return } // We NEED to block while trying to deliver // this work to ease the network impact. - if h.considerWork([]TaskID{TaskID(tID)}) { - c.Status(200) + if h.considerWork("bump", []TaskID{TaskID(tID)}) { + w.WriteHeader(200) + return } - c.Status(202) // NOTE: 202 for "accepted" but not worked. + w.WriteHeader(202) // NOTE: 202 for "accepted" but not worked. }) } + return root } func (e *TaskEngine) bump(taskType string) { diff --git a/lib/harmony/harmonytask/notifyingMx.go b/lib/harmony/harmonytask/notifyingMx.go deleted file mode 100644 index 51c4e0a53..000000000 --- a/lib/harmony/harmonytask/notifyingMx.go +++ /dev/null @@ -1,16 +0,0 @@ -package harmonytask - -import "sync" - -type notifyingMx struct { - sync.Mutex - UnlockNotify func() -} - -func (n *notifyingMx) Unlock() { - tmp := n.UnlockNotify - n.Mutex.Unlock() - if tmp != nil { - tmp() - } -} diff --git a/lib/harmony/harmonytask/taskTypeHandler.go b/lib/harmony/harmonytask/taskTypeHandler.go index ed2bd4c8a..6693102f2 100644 --- a/lib/harmony/harmonytask/taskTypeHandler.go +++ b/lib/harmony/harmonytask/taskTypeHandler.go @@ -19,8 +19,7 @@ type taskTypeHandler struct { TaskInterface TaskTypeDetails TaskEngine *TaskEngine - Count atomic.Int32 /// locked by TaskEngine's mutex - + Count atomic.Int32 } func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) bool) { @@ -46,12 +45,12 @@ func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) bool) { return } - if !h.considerWork([]TaskID{tID}) { + if !h.considerWork("adder", []TaskID{tID}) { h.TaskEngine.bump(h.Name) // We can't do it. How about someone else. } } -func (h *taskTypeHandler) considerWork(ids []TaskID) (workAccepted bool) { +func (h *taskTypeHandler) considerWork(from string, ids []TaskID) (workAccepted bool) { top: if len(ids) == 0 { return true // stop looking for takers @@ -104,6 +103,7 @@ top: go func() { h.Count.Add(1) + log.Infow("Beginning work on Task", "id", *tID, "from", from, "type", h.Name) var done bool var doErr error @@ -136,7 +136,7 @@ top: return owner == h.TaskEngine.ownerID }) if doErr != nil { - log.Error("Do("+h.Name+", taskID="+strconv.Itoa(int(*tID))+") returned error: ", doErr) + log.Errorw("Do() returned error", "type", h.Name, "id", strconv.Itoa(int(*tID)), "error", doErr) } }() return true diff --git a/lib/harmony/resources/resources.go b/lib/harmony/resources/resources.go index 77200b873..8f9d69db5 100644 --- a/lib/harmony/resources/resources.go +++ b/lib/harmony/resources/resources.go @@ -38,7 +38,7 @@ type Reg struct { var logger = logging.Logger("harmonytask") -var lotusRE = regexp.MustCompile("lotus-worker|lotus-harmony|yugabyted") +var lotusRE = regexp.MustCompile("lotus-worker|lotus-harmony|yugabyted|yb-master|yb-tserver") func Register(db *harmonydb.DB, hostnameAndPort string) (*Reg, error) { var reg Reg @@ -114,7 +114,7 @@ func getResources() (res Resources, err error) { } } if found > 1 { - logger.Error("This Lotus process should run alone on a machine. Use CGroup.") + logger.Warn("lotus-provider's defaults are for running alone. Use task maximums or CGroups.") } }