diff --git a/itests/harmonytask_test.go b/itests/harmonytask_test.go index b36e9ab11..4985b8421 100644 --- a/itests/harmonytask_test.go +++ b/itests/harmonytask_test.go @@ -60,12 +60,12 @@ func (t *task1) TypeDetails() harmonytask.TaskTypeDetails { func (t *task1) Adder(add harmonytask.AddTaskFunc) { for _, vTmp := range t.toAdd { v := vTmp - add(func(tID harmonytask.TaskID, tx *harmonydb.Tx) bool { + add(func(tID harmonytask.TaskID, tx *harmonydb.Tx) (bool, error) { t.myPersonalTableLock.Lock() defer t.myPersonalTableLock.Unlock() t.myPersonalTable[tID] = v - return true + return true, nil }) } } @@ -120,10 +120,10 @@ func fooLetterAdder(t *testing.T, cdb *harmonydb.DB) *passthru { adder: func(add harmonytask.AddTaskFunc) { for _, vTmp := range []string{"A", "B"} { v := vTmp - add(func(tID harmonytask.TaskID, tx *harmonydb.Tx) bool { + add(func(tID harmonytask.TaskID, tx *harmonydb.Tx) (bool, error) { _, err := tx.Exec("INSERT INTO itest_scratch (some_int, content) VALUES ($1,$2)", tID, v) require.NoError(t, err) - return true + return true, nil }) } }, diff --git a/lib/harmony/harmonytask/harmonytask.go b/lib/harmony/harmonytask/harmonytask.go index cd401f6d2..3296c9f17 100644 --- a/lib/harmony/harmonytask/harmonytask.go +++ b/lib/harmony/harmonytask/harmonytask.go @@ -3,13 +3,12 @@ package harmonytask import ( "context" "fmt" - "net/http" "strconv" "sync" "sync/atomic" "time" - "github.com/gorilla/mux" + "github.com/gin-gonic/gin" "github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/lib/harmony/resources" @@ -40,7 +39,7 @@ type TaskTypeDetails struct { // It should also return success if the trigger succeeded. // NOTE: if refatoring tasks, see if your task is // necessary. Ex: Is the sector state correct for your stage to run? - Follows map[string]func(TaskID, AddTaskFunc) bool + Follows map[string]func(TaskID, AddTaskFunc) (bool, error) } // TaskInterface must be implemented in order to have a task used by harmonytask. @@ -90,7 +89,7 @@ type TaskInterface interface { Adder(AddTaskFunc) } -type AddTaskFunc func(extraInfo func(TaskID, *harmonydb.Tx) bool) +type AddTaskFunc func(extraInfo func(TaskID, *harmonydb.Tx) (bool, error)) type TaskEngine struct { ctx context.Context @@ -107,8 +106,9 @@ type TaskEngine struct { lastCleanup atomic.Value } type followStruct struct { - f func(TaskID, AddTaskFunc) bool - h *taskTypeHandler + f func(TaskID, AddTaskFunc) (bool, error) + h *taskTypeHandler + name string } type TaskID int @@ -153,7 +153,7 @@ func New( } for name, fn := range c.TypeDetails().Follows { - e.follows[name] = append(e.follows[name], followStruct{fn, &h}) + e.follows[name] = append(e.follows[name], followStruct{fn, &h, name}) // populate harmony_task_follows _, err := db.Exec(e.ctx, `INSERT INTO harmony_task_follows (owner_id, from_task, to_task) @@ -271,7 +271,12 @@ func (e *TaskEngine) followWorkInDB() { continue } // we need to create this task - if !src.h.Follows[fromName](TaskID(workAlreadyDone), src.h.AddTask) { + b, err := src.h.Follows[fromName](TaskID(workAlreadyDone), src.h.AddTask) + if err != nil { + log.Errorw("Could not follow: ", "error", err) + continue + } + if !b { // But someone may have beaten us to it. log.Debugf("Unable to add task %s following Task(%d, %s)", src.h.Name, workAlreadyDone, fromName) } @@ -314,54 +319,54 @@ func (e *TaskEngine) pollerTryAllWork() { // GetHttpHandlers needs to be used by the http server to register routes. // This implements the receiver-side of "follows" and "bumps" the fast way. -func (e *TaskEngine) GetHttpHandlers() http.Handler { - root := mux.NewRouter() - s := root.PathPrefix("/scheduler") - f := s.PathPrefix("/follows") - b := s.PathPrefix("/bump") +func (e *TaskEngine) ApplyHttpHandlers(root gin.IRouter) { + s := root.Group("/scheduler") + f := s.Group("/follows") + b := s.Group("/bump") for name, vsTmp := range e.follows { vs := vsTmp - f.Path("/" + name + "/{tID}").Methods("GET").HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - tIDString := mux.Vars(r)["tID"] + f.GET("/"+name+"/:tID", func(c *gin.Context) { + tIDString := c.Param("tID") tID, err := strconv.Atoi(tIDString) if err != nil { - w.WriteHeader(401) - fmt.Fprint(w, err.Error()) + c.JSON(401, map[string]any{"error": err.Error()}) return } taskAdded := false for _, vTmp := range vs { v := vTmp - taskAdded = taskAdded || v.f(TaskID(tID), v.h.AddTask) + b, err := v.f(TaskID(tID), v.h.AddTask) + if err != nil { + log.Error("Follow attemp failed", "error", err, "from", name, "to", v.name) + } + taskAdded = taskAdded || b } if taskAdded { e.tryAllWork <- true - w.WriteHeader(200) + c.Status(200) return } - w.WriteHeader(202) // NOTE: 202 for "accepted" but not worked. + c.Status(202) // NOTE: 202 for "accepted" but not worked. }) } for _, hTmp := range e.handlers { h := hTmp - b.Path("/" + h.Name + "/{tID}").Methods("GET").HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - tIDString := mux.Vars(r)["tID"] + b.GET("/"+h.Name+"/:tID", func(c *gin.Context) { + tIDString := c.Param("tID") tID, err := strconv.Atoi(tIDString) if err != nil { - w.WriteHeader(401) - fmt.Fprint(w, err.Error()) + c.JSON(401, map[string]any{"error": err.Error()}) return } // We NEED to block while trying to deliver // this work to ease the network impact. if h.considerWork("bump", []TaskID{TaskID(tID)}) { - w.WriteHeader(200) + c.Status(200) return } - w.WriteHeader(202) // NOTE: 202 for "accepted" but not worked. + c.Status(202) // NOTE: 202 for "accepted" but not worked. }) } - return root } func (e *TaskEngine) bump(taskType string) { diff --git a/lib/harmony/harmonytask/task_type_handler.go b/lib/harmony/harmonytask/task_type_handler.go index 932cfc297..af47b498f 100644 --- a/lib/harmony/harmonytask/task_type_handler.go +++ b/lib/harmony/harmonytask/task_type_handler.go @@ -3,6 +3,7 @@ package harmonytask import ( "context" "errors" + "fmt" "io" "net/http" "strconv" @@ -23,15 +24,14 @@ type taskTypeHandler struct { Count atomic.Int32 } -func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) bool) { +func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) (bool, error)) { var tID TaskID - did, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) bool { + did, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) (bool, error) { // create taskID (from DB) _, err := tx.Exec(`INSERT INTO harmony_task (name, added_by, posted_time) VALUES ($1, $2, CURRENT_TIMESTAMP) `, h.Name, h.TaskEngine.ownerID) if err != nil { - log.Error("Could not insert into harmonyTask", err) - return false + return false, fmt.Errorf("Could not insert into harmonyTask: %w", err) } err = tx.QueryRow("SELECT id FROM harmony_task ORDER BY update_time DESC LIMIT 1").Scan(&tID) if err != nil { @@ -146,19 +146,18 @@ top: func (h *taskTypeHandler) recordCompletion(tID TaskID, workStart time.Time, done bool, doErr error) { workEnd := time.Now() - cm, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) bool { + cm, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) (bool, error) { var postedTime time.Time err := tx.QueryRow(`SELECT posted_time FROM harmony_task WHERE id=$1`, tID).Scan(&postedTime) if err != nil { - log.Error("Could not log completion: ", err) - return false + return false, fmt.Errorf("Could not log completion: %w ", err) } result := "unspecified error" if done { _, err = tx.Exec("DELETE FROM harmony_task WHERE id=$1", tID) if err != nil { - log.Error("Could not log completion: ", err) - return false + + return false, fmt.Errorf("Could not log completion: %w", err) } result = "" } else { @@ -171,8 +170,7 @@ func (h *taskTypeHandler) recordCompletion(tID TaskID, workStart time.Time, done err = tx.QueryRow(`SELECT count(*) FROM harmony_task_history WHERE task_id=$1 AND result=FALSE`, tID).Scan(&ct) if err != nil { - log.Error("Could not read task history:", err) - return false + return false, fmt.Errorf("Could not read task history: %w", err) } if ct >= h.MaxFailures { deleteTask = true @@ -181,15 +179,13 @@ func (h *taskTypeHandler) recordCompletion(tID TaskID, workStart time.Time, done if deleteTask { _, err = tx.Exec("DELETE FROM harmony_task WHERE id=$1", tID) if err != nil { - log.Error("Could not delete failed job: ", err) - return false + return false, fmt.Errorf("Could not delete failed job: %w", err) } // Note: Extra Info is left laying around for later review & clean-up } else { _, err := tx.Exec(`UPDATE harmony_task SET owner_id=NULL WHERE id=$1`, tID) if err != nil { - log.Error("Could not disown failed task: ", tID, err) - return false + return false, fmt.Errorf("Could not disown failed task: %v %v", tID, err) } } } @@ -197,10 +193,9 @@ func (h *taskTypeHandler) recordCompletion(tID TaskID, workStart time.Time, done (task_id, name, posted, work_start, work_end, result, err) VALUES ($1, $2, $3, $4, $5, $6, $7)`, tID, h.Name, postedTime, workStart, workEnd, done, result) if err != nil { - log.Error("Could not write history: ", err) - return false + return false, fmt.Errorf("Could not write history: %w", err) } - return true + return true, nil }) if err != nil { log.Error("Could not record transaction: ", err) @@ -249,7 +244,11 @@ func (h *taskTypeHandler) triggerCompletionListeners(tID TaskID) { inProcessDefs := h.TaskEngine.follows[h.Name] inProcessFollowers := make([]string, len(inProcessDefs)) for _, fs := range inProcessDefs { - if fs.f(tID, fs.h.AddTask) { + b, err := fs.f(tID, fs.h.AddTask) + if err != nil { + log.Error("Could not follow", "error", err, "from", h.Name, "to", fs.name) + } + if b { inProcessFollowers = append(inProcessFollowers, fs.h.Name) } }