simpleharmony

This commit is contained in:
Andrew Jackson (Ajax) 2023-10-26 22:10:18 -05:00
parent 7ce7b786be
commit c459c63b1d
5 changed files with 33 additions and 223 deletions

View File

@ -254,7 +254,6 @@ var runCmd = &cli.Command{
handler := gin.New() handler := gin.New()
taskEngine.ApplyHttpHandlers(handler.Group("/"))
defer taskEngine.GracefullyTerminate(time.Hour) defer taskEngine.GracefullyTerminate(time.Hour)
fh := &paths.FetchHandler{Local: localStore, PfHandler: &paths.DefaultPartialFileHandler{}} fh := &paths.FetchHandler{Local: localStore, PfHandler: &paths.DefaultPartialFileHandler{}}

View File

@ -79,13 +79,14 @@ func TestHarmonyTasks(t *testing.T) {
toAdd: []int{56, 73}, toAdd: []int{56, 73},
myPersonalTable: map[harmonytask.TaskID]int{}, myPersonalTable: map[harmonytask.TaskID]int{},
} }
harmonytask.POLL_DURATION = time.Millisecond * 100
e, err := harmonytask.New(cdb, []harmonytask.TaskInterface{t1}, "test:1") e, err := harmonytask.New(cdb, []harmonytask.TaskInterface{t1}, "test:1")
require.NoError(t, err) require.NoError(t, err)
time.Sleep(3 * time.Second) // do the work. FLAKYNESS RISK HERE. time.Sleep(3 * time.Second) // do the work. FLAKYNESS RISK HERE.
e.GracefullyTerminate(time.Minute) e.GracefullyTerminate(time.Minute)
expected := []string{"taskResult56", "taskResult73"} expected := []string{"taskResult56", "taskResult73"}
sort.Strings(t1.WorkCompleted) sort.Strings(t1.WorkCompleted)
require.Equal(t, t1.WorkCompleted, expected, "unexpected results") require.Equal(t, expected, t1.WorkCompleted, "unexpected results")
}) })
} }

View File

@ -15,9 +15,8 @@ Mental Model:
- max was specified and reached - max was specified and reached
- resource exhaustion - resource exhaustion
- CanAccept() interface (per-task implmentation) does not accept it. - CanAccept() interface (per-task implmentation) does not accept it.
Ways tasks start: (slowest first) Ways tasks start:
- DB Read every 1 minute - DB Read every 3 seconds
- Bump via HTTP if registered in DB
- Task was added (to db) by this process - Task was added (to db) by this process
Ways tasks get added: Ways tasks get added:
- Async Listener task (for chain, etc) - Async Listener task (for chain, etc)
@ -68,12 +67,5 @@ harmony_task_machines
anything, but serves as a discovery mechanism. Paths are hostnames + ports anything, but serves as a discovery mechanism. Paths are hostnames + ports
which are presumed to support http, but this assumption is only used by which are presumed to support http, but this assumption is only used by
the task system. the task system.
harmony_task_follow / harmony_task_impl
These tables are used to fast-path notifications to other machines instead
of waiting for polling. _impl helps round-robin work pick-up. _follow helps
discover the machines that are interested in creating tasks following the
task that just completed.
*/ */
package harmonytask package harmonytask

View File

