v1.27.0-a #10

Closed
jonathanface wants to merge 473 commits from v1.27.0-a into master
3 changed files with 92 additions and 14 deletions
Showing only changes of commit da92001c71 - Show all commits

View File

@ -264,3 +264,40 @@ func TestTaskRetry(t *testing.T) {
{2, false, "error: intentional 'error'"}}, res) {2, false, "error: intentional 'error'"}}, res)
}) })
} }
func TestBoredom(t *testing.T) {
//t.Parallel()
withDbSetup(t, func(m *kit.TestMiner) {
cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB
harmonytask.POLL_DURATION = time.Millisecond * 100
var taskID harmonytask.TaskID
var ran bool
boredParty := &passthru{
dtl: harmonytask.TaskTypeDetails{
Name: "boredTest",
Max: -1,
Cost: resources.Resources{},
IAmBored: func(add harmonytask.AddTaskFunc) error {
add(func(tID harmonytask.TaskID, tx *harmonydb.Tx) (bool, error) {
taskID = tID
return true, nil
})
return nil
},
},
canAccept: func(list []harmonytask.TaskID, e *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
require.Equal(t, harmonytask.WorkSourceIAmBored, e.WorkOrigin)
return &list[0], nil
},
do: func(tID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
require.Equal(t, taskID, tID)
ran = true
return true, nil
},
}
ht, err := harmonytask.New(cdb, []harmonytask.TaskInterface{boredParty}, "test:1")
require.NoError(t, err)
require.Eventually(t, func() bool { return ran }, time.Second, time.Millisecond*100)
ht.GracefullyTerminate(time.Hour)
})
}

View File

@ -39,6 +39,12 @@ type TaskTypeDetails struct {
// NOTE: if refatoring tasks, see if your task is // NOTE: if refatoring tasks, see if your task is
// necessary. Ex: Is the sector state correct for your stage to run? // necessary. Ex: Is the sector state correct for your stage to run?
Follows map[string]func(TaskID, AddTaskFunc) (bool, error) Follows map[string]func(TaskID, AddTaskFunc) (bool, error)
// IAmBored is called (when populated) when there's capacity but no work.
// Tasks added will be proposed to CanAccept() on this machine.
// CanAccept() can read taskEngine's WorkOrigin string to learn about a task.
// Ex: make new CC sectors, clean-up, or retrying pipelines that failed in later states.
IAmBored func(AddTaskFunc) error
} }
// TaskInterface must be implemented in order to have a task used by harmonytask. // TaskInterface must be implemented in order to have a task used by harmonytask.
@ -97,6 +103,7 @@ type TaskInterface interface {
type AddTaskFunc func(extraInfo func(TaskID, *harmonydb.Tx) (shouldCommit bool, seriousError error)) type AddTaskFunc func(extraInfo func(TaskID, *harmonydb.Tx) (shouldCommit bool, seriousError error))
type TaskEngine struct { type TaskEngine struct {
// Static After New()
ctx context.Context ctx context.Context
handlers []*taskTypeHandler handlers []*taskTypeHandler
db *harmonydb.DB db *harmonydb.DB
@ -105,9 +112,12 @@ type TaskEngine struct {
taskMap map[string]*taskTypeHandler taskMap map[string]*taskTypeHandler
ownerID int ownerID int
follows map[string][]followStruct follows map[string][]followStruct
hostAndPort string
// synchronous to the single-threaded poller
lastFollowTime time.Time lastFollowTime time.Time
lastCleanup atomic.Value lastCleanup atomic.Value
hostAndPort string WorkOrigin string
} }
type followStruct struct { type followStruct struct {
f func(TaskID, AddTaskFunc) (bool, error) f func(TaskID, AddTaskFunc) (bool, error)
@ -177,7 +187,7 @@ func New(
continue // not really fatal, but not great continue // not really fatal, but not great
} }
} }
if !h.considerWork(workSourceRecover, []TaskID{TaskID(w.ID)}) { if !h.considerWork(WorkSourceRecover, []TaskID{TaskID(w.ID)}) {
log.Errorw("Strange: Unable to accept previously owned task", "id", w.ID, "type", w.Name) log.Errorw("Strange: Unable to accept previously owned task", "id", w.ID, "type", w.Name)
} }
} }
@ -327,13 +337,38 @@ func (e *TaskEngine) pollerTryAllWork() bool {
continue continue
} }
if len(unownedTasks) > 0 { if len(unownedTasks) > 0 {
accepted := v.considerWork(workSourcePoller, unownedTasks) accepted := v.considerWork(WorkSourcePoller, unownedTasks)
if accepted { if accepted {
return true // accept new work slowly and in priority order return true // accept new work slowly and in priority order
} }
log.Warn("Work not accepted for " + strconv.Itoa(len(unownedTasks)) + " " + v.Name + " task(s)") log.Warn("Work not accepted for " + strconv.Itoa(len(unownedTasks)) + " " + v.Name + " task(s)")
} }
} }
// if no work was accepted, are we bored? Then find work in priority order.
for _, v := range e.handlers {
if v.AssertMachineHasCapacity() != nil {
continue
}
if v.TaskTypeDetails.IAmBored != nil {
var added []TaskID
err := v.TaskTypeDetails.IAmBored(func(extraInfo func(TaskID, *harmonydb.Tx) (shouldCommit bool, seriousError error)) {
v.AddTask(func(tID TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
b, err := extraInfo(tID, tx)
if err == nil {
added = append(added, tID)
}
return b, err
})
})
if err != nil {
log.Error("IAmBored failed: ", err)
continue
}
if added != nil { // tiny chance a fail could make these bogus, but considerWork should then fail.
v.considerWork(WorkSourceIAmBored, added)
}
}
}
return false return false
} }

View File

@ -53,8 +53,9 @@ retryAddTask:
} }
const ( const (
workSourcePoller = "poller" WorkSourcePoller = "poller"
workSourceRecover = "recovered" WorkSourceRecover = "recovered"
WorkSourceIAmBored = "bored"
) )
// considerWork is called to attempt to start work on a task-id of this task type. // considerWork is called to attempt to start work on a task-id of this task type.
@ -84,9 +85,14 @@ top:
return false return false
} }
h.TaskEngine.WorkOrigin = from
// 3. What does the impl say? // 3. What does the impl say?
canAcceptAgain: canAcceptAgain:
tID, err := h.CanAccept(ids, h.TaskEngine) tID, err := h.CanAccept(ids, h.TaskEngine)
h.TaskEngine.WorkOrigin = ""
if err != nil { if err != nil {
log.Error(err) log.Error(err)
return false return false
@ -123,7 +129,7 @@ canAcceptAgain:
} }
// if recovering we don't need to try to claim anything because those tasks are already claimed by us // if recovering we don't need to try to claim anything because those tasks are already claimed by us
if from != workSourceRecover { if from != WorkSourceRecover {
// 4. Can we claim the work for our hostname? // 4. Can we claim the work for our hostname?
ct, err := h.TaskEngine.db.Exec(h.TaskEngine.ctx, "UPDATE harmony_task SET owner_id=$1 WHERE id=$2 AND owner_id IS NULL", h.TaskEngine.ownerID, *tID) ct, err := h.TaskEngine.db.Exec(h.TaskEngine.ctx, "UPDATE harmony_task SET owner_id=$1 WHERE id=$2 AND owner_id IS NULL", h.TaskEngine.ownerID, *tID)
if err != nil { if err != nil {