harmonytask cleanups

This commit is contained in:
Andrew Jackson (Ajax) 2023-08-21 17:13:17 -05:00
parent 610a8c55e9
commit 84f4cdfc15
5 changed files with 20 additions and 10 deletions

View File

@ -783,6 +783,12 @@ workflows:
- build - build
suite: itest-harmonydb suite: itest-harmonydb
target: "./itests/harmonydb_test.go" target: "./itests/harmonydb_test.go"
- test:
name: test-itest-harmonytask
requires:
- build
suite: itest-harmonytask
target: "./itests/harmonytask_test.go"
- test: - test:
name: test-itest-lite_migration name: test-itest-lite_migration
requires: requires:

1
go.mod
View File

@ -135,7 +135,6 @@ require (
github.com/raulk/clock v1.1.0 github.com/raulk/clock v1.1.0
github.com/raulk/go-watchdog v1.3.0 github.com/raulk/go-watchdog v1.3.0
github.com/samber/lo v1.38.1 github.com/samber/lo v1.38.1
github.com/samuel/go-opencl v0.0.0-20171108220231-cbcfd10c32ad
github.com/stretchr/testify v1.8.4 github.com/stretchr/testify v1.8.4
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
github.com/urfave/cli/v2 v2.25.5 github.com/urfave/cli/v2 v2.25.5

View File

@ -50,7 +50,8 @@ func (t *task1) TypeDetails() harmonytask.TaskTypeDetails {
} }
} }
func (t *task1) Adder(add harmonytask.AddTaskFunc) { func (t *task1) Adder(add harmonytask.AddTaskFunc) {
for _, v := range t.toAdd { for _, vTmp := range t.toAdd {
v := vTmp
add(func(tID harmonytask.TaskID, tx *harmonydb.Tx) bool { add(func(tID harmonytask.TaskID, tx *harmonydb.Tx) bool {
t.myPersonalTableLock.Lock() t.myPersonalTableLock.Lock()
defer t.myPersonalTableLock.Unlock() defer t.myPersonalTableLock.Unlock()
@ -115,7 +116,8 @@ func fooLetterAdder(t *testing.T, cdb *harmonydb.DB) *passthru {
dtl: dtl, dtl: dtl,
canAccept: func(list []harmonytask.TaskID) (*harmonytask.TaskID, error) { return nil, nil }, canAccept: func(list []harmonytask.TaskID) (*harmonytask.TaskID, error) { return nil, nil },
adder: func(add harmonytask.AddTaskFunc) { adder: func(add harmonytask.AddTaskFunc) {
for _, v := range []string{"A", "B"} { for _, vTmp := range []string{"A", "B"} {
v := vTmp
add(func(tID harmonytask.TaskID, tx *harmonydb.Tx) bool { add(func(tID harmonytask.TaskID, tx *harmonydb.Tx) bool {
_, err := tx.Exec("INSERT INTO itest_scratch (some_int, content) VALUES ($1,$2)", tID, v) _, err := tx.Exec("INSERT INTO itest_scratch (some_int, content) VALUES ($1,$2)", tID, v)
require.NoError(t, err) require.NoError(t, err)

View File

@ -248,10 +248,10 @@ func (e *TaskEngine) followWorkInDB() {
var lastFollowTime time.Time var lastFollowTime time.Time
lastFollowTime, e.lastFollowTime = e.lastFollowTime, time.Now() lastFollowTime, e.lastFollowTime = e.lastFollowTime, time.Now()
for from_name, srcs := range e.follows { for fromName, srcs := range e.follows {
var cList []int // Which work is done (that we follow) since we last checked? var cList []int // Which work is done (that we follow) since we last checked?
err := e.db.Select(e.ctx, &cList, `SELECT h.task_id FROM harmony_task_history err := e.db.Select(e.ctx, &cList, `SELECT h.task_id FROM harmony_task_history
WHERE h.work_end>$1 AND h.name=$2`, lastFollowTime, from_name) WHERE h.work_end>$1 AND h.name=$2`, lastFollowTime, fromName)
if err != nil { if err != nil {
log.Error("Could not query DB: ", err) log.Error("Could not query DB: ", err)
return return
@ -269,9 +269,9 @@ func (e *TaskEngine) followWorkInDB() {
continue continue
} }
// we need to create this task // we need to create this task
if !src.h.Follows[from_name](TaskID(workAlreadyDone), src.h.AddTask) { if !src.h.Follows[fromName](TaskID(workAlreadyDone), src.h.AddTask) {
// But someone may have beaten us to it. // But someone may have beaten us to it.
log.Infof("Unable to add task %s following Task(%d, %s)", src.h.Name, workAlreadyDone, from_name) log.Infof("Unable to add task %s following Task(%d, %s)", src.h.Name, workAlreadyDone, fromName)
} }
} }
} }
@ -317,7 +317,7 @@ func (e *TaskEngine) GetHttpHandlers() http.Handler {
s := root.PathPrefix("/scheduler") s := root.PathPrefix("/scheduler")
f := s.PathPrefix("/follows") f := s.PathPrefix("/follows")
b := s.PathPrefix("/bump") b := s.PathPrefix("/bump")
for name, v := range e.follows { for name, vs := range e.follows {
f.Path("/" + name + "/{tID}").Methods("GET").HandlerFunc(func(w http.ResponseWriter, r *http.Request) { f.Path("/" + name + "/{tID}").Methods("GET").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
tIDString := mux.Vars(r)["tID"] tIDString := mux.Vars(r)["tID"]
tID, err := strconv.Atoi(tIDString) tID, err := strconv.Atoi(tIDString)
@ -327,7 +327,7 @@ func (e *TaskEngine) GetHttpHandlers() http.Handler {
return return
} }
taskAdded := false taskAdded := false
for _, v := range v { for _, v := range vs {
taskAdded = taskAdded || v.f(TaskID(tID), v.h.AddTask) taskAdded = taskAdded || v.f(TaskID(tID), v.h.AddTask)
} }
if taskAdded { if taskAdded {
@ -338,7 +338,8 @@ func (e *TaskEngine) GetHttpHandlers() http.Handler {
w.WriteHeader(202) // NOTE: 202 for "accepted" but not worked. w.WriteHeader(202) // NOTE: 202 for "accepted" but not worked.
}) })
} }
for _, h := range e.handlers { for _, hTmp := range e.handlers {
h := hTmp
b.Path("/" + h.Name + "/{tID}").Methods("GET").HandlerFunc(func(w http.ResponseWriter, r *http.Request) { b.Path("/" + h.Name + "/{tID}").Methods("GET").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
tIDString := mux.Vars(r)["tID"] tIDString := mux.Vars(r)["tID"]
tID, err := strconv.Atoi(tIDString) tID, err := strconv.Atoi(tIDString)

View File

@ -6,6 +6,8 @@
#define CL_USE_DEPRECATED_OPENCL_1_2_APIS #define CL_USE_DEPRECATED_OPENCL_1_2_APIS
#define CL_USE_DEPRECATED_OPENCL_2_0_APIS #define CL_USE_DEPRECATED_OPENCL_2_0_APIS
#define CL_TARGET_OPENCL_VERSION 300
#ifdef __APPLE__ #ifdef __APPLE__
#include "OpenCL/opencl.h" #include "OpenCL/opencl.h"
#else #else