@ -4,20 +4,18 @@ import (
"context" "context"
"fmt" "fmt"
"strconv" "strconv"
"strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/gin-gonic/gin"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/resources" "github.com/filecoin-project/lotus/lib/harmony/resources"
) )
// Consts (except for unit test) // Consts (except for unit test)
var POLL_DURATION = time.Minute // Poll for Work this frequently var POLL_DURATION = time.Second * 3 // Poll for Work this frequently
var CLEANUP_FREQUENCY = 5 * time.Minute // Check for dead workers this often * everyone var CLEANUP_FREQUENCY = 5 * time.Minute // Check for dead workers this often * everyone
var FOLLOW_FREQUENCY = 1 * time.Minute // Check for work to follow this often
type TaskTypeDetails struct { type TaskTypeDetails struct {
// Max returns how many tasks this machine can run of this type. // Max returns how many tasks this machine can run of this type.
@ -107,7 +105,6 @@ type TaskEngine struct {
grace context.CancelFunc grace context.CancelFunc
taskMap map[string]*taskTypeHandler taskMap map[string]*taskTypeHandler
ownerID int ownerID int
tryAllWork chan bool // notify if work completed
follows map[string][]followStruct follows map[string][]followStruct
lastFollowTime time.Time lastFollowTime time.Time
lastCleanup atomic.Value lastCleanup atomic.Value
@ -140,7 +137,6 @@ func New(
reg: reg, reg: reg,
ownerID: reg.Resources.MachineID, // The current number representing "hostAndPort" ownerID: reg.Resources.MachineID, // The current number representing "hostAndPort"
taskMap: make(map[string]*taskTypeHandler, len(impls)), taskMap: make(map[string]*taskTypeHandler, len(impls)),
tryAllWork: make(chan bool),
follows: make(map[string][]followStruct), follows: make(map[string][]followStruct),
} }
e.lastCleanup.Store(time.Now()) e.lastCleanup.Store(time.Now())
@ -152,23 +148,6 @@ func New(
} }
e.handlers = append(e.handlers, &h) e.handlers = append(e.handlers, &h)
e.taskMap[h.TaskTypeDetails.Name] = &h e.taskMap[h.TaskTypeDetails.Name] = &h
_, err := db.Exec(e.ctx, `INSERT INTO harmony_task_impl (owner_id, name)
VALUES ($1,$2)`, e.ownerID, h.Name)
if err != nil {
return nil, fmt.Errorf("can't update impl: %w", err)
}
for name, fn := range c.TypeDetails().Follows {
e.follows[name] = append(e.follows[name], followStruct{fn, &h, name})
// populate harmony_task_follows
_, err := db.Exec(e.ctx, `INSERT INTO harmony_task_follows (owner_id, from_task, to_task)
VALUES ($1,$2,$3)`, e.ownerID, name, h.Name)
if err != nil {
return nil, fmt.Errorf("can't update harmony_task_follows: %w", err)
}
}
} }
// resurrect old work // resurrect old work
@ -212,18 +191,6 @@ func (e *TaskEngine) GracefullyTerminate(deadline time.Duration) {
e.grace() e.grace()
e.reg.Shutdown() e.reg.Shutdown()
deadlineChan := time.NewTimer(deadline).C deadlineChan := time.NewTimer(deadline).C
ctx := context.TODO()
// block bumps & follows by unreg from DBs.
_, err := e.db.Exec(ctx, `DELETE FROM harmony_task_impl WHERE owner_id=$1`, e.ownerID)
if err != nil {
log.Warn("Could not clean-up impl table: %w", err)
}
_, err = e.db.Exec(ctx, `DELETE FROM harmony_task_follow WHERE owner_id=$1`, e.ownerID)
if err != nil {
log.Warn("Could not clean-up impl table: %w", err)
}
top: top:
for _, h := range e.handlers { for _, h := range e.handlers {
if h.Count.Load() > 0 { if h.Count.Load() > 0 {
@ -241,17 +208,18 @@ top:
func (e *TaskEngine) poller() { func (e *TaskEngine) poller() {
for { for {
select { select {
case <-e.tryAllWork: ///////////////////// Find work after some work finished
case <-time.NewTicker(POLL_DURATION).C: // Find work periodically case <-time.NewTicker(POLL_DURATION).C: // Find work periodically
case <-e.ctx.Done(): ///////////////////// Graceful exit case <-e.ctx.Done(): ///////////////////// Graceful exit
return return
} }
e.followWorkInDB() // "Follows" the slow way e.pollerTryAllWork()
e.pollerTryAllWork() // "Bumps" (round robin tasks) the slow way if time.Since(e.lastFollowTime) > FOLLOW_FREQUENCY {
e.followWorkInDB()
}
} }
} }
// followWorkInDB implements "Follows" the slow way // followWorkInDB implements "Follows"
func (e *TaskEngine) followWorkInDB() { func (e *TaskEngine) followWorkInDB() {
// Step 1: What are we following? // Step 1: What are we following?
var lastFollowTime time.Time var lastFollowTime time.Time
@ -292,14 +260,13 @@ func (e *TaskEngine) followWorkInDB() {
} }
} }
// pollerTryAllWork implements "Bumps" (next task) the slow way // pollerTryAllWork starts the next 1 task
func (e *TaskEngine) pollerTryAllWork() { func (e *TaskEngine) pollerTryAllWork() {
if time.Since(e.lastCleanup.Load().(time.Time)) > CLEANUP_FREQUENCY { if time.Since(e.lastCleanup.Load().(time.Time)) > CLEANUP_FREQUENCY {
e.lastCleanup.Store(time.Now()) e.lastCleanup.Store(time.Now())
resources.CleanupMachines(e.ctx, e.db) resources.CleanupMachines(e.ctx, e.db)
} }
for _, v := range e.handlers { for _, v := range e.handlers {
rerun:
if v.AssertMachineHasCapacity() != nil { if v.AssertMachineHasCapacity() != nil {
continue continue
} }
@ -312,90 +279,12 @@ func (e *TaskEngine) pollerTryAllWork() {
log.Error("Unable to read work ", err) log.Error("Unable to read work ", err)
continue continue
} }
if len(unownedTasks) > 0 {
accepted := v.considerWork("poller", unownedTasks) accepted := v.considerWork("poller", unownedTasks)
if !accepted { if accepted {
log.Warn("Work not accepted") return // accept new work slowly and in priority order
continue
} }
if len(unownedTasks) > 1 { log.Warn("Work not accepted for " + strconv.Itoa(len(unownedTasks)) + " " + v.Name + " task(s)")
e.bump(v.Name) // wait for others before trying again to add work.
goto rerun
}
}
}
// GetHttpHandlers needs to be used by the http server to register routes.
// This implements the receiver-side of "follows" and "bumps" the fast way.
func (e *TaskEngine) ApplyHttpHandlers(root gin.IRouter) {
s := root.Group("/scheduler")
f := s.Group("/follows")
b := s.Group("/bump")
for name, vs := range e.follows {
name, vs := name, vs
f.GET("/"+name+"/:tID", func(c *gin.Context) {
tIDString := c.Param("tID")
tID, err := strconv.Atoi(tIDString)
if err != nil {
c.JSON(401, map[string]any{"error": err.Error()})
return
}
taskAdded := false
for _, vTmp := range vs {
v := vTmp
b, err := v.f(TaskID(tID), v.h.AddTask)
if err != nil {
log.Errorw("Follow attempt failed", "error", err, "from", name, "to", v.name)
}
taskAdded = taskAdded || b
}
if taskAdded {
e.tryAllWork <- true
c.Status(200)
return
}
c.Status(202) // NOTE: 202 for "accepted" but not worked.
})
}
for _, hTmp := range e.handlers {
h := hTmp
b.GET("/"+h.Name+"/:tID", func(c *gin.Context) {
tIDString := c.Param("tID")
tID, err := strconv.Atoi(tIDString)
if err != nil {
c.JSON(401, map[string]any{"error": err.Error()})
return
}
// We NEED to block while trying to deliver
// this work to ease the network impact.
if h.considerWork("bump", []TaskID{TaskID(tID)}) {
c.Status(200)
return
}
c.Status(202) // NOTE: 202 for "accepted" but not worked.
})
}
}
func (e *TaskEngine) bump(taskType string) {
var res []string
err := e.db.Select(e.ctx, &res, `SELECT host_and_port FROM harmony_machines m
JOIN harmony_task_impl i ON i.owner_id=m.id
WHERE i.name=$1`, taskType)
if err != nil {
log.Error("Could not read db for bump: ", err)
return
}
for _, url := range res {
if !strings.HasPrefix(strings.ToLower(url), "http") {
url = "http://"
}
resp, err := hClient.Get(url + "/scheduler/bump/" + taskType)
if err != nil {
log.Info("Server unreachable to bump: ", err)
continue
}
if resp.StatusCode == 200 {
return // just want 1 taker.
} }
} }
} }

View File

@ -4,15 +4,12 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"io"
"net/http"
"runtime" "runtime"
"strconv" "strconv"
"sync/atomic" "sync/atomic"
"time" "time"
logging "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2"
"github.com/jackc/pgx/v5/pgconn"
"github.com/samber/lo" "github.com/samber/lo"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/lib/harmony/harmonydb"
@ -29,7 +26,7 @@ type taskTypeHandler struct {
func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) (bool, error)) { func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) (bool, error)) {
var tID TaskID var tID TaskID
did, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) (bool, error) { _, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) (bool, error) {
// create taskID (from DB) // create taskID (from DB)
_, err := tx.Exec(`INSERT INTO harmony_task (name, added_by, posted_time) _, err := tx.Exec(`INSERT INTO harmony_task (name, added_by, posted_time)
VALUES ($1, $2, CURRENT_TIMESTAMP) `, h.Name, h.TaskEngine.ownerID) VALUES ($1, $2, CURRENT_TIMESTAMP) `, h.Name, h.TaskEngine.ownerID)
@ -44,21 +41,13 @@ func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) (bool, error
}) })
if err != nil { if err != nil {
var pgErr *pgconn.PgError if harmonydb.IsErrUniqueContraint(err) {
if errors.As(err, &pgErr) && pgErr.ConstraintName != "" { log.Debugf("addtask(%s) saw unique constraint, so it's added already.", h.Name)
log.Debug("addtask saw unique constraint ", pgErr.ConstraintName, ": so it's added already.")
return return
} }
log.Error("Could not add task. AddTasFunc failed: %v", err) log.Error("Could not add task. AddTasFunc failed: %v", err)
return return
} }
if !did {
return
}
if !h.considerWork("adder", []TaskID{tID}) {
h.TaskEngine.bump(h.Name) // We can't do it. How about someone else.
}
} }
func (h *taskTypeHandler) considerWork(from string, ids []TaskID) (workAccepted bool) { func (h *taskTypeHandler) considerWork(from string, ids []TaskID) (workAccepted bool) {
@ -112,8 +101,8 @@ top:
goto top goto top
} }
go func() {
h.Count.Add(1) h.Count.Add(1)
go func() {
log.Infow("Beginning work on Task", "id", *tID, "from", from, "name", h.Name) log.Infow("Beginning work on Task", "id", *tID, "from", from, "name", h.Name)
var done bool var done bool
@ -132,10 +121,12 @@ top:
h.recordCompletion(*tID, workStart, done, doErr) h.recordCompletion(*tID, workStart, done, doErr)
if done { if done {
h.triggerCompletionListeners(*tID) for _, fs := range h.TaskEngine.follows[h.Name] { // Do we know of any follows for this task type?
if _, err := fs.f(*tID, fs.h.AddTask); err != nil {
log.Error("Could not follow", "error", err, "from", h.Name, "to", fs.name)
}
}
} }
h.TaskEngine.tryAllWork <- true // Activate tasks in this machine
}() }()
done, doErr = h.Do(*tID, func() bool { done, doErr = h.Do(*tID, func() bool {
@ -244,65 +235,3 @@ func (h *taskTypeHandler) AssertMachineHasCapacity() error {
enoughGpuRam: enoughGpuRam:
return nil return nil
} }
var hClient = http.Client{}
func init() {
hClient.Timeout = 3 * time.Second
}
// triggerCompletionListeners does in order:
// 1. Trigger all in-process followers (b/c it's fast).
// 2. Trigger all living processes with followers via DB
// 3. Future followers (think partial upgrade) can read harmony_task_history
// 3a. The Listen() handles slow follows.
func (h *taskTypeHandler) triggerCompletionListeners(tID TaskID) {
// InProcess (#1 from Description)
inProcessDefs := h.TaskEngine.follows[h.Name]
inProcessFollowers := make([]string, len(inProcessDefs))
for _, fs := range inProcessDefs {
b, err := fs.f(tID, fs.h.AddTask)
if err != nil {
log.Error("Could not follow", "error", err, "from", h.Name, "to", fs.name)
}
if b {
inProcessFollowers = append(inProcessFollowers, fs.h.Name)
}
}
// Over HTTP (#2 from Description)
var hps []struct {
HostAndPort string
ToType string
}
err := h.TaskEngine.db.Select(h.TaskEngine.ctx, &hps, `SELECT m.host_and_port, to_type
FROM harmony_task_follow f JOIN harmony_machines m ON m.id=f.owner_id
WHERE from_type=$1 AND to_type NOT IN $2 AND f.owner_id != $3`,
h.Name, inProcessFollowers, h.TaskEngine.ownerID)
if err != nil {
log.Warn("Could not fast-trigger partner processes.", err)
return
}
hostsVisited := map[string]bool{}
tasksVisited := map[string]bool{}
for _, v := range hps {
if hostsVisited[v.HostAndPort] || tasksVisited[v.ToType] {
continue
}
resp, err := hClient.Get(v.HostAndPort + "/scheduler/follows/" + h.Name)
if err != nil {
log.Warn("Couldn't hit http endpoint: ", err)
continue
}
b, err := io.ReadAll(resp.Body)
if err != nil {
log.Warn("Couldn't hit http endpoint: ", err)
continue
}
hostsVisited[v.HostAndPort], tasksVisited[v.ToType] = true, true
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted {
log.Error("IO failed for fast nudge: ", string(b))
continue
}
}
}