harmonytask: better messages

Andrew Jackson (Ajax) 2023-08-16 16:56:09 -05:00
parent dfb029cb30
commit 497e4e5ab5
5 changed files with 60 additions and 69 deletions

View File

@@ -11,23 +11,23 @@ machines to accept work (round robin) before trying again to accept.
*
Mental Model:
Things that block tasks:
- task not registered for any running server
- max was specified and reached
- resource exhaustion
- CanAccept() interface (per-task implementation) does not accept it.
Ways tasks start: (slowest first)
- DB Read every 1 minute
- Bump via HTTP if registered in DB
- Task was added (to db) by this process
Ways tasks get added:
- Async Listener task (for chain, etc)
- Followers: Tasks get added because another task completed
When Follower collectors run:
- If both sides are process-local, then the follower runs as soon as the task completes
- Otherwise, at the listen interval during db scrape
How duplicate tasks are avoided:
- that's up to the task definition, but probably a unique key
*
To use:
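The last bullet above (duplicate avoidance via a unique key) can be shown with a minimal sketch. It reuses the AddTask/extra-callback shape that appears later in this diff; the wing_tasks table, its columns, and the Tx.Exec call are assumptions for illustration only:

func addWingTask(h *taskTypeHandler, wingID int64) {
	h.AddTask(func(tID TaskID, tx *harmonydb.Tx) bool {
		// Assumed schema: wing_tasks(task_id BIGINT, wing_id BIGINT UNIQUE).
		// A second AddTask for the same wing violates the UNIQUE constraint,
		// so the callback returns false and no duplicate task is scheduled.
		_, err := tx.Exec(`INSERT INTO wing_tasks (task_id, wing_id) VALUES ($1, $2)`, tID, wingID)
		if err != nil {
			log.Errorw("not adding duplicate task", "wing", wingID, "error", err)
			return false
		}
		return true
	})
}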

View File

@@ -3,12 +3,14 @@ package harmonytask
import (
"context"
"fmt"
"net/http"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/filecoin-project/lotus/lib/harmony/resources"
"github.com/gin-gonic/gin"
"github.com/gorilla/mux"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
)
@@ -94,7 +96,7 @@ type TaskEngine struct {
ctx context.Context
handlers []*taskTypeHandler
db *harmonydb.DB
workAdderMutex *notifyingMx
workAdderMutex sync.Mutex
reg *resources.Reg
grace context.CancelFunc
taskMap map[string]*taskTypeHandler
@@ -125,15 +127,14 @@ func New(
}
ctx, grace := context.WithCancel(context.Background())
e := &TaskEngine{
ctx: ctx,
grace: grace,
db: db,
reg: reg,
ownerID: reg.Resources.MachineID, // The current number representing "hostAndPort"
workAdderMutex: &notifyingMx{},
taskMap: make(map[string]*taskTypeHandler, len(impls)),
tryAllWork: make(chan bool),
follows: make(map[string][]followStruct),
ctx: ctx,
grace: grace,
db: db,
reg: reg,
ownerID: reg.Resources.MachineID, // The current number representing "hostAndPort"
taskMap: make(map[string]*taskTypeHandler, len(impls)),
tryAllWork: make(chan bool),
follows: make(map[string][]followStruct),
}
e.lastCleanup.Store(time.Now())
for _, c := range impls {
@@ -184,7 +185,7 @@ func New(
continue // not really fatal, but not great
}
}
if !h.considerWork([]TaskID{TaskID(w.ID)}) {
if !h.considerWork("recovered", []TaskID{TaskID(w.ID)}) {
log.Error("Strange: Unable to accept previously owned task: ", w.ID, w.Name)
}
}
@@ -297,7 +298,7 @@ func (e *TaskEngine) pollerTryAllWork() {
log.Error("Unable to read work ", err)
continue
}
accepted := v.considerWork(unownedTasks)
accepted := v.considerWork("poller", unownedTasks)
if !accepted {
log.Warn("Work not accepted")
continue
@@ -309,17 +310,20 @@ func (e *TaskEngine) pollerTryAllWork() {
}
}
// AddHttpHandlers TODO this needs to be called by the http server to register routes.
// GetHttpHandlers needs to be used by the http server to register routes.
// This implements the receiver-side of "follows" and "bumps" the fast way.
func (e *TaskEngine) AddHttpHandlers(root gin.IRouter) {
s := root.Group("/scheduler/")
f := s.Group("/follows")
func (e *TaskEngine) GetHttpHandlers() http.Handler {
root := mux.NewRouter()
s := root.PathPrefix("/scheduler")
f := s.PathPrefix("/follows")
b := s.PathPrefix("/bump")
for name, v := range e.follows {
f.GET("/"+name+"/:tID", func(c *gin.Context) {
tIDString := c.Param("tID")
f.Path("/" + name + "/{tID}").Methods("GET").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
tIDString := mux.Vars(r)["tID"]
tID, err := strconv.Atoi(tIDString)
if err != nil {
c.AbortWithError(401, err)
w.WriteHeader(401)
fmt.Fprint(w, err.Error())
return
}
taskAdded := false
@@ -328,28 +332,31 @@ func (e *TaskEngine) AddHttpHandlers(root gin.IRouter) {
}
if taskAdded {
e.tryAllWork <- true
c.Status(200)
w.WriteHeader(200)
return
}
c.Status(202) // NOTE: 202 for "accepted" but not worked.
w.WriteHeader(202) // NOTE: 202 for "accepted" but not worked.
})
}
b := s.Group("/bump")
for _, h := range e.handlers {
b.GET("/"+h.Name+"/:tID", func(c *gin.Context) {
tIDString := c.Param("tID")
b.Path("/" + h.Name + "/{tID}").Methods("GET").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
tIDString := mux.Vars(r)["tID"]
tID, err := strconv.Atoi(tIDString)
if err != nil {
c.AbortWithError(401, err)
w.WriteHeader(401)
fmt.Fprint(w, err.Error())
return
}
// We NEED to block while trying to deliver
// this work to ease the network impact.
if h.considerWork([]TaskID{TaskID(tID)}) {
c.Status(200)
if h.considerWork("bump", []TaskID{TaskID(tID)}) {
w.WriteHeader(200)
return
}
c.Status(202) // NOTE: 202 for "accepted" but not worked.
w.WriteHeader(202) // NOTE: 202 for "accepted" but not worked.
})
}
return root
}
func (e *TaskEngine) bump(taskType string) {
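With the gin dependency dropped, the caller now mounts the returned router itself. A rough usage sketch follows; the New() arguments, the WingTask type, and the listen address are assumptions, not taken from this diff:

import (
	"net/http"

	"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
	"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
)

func serveScheduler(db *harmonydb.DB) error {
	// WingTask is a hypothetical TaskInterface implementation; the address is an example.
	e, err := harmonytask.New(db, []harmonytask.TaskInterface{&WingTask{}}, "10.0.0.2:12300")
	if err != nil {
		return err
	}
	// GetHttpHandlers serves the receiver side of "follows" and "bumps";
	// a peer hands off task 7 with e.g. GET /scheduler/bump/WingTask/7
	return http.ListenAndServe("10.0.0.2:12300", e.GetHttpHandlers())
}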

View File

@@ -1,16 +0,0 @@
package harmonytask
import "sync"
type notifyingMx struct {
sync.Mutex
UnlockNotify func()
}
func (n *notifyingMx) Unlock() {
tmp := n.UnlockNotify
n.Mutex.Unlock()
if tmp != nil {
tmp()
}
}

View File

@@ -19,8 +19,7 @@ type taskTypeHandler struct {
TaskInterface
TaskTypeDetails
TaskEngine *TaskEngine
Count atomic.Int32 /// locked by TaskEngine's mutex
Count atomic.Int32
}
func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) bool) {
@@ -46,12 +45,12 @@ func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) bool) {
return
}
if !h.considerWork([]TaskID{tID}) {
if !h.considerWork("adder", []TaskID{tID}) {
h.TaskEngine.bump(h.Name) // We can't do it. How about someone else.
}
}
func (h *taskTypeHandler) considerWork(ids []TaskID) (workAccepted bool) {
func (h *taskTypeHandler) considerWork(from string, ids []TaskID) (workAccepted bool) {
top:
if len(ids) == 0 {
return true // stop looking for takers
@@ -104,6 +103,7 @@ top:
go func() {
h.Count.Add(1)
log.Infow("Beginning work on Task", "id", *tID, "from", from, "type", h.Name)
var done bool
var doErr error
@@ -136,7 +136,7 @@ top:
return owner == h.TaskEngine.ownerID
})
if doErr != nil {
log.Error("Do("+h.Name+", taskID="+strconv.Itoa(int(*tID))+") returned error: ", doErr)
log.Errorw("Do() returned error", "type", h.Name, "id", strconv.Itoa(int(*tID)), "error", doErr)
}
}()
return true

View File

@@ -38,7 +38,7 @@ type Reg struct {
var logger = logging.Logger("harmonytask")
var lotusRE = regexp.MustCompile("lotus-worker|lotus-harmony|yugabyted")
var lotusRE = regexp.MustCompile("lotus-worker|lotus-harmony|yugabyted|yb-master|yb-tserver")
func Register(db *harmonydb.DB, hostnameAndPort string) (*Reg, error) {
var reg Reg
@@ -114,7 +114,7 @@ func getResources() (res Resources, err error) {
}
}
if found > 1 {
logger.Error("This Lotus process should run alone on a machine. Use CGroup.")
logger.Warn("lotus-provider's defaults are for running alone. Use task maximums or CGroups.")
}
}
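The widened regexp only changes which neighbouring processes trigger the "not running alone" warning above; yb-master and yb-tserver are YugabyteDB server processes. A trivial sketch of the match (function name and process names are illustrative):

var sharedRE = regexp.MustCompile("lotus-worker|lotus-harmony|yugabyted|yb-master|yb-tserver")

func looksLikeSharedWorkload(procName string) bool {
	return sharedRE.MatchString(procName)
}

// looksLikeSharedWorkload("yb-tserver") == true
// looksLikeSharedWorkload("postgres")   == false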