harmonytask fixes
This commit is contained in:
parent
f2a90aecef
commit
f34540f5ef
@ -60,12 +60,12 @@ func (t *task1) TypeDetails() harmonytask.TaskTypeDetails {
|
||||
func (t *task1) Adder(add harmonytask.AddTaskFunc) {
|
||||
for _, vTmp := range t.toAdd {
|
||||
v := vTmp
|
||||
add(func(tID harmonytask.TaskID, tx *harmonydb.Tx) bool {
|
||||
add(func(tID harmonytask.TaskID, tx *harmonydb.Tx) (bool, error) {
|
||||
t.myPersonalTableLock.Lock()
|
||||
defer t.myPersonalTableLock.Unlock()
|
||||
|
||||
t.myPersonalTable[tID] = v
|
||||
return true
|
||||
return true, nil
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -120,10 +120,10 @@ func fooLetterAdder(t *testing.T, cdb *harmonydb.DB) *passthru {
|
||||
adder: func(add harmonytask.AddTaskFunc) {
|
||||
for _, vTmp := range []string{"A", "B"} {
|
||||
v := vTmp
|
||||
add(func(tID harmonytask.TaskID, tx *harmonydb.Tx) bool {
|
||||
add(func(tID harmonytask.TaskID, tx *harmonydb.Tx) (bool, error) {
|
||||
_, err := tx.Exec("INSERT INTO itest_scratch (some_int, content) VALUES ($1,$2)", tID, v)
|
||||
require.NoError(t, err)
|
||||
return true
|
||||
return true, nil
|
||||
})
|
||||
}
|
||||
},
|
||||
|
@ -3,13 +3,12 @@ package harmonytask
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/gin-gonic/gin"
|
||||
|
||||
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
|
||||
"github.com/filecoin-project/lotus/lib/harmony/resources"
|
||||
@ -40,7 +39,7 @@ type TaskTypeDetails struct {
|
||||
// It should also return success if the trigger succeeded.
|
||||
// NOTE: if refatoring tasks, see if your task is
|
||||
// necessary. Ex: Is the sector state correct for your stage to run?
|
||||
Follows map[string]func(TaskID, AddTaskFunc) bool
|
||||
Follows map[string]func(TaskID, AddTaskFunc) (bool, error)
|
||||
}
|
||||
|
||||
// TaskInterface must be implemented in order to have a task used by harmonytask.
|
||||
@ -90,7 +89,7 @@ type TaskInterface interface {
|
||||
Adder(AddTaskFunc)
|
||||
}
|
||||
|
||||
type AddTaskFunc func(extraInfo func(TaskID, *harmonydb.Tx) bool)
|
||||
type AddTaskFunc func(extraInfo func(TaskID, *harmonydb.Tx) (bool, error))
|
||||
|
||||
type TaskEngine struct {
|
||||
ctx context.Context
|
||||
@ -107,8 +106,9 @@ type TaskEngine struct {
|
||||
lastCleanup atomic.Value
|
||||
}
|
||||
type followStruct struct {
|
||||
f func(TaskID, AddTaskFunc) bool
|
||||
h *taskTypeHandler
|
||||
f func(TaskID, AddTaskFunc) (bool, error)
|
||||
h *taskTypeHandler
|
||||
name string
|
||||
}
|
||||
|
||||
type TaskID int
|
||||
@ -153,7 +153,7 @@ func New(
|
||||
}
|
||||
|
||||
for name, fn := range c.TypeDetails().Follows {
|
||||
e.follows[name] = append(e.follows[name], followStruct{fn, &h})
|
||||
e.follows[name] = append(e.follows[name], followStruct{fn, &h, name})
|
||||
|
||||
// populate harmony_task_follows
|
||||
_, err := db.Exec(e.ctx, `INSERT INTO harmony_task_follows (owner_id, from_task, to_task)
|
||||
@ -271,7 +271,12 @@ func (e *TaskEngine) followWorkInDB() {
|
||||
continue
|
||||
}
|
||||
// we need to create this task
|
||||
if !src.h.Follows[fromName](TaskID(workAlreadyDone), src.h.AddTask) {
|
||||
b, err := src.h.Follows[fromName](TaskID(workAlreadyDone), src.h.AddTask)
|
||||
if err != nil {
|
||||
log.Errorw("Could not follow: ", "error", err)
|
||||
continue
|
||||
}
|
||||
if !b {
|
||||
// But someone may have beaten us to it.
|
||||
log.Debugf("Unable to add task %s following Task(%d, %s)", src.h.Name, workAlreadyDone, fromName)
|
||||
}
|
||||
@ -314,54 +319,54 @@ func (e *TaskEngine) pollerTryAllWork() {
|
||||
|
||||
// GetHttpHandlers needs to be used by the http server to register routes.
|
||||
// This implements the receiver-side of "follows" and "bumps" the fast way.
|
||||
func (e *TaskEngine) GetHttpHandlers() http.Handler {
|
||||
root := mux.NewRouter()
|
||||
s := root.PathPrefix("/scheduler")
|
||||
f := s.PathPrefix("/follows")
|
||||
b := s.PathPrefix("/bump")
|
||||
func (e *TaskEngine) ApplyHttpHandlers(root gin.IRouter) {
|
||||
s := root.Group("/scheduler")
|
||||
f := s.Group("/follows")
|
||||
b := s.Group("/bump")
|
||||
for name, vsTmp := range e.follows {
|
||||
vs := vsTmp
|
||||
f.Path("/" + name + "/{tID}").Methods("GET").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
tIDString := mux.Vars(r)["tID"]
|
||||
f.GET("/"+name+"/:tID", func(c *gin.Context) {
|
||||
tIDString := c.Param("tID")
|
||||
tID, err := strconv.Atoi(tIDString)
|
||||
if err != nil {
|
||||
w.WriteHeader(401)
|
||||
fmt.Fprint(w, err.Error())
|
||||
c.JSON(401, map[string]any{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
taskAdded := false
|
||||
for _, vTmp := range vs {
|
||||
v := vTmp
|
||||
taskAdded = taskAdded || v.f(TaskID(tID), v.h.AddTask)
|
||||
b, err := v.f(TaskID(tID), v.h.AddTask)
|
||||
if err != nil {
|
||||
log.Error("Follow attemp failed", "error", err, "from", name, "to", v.name)
|
||||
}
|
||||
taskAdded = taskAdded || b
|
||||
}
|
||||
if taskAdded {
|
||||
e.tryAllWork <- true
|
||||
w.WriteHeader(200)
|
||||
c.Status(200)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(202) // NOTE: 202 for "accepted" but not worked.
|
||||
c.Status(202) // NOTE: 202 for "accepted" but not worked.
|
||||
})
|
||||
}
|
||||
for _, hTmp := range e.handlers {
|
||||
h := hTmp
|
||||
b.Path("/" + h.Name + "/{tID}").Methods("GET").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
tIDString := mux.Vars(r)["tID"]
|
||||
b.GET("/"+h.Name+"/:tID", func(c *gin.Context) {
|
||||
tIDString := c.Param("tID")
|
||||
tID, err := strconv.Atoi(tIDString)
|
||||
if err != nil {
|
||||
w.WriteHeader(401)
|
||||
fmt.Fprint(w, err.Error())
|
||||
c.JSON(401, map[string]any{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
// We NEED to block while trying to deliver
|
||||
// this work to ease the network impact.
|
||||
if h.considerWork("bump", []TaskID{TaskID(tID)}) {
|
||||
w.WriteHeader(200)
|
||||
c.Status(200)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(202) // NOTE: 202 for "accepted" but not worked.
|
||||
c.Status(202) // NOTE: 202 for "accepted" but not worked.
|
||||
})
|
||||
}
|
||||
return root
|
||||
}
|
||||
|
||||
func (e *TaskEngine) bump(taskType string) {
|
||||
|
@ -3,6 +3,7 @@ package harmonytask
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"strconv"
|
||||
@ -23,15 +24,14 @@ type taskTypeHandler struct {
|
||||
Count atomic.Int32
|
||||
}
|
||||
|
||||
func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) bool) {
|
||||
func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) (bool, error)) {
|
||||
var tID TaskID
|
||||
did, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) bool {
|
||||
did, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) (bool, error) {
|
||||
// create taskID (from DB)
|
||||
_, err := tx.Exec(`INSERT INTO harmony_task (name, added_by, posted_time)
|
||||
VALUES ($1, $2, CURRENT_TIMESTAMP) `, h.Name, h.TaskEngine.ownerID)
|
||||
if err != nil {
|
||||
log.Error("Could not insert into harmonyTask", err)
|
||||
return false
|
||||
return false, fmt.Errorf("Could not insert into harmonyTask: %w", err)
|
||||
}
|
||||
err = tx.QueryRow("SELECT id FROM harmony_task ORDER BY update_time DESC LIMIT 1").Scan(&tID)
|
||||
if err != nil {
|
||||
@ -146,19 +146,18 @@ top:
|
||||
func (h *taskTypeHandler) recordCompletion(tID TaskID, workStart time.Time, done bool, doErr error) {
|
||||
workEnd := time.Now()
|
||||
|
||||
cm, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) bool {
|
||||
cm, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) (bool, error) {
|
||||
var postedTime time.Time
|
||||
err := tx.QueryRow(`SELECT posted_time FROM harmony_task WHERE id=$1`, tID).Scan(&postedTime)
|
||||
if err != nil {
|
||||
log.Error("Could not log completion: ", err)
|
||||
return false
|
||||
return false, fmt.Errorf("Could not log completion: %w ", err)
|
||||
}
|
||||
result := "unspecified error"
|
||||
if done {
|
||||
_, err = tx.Exec("DELETE FROM harmony_task WHERE id=$1", tID)
|
||||
if err != nil {
|
||||
log.Error("Could not log completion: ", err)
|
||||
return false
|
||||
|
||||
return false, fmt.Errorf("Could not log completion: %w", err)
|
||||
}
|
||||
result = ""
|
||||
} else {
|
||||
@ -171,8 +170,7 @@ func (h *taskTypeHandler) recordCompletion(tID TaskID, workStart time.Time, done
|
||||
err = tx.QueryRow(`SELECT count(*) FROM harmony_task_history
|
||||
WHERE task_id=$1 AND result=FALSE`, tID).Scan(&ct)
|
||||
if err != nil {
|
||||
log.Error("Could not read task history:", err)
|
||||
return false
|
||||
return false, fmt.Errorf("Could not read task history: %w", err)
|
||||
}
|
||||
if ct >= h.MaxFailures {
|
||||
deleteTask = true
|
||||
@ -181,15 +179,13 @@ func (h *taskTypeHandler) recordCompletion(tID TaskID, workStart time.Time, done
|
||||
if deleteTask {
|
||||
_, err = tx.Exec("DELETE FROM harmony_task WHERE id=$1", tID)
|
||||
if err != nil {
|
||||
log.Error("Could not delete failed job: ", err)
|
||||
return false
|
||||
return false, fmt.Errorf("Could not delete failed job: %w", err)
|
||||
}
|
||||
// Note: Extra Info is left laying around for later review & clean-up
|
||||
} else {
|
||||
_, err := tx.Exec(`UPDATE harmony_task SET owner_id=NULL WHERE id=$1`, tID)
|
||||
if err != nil {
|
||||
log.Error("Could not disown failed task: ", tID, err)
|
||||
return false
|
||||
return false, fmt.Errorf("Could not disown failed task: %v %v", tID, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -197,10 +193,9 @@ func (h *taskTypeHandler) recordCompletion(tID TaskID, workStart time.Time, done
|
||||
(task_id, name, posted, work_start, work_end, result, err)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7)`, tID, h.Name, postedTime, workStart, workEnd, done, result)
|
||||
if err != nil {
|
||||
log.Error("Could not write history: ", err)
|
||||
return false
|
||||
return false, fmt.Errorf("Could not write history: %w", err)
|
||||
}
|
||||
return true
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
log.Error("Could not record transaction: ", err)
|
||||
@ -249,7 +244,11 @@ func (h *taskTypeHandler) triggerCompletionListeners(tID TaskID) {
|
||||
inProcessDefs := h.TaskEngine.follows[h.Name]
|
||||
inProcessFollowers := make([]string, len(inProcessDefs))
|
||||
for _, fs := range inProcessDefs {
|
||||
if fs.f(tID, fs.h.AddTask) {
|
||||
b, err := fs.f(tID, fs.h.AddTask)
|
||||
if err != nil {
|
||||
log.Error("Could not follow", "error", err, "from", h.Name, "to", fs.name)
|
||||
}
|
||||
if b {
|
||||
inProcessFollowers = append(inProcessFollowers, fs.h.Name)
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user