harmonytask parallel test run gotcha

This commit is contained in:
Andrew Jackson (Ajax) 2023-08-21 17:47:43 -05:00
parent 415a0ac364
commit 6fd468dfc0

View File

@ -117,7 +117,6 @@ func (t *passthru) Adder(add harmonytask.AddTaskFunc) {
// Common stuff // Common stuff
var dtl = harmonytask.TaskTypeDetails{Name: "foo", Max: -1, Cost: resources.Resources{}} var dtl = harmonytask.TaskTypeDetails{Name: "foo", Max: -1, Cost: resources.Resources{}}
var letters []string
var lettersMutex sync.Mutex var lettersMutex sync.Mutex
func fooLetterAdder(t *testing.T, cdb *harmonydb.DB) *passthru { func fooLetterAdder(t *testing.T, cdb *harmonydb.DB) *passthru {
@ -136,7 +135,7 @@ func fooLetterAdder(t *testing.T, cdb *harmonydb.DB) *passthru {
}, },
} }
} }
func fooLetterSaver(t *testing.T, cdb *harmonydb.DB) *passthru { func fooLetterSaver(t *testing.T, cdb *harmonydb.DB, dest *[]string) *passthru {
return &passthru{ return &passthru{
dtl: dtl, dtl: dtl,
canAccept: func(list []harmonytask.TaskID) (*harmonytask.TaskID, error) { return &list[0], nil }, canAccept: func(list []harmonytask.TaskID) (*harmonytask.TaskID, error) { return &list[0], nil },
@ -147,7 +146,7 @@ func fooLetterSaver(t *testing.T, cdb *harmonydb.DB) *passthru {
require.NoError(t, err) require.NoError(t, err)
lettersMutex.Lock() lettersMutex.Lock()
defer lettersMutex.Unlock() defer lettersMutex.Unlock()
letters = append(letters, content) *dest = append(*dest, content)
return true, nil return true, nil
}, },
} }
@ -157,7 +156,8 @@ func TestHarmonyTasksWith2PartiesPolling(t *testing.T) {
withDbSetup(t, func(m *kit.TestMiner) { withDbSetup(t, func(m *kit.TestMiner) {
cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB
senderParty := fooLetterAdder(t, cdb) senderParty := fooLetterAdder(t, cdb)
workerParty := fooLetterSaver(t, cdb) var dest []string
workerParty := fooLetterSaver(t, cdb, &dest)
harmonytask.POLL_DURATION = time.Millisecond * 100 harmonytask.POLL_DURATION = time.Millisecond * 100
sender, err := harmonytask.New(cdb, []harmonytask.TaskInterface{senderParty}, "test:1") sender, err := harmonytask.New(cdb, []harmonytask.TaskInterface{senderParty}, "test:1")
require.NoError(t, err) require.NoError(t, err)
@ -166,8 +166,8 @@ func TestHarmonyTasksWith2PartiesPolling(t *testing.T) {
time.Sleep(3 * time.Second) // do the work. FLAKYNESS RISK HERE. time.Sleep(3 * time.Second) // do the work. FLAKYNESS RISK HERE.
sender.GracefullyTerminate(time.Second * 5) sender.GracefullyTerminate(time.Second * 5)
worker.GracefullyTerminate(time.Second * 5) worker.GracefullyTerminate(time.Second * 5)
sort.Strings(letters) sort.Strings(dest)
require.Equal(t, letters, []string{"A", "B"}) require.Equal(t, dest, []string{"A", "B"})
}) })
} }
@ -190,11 +190,12 @@ func TestWorkStealing(t *testing.T) {
harmonytask.POLL_DURATION = time.Millisecond * 100 harmonytask.POLL_DURATION = time.Millisecond * 100
harmonytask.CLEANUP_FREQUENCY = time.Millisecond * 100 harmonytask.CLEANUP_FREQUENCY = time.Millisecond * 100
worker, err := harmonytask.New(cdb, []harmonytask.TaskInterface{fooLetterSaver(t, cdb)}, "test:2") var dest []string
worker, err := harmonytask.New(cdb, []harmonytask.TaskInterface{fooLetterSaver(t, cdb, &dest)}, "test:2")
require.ErrorIs(t, err, nil) require.ErrorIs(t, err, nil)
time.Sleep(3 * time.Second) // do the work. FLAKYNESS RISK HERE. time.Sleep(3 * time.Second) // do the work. FLAKYNESS RISK HERE.
worker.GracefullyTerminate(time.Second * 5) worker.GracefullyTerminate(time.Second * 5)
require.Equal(t, []string{"M"}, letters) require.Equal(t, []string{"M"}, dest)
}) })
} }
@ -207,6 +208,7 @@ func TestTaskRetry(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
alreadyFailed := map[string]bool{} alreadyFailed := map[string]bool{}
var dest []string
fails2xPerMsg := &passthru{ fails2xPerMsg := &passthru{
dtl: dtl, dtl: dtl,
canAccept: func(list []harmonytask.TaskID) (*harmonytask.TaskID, error) { return &list[0], nil }, canAccept: func(list []harmonytask.TaskID) (*harmonytask.TaskID, error) { return &list[0], nil },
@ -221,7 +223,7 @@ func TestTaskRetry(t *testing.T) {
alreadyFailed[content] = true alreadyFailed[content] = true
return false, errors.New("intentional 'error'") return false, errors.New("intentional 'error'")
} }
letters = append(letters, content) dest = append(dest, content)
return true, nil return true, nil
}, },
} }
@ -230,8 +232,8 @@ func TestTaskRetry(t *testing.T) {
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
sender.GracefullyTerminate(time.Hour) sender.GracefullyTerminate(time.Hour)
rcv.GracefullyTerminate(time.Hour) rcv.GracefullyTerminate(time.Hour)
sort.Strings(letters) sort.Strings(dest)
require.Equal(t, []string{"A", "B"}, letters) require.Equal(t, []string{"A", "B"}, dest)
type hist struct { type hist struct {
TaskID int TaskID int
Result bool Result bool