lotus/itests/harmonytask_test.go

267 lines
8.3 KiB
Go
Raw Normal View History

2023-08-14 16:40:12 +00:00
package itests
import (
"context"
"errors"
"fmt"
"sort"
"sync"
"testing"
"time"
2023-08-29 00:44:56 +00:00
logging "github.com/ipfs/go-log/v2"
2023-08-21 16:26:26 +00:00
"github.com/stretchr/testify/require"
2023-08-14 16:40:12 +00:00
"github.com/filecoin-project/lotus/itests/kit"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
"github.com/filecoin-project/lotus/lib/harmony/resources"
"github.com/filecoin-project/lotus/node/impl"
)
type task1 struct {
toAdd []int
myPersonalTableLock sync.Mutex
2023-08-25 21:11:31 +00:00
myPersonalTable map[harmonytask.TaskID]int // This would typically be a DB table
2023-08-14 16:40:12 +00:00
WorkCompleted []string
}
2023-08-21 22:33:25 +00:00
func withDbSetup(t *testing.T, f func(*kit.TestMiner)) {
_, miner, _ := kit.EnsembleMinimal(t,
kit.LatestActorsAt(-1),
kit.MockProofs(),
kit.WithSectorIndexDB(),
2023-08-21 22:33:25 +00:00
)
2023-08-29 00:44:56 +00:00
logging.SetLogLevel("harmonytask", "debug")
2023-08-21 22:33:25 +00:00
f(miner)
}
2023-08-14 16:40:12 +00:00
// Do runs the task identified by tID. It refuses to work when stillOwned
// reports false; otherwise it looks up the value stored for tID and appends
// "taskResult<value>" to WorkCompleted while holding myPersonalTableLock.
func (t *task1) Do(tID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
	if owned := stillOwned(); !owned {
		return false, errors.New("Why not still owned?")
	}

	t.myPersonalTableLock.Lock()
	defer t.myPersonalTableLock.Unlock()

	value := t.myPersonalTable[tID]
	result := fmt.Sprintf("taskResult%d", value)
	t.WorkCompleted = append(t.WorkCompleted, result)
	return true, nil
}
func (t *task1) CanAccept(list []harmonytask.TaskID, e *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
2023-08-14 16:40:12 +00:00
return &list[0], nil
}
// TypeDetails describes the "ThingOne" task type: a Max of 100, a single
// allowed failure, and a cost of one CPU plus a small amount of RAM.
func (t *task1) TypeDetails() harmonytask.TaskTypeDetails {
	cost := resources.Resources{
		Cpu: 1,
		Ram: 100 << 10, // at 100kb, it's tiny
	}

	return harmonytask.TaskTypeDetails{
		Max:         100,
		Name:        "ThingOne",
		MaxFailures: 1,
		Cost:        cost,
	}
}
func (t *task1) Adder(add harmonytask.AddTaskFunc) {
2023-08-21 22:13:17 +00:00
for _, vTmp := range t.toAdd {
v := vTmp
2023-08-26 03:07:07 +00:00
add(func(tID harmonytask.TaskID, tx *harmonydb.Tx) (bool, error) {
2023-08-14 16:40:12 +00:00
t.myPersonalTableLock.Lock()
defer t.myPersonalTableLock.Unlock()
t.myPersonalTable[tID] = v
2023-08-26 03:07:07 +00:00
return true, nil
2023-08-14 16:40:12 +00:00
})
}
}
2023-11-15 04:38:04 +00:00
// init currently does nothing. Uncomment the lines below to get debug-level
// output from the "harmonydb" and "harmonytask" loggers while troubleshooting.
func init() {
//logging.SetLogLevel("harmonydb", "debug")
//logging.SetLogLevel("harmonytask", "debug")
}
2023-08-14 16:40:12 +00:00
func TestHarmonyTasks(t *testing.T) {
2023-11-15 04:58:43 +00:00
//t.Parallel()
2023-08-21 22:33:25 +00:00
withDbSetup(t, func(m *kit.TestMiner) {
2023-08-14 16:40:12 +00:00
cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB
t1 := &task1{
toAdd: []int{56, 73},
myPersonalTable: map[harmonytask.TaskID]int{},
}
2023-10-27 03:10:18 +00:00
harmonytask.POLL_DURATION = time.Millisecond * 100
2023-08-14 16:40:12 +00:00
e, err := harmonytask.New(cdb, []harmonytask.TaskInterface{t1}, "test:1")
require.NoError(t, err)
2023-11-15 04:38:04 +00:00
time.Sleep(time.Second) // do the work. FLAKYNESS RISK HERE.
2023-08-14 16:40:12 +00:00
e.GracefullyTerminate(time.Minute)
2023-08-21 23:02:04 +00:00
expected := []string{"taskResult56", "taskResult73"}
2023-08-14 16:40:12 +00:00
sort.Strings(t1.WorkCompleted)
2023-10-27 03:10:18 +00:00
require.Equal(t, expected, t1.WorkCompleted, "unexpected results")
2023-08-14 16:40:12 +00:00
})
}
type passthru struct {
dtl harmonytask.TaskTypeDetails
do func(tID harmonytask.TaskID, stillOwned func() bool) (done bool, err error)
canAccept func(list []harmonytask.TaskID, e *harmonytask.TaskEngine) (*harmonytask.TaskID, error)
2023-08-14 16:40:12 +00:00
adder func(add harmonytask.AddTaskFunc)
}
// Do forwards to the do function stored on the passthru and returns its
// results unchanged.
func (t *passthru) Do(tID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
	done, err = t.do(tID, stillOwned)
	return done, err
}
func (t *passthru) CanAccept(list []harmonytask.TaskID, e *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
return t.canAccept(list, e)
2023-08-14 16:40:12 +00:00
}
// TypeDetails returns the details value stored in the dtl field.
func (t *passthru) TypeDetails() harmonytask.TaskTypeDetails {
	details := t.dtl
	return details
}
// Adder forwards add to the adder function stored on the passthru. A passthru
// without an adder registers nothing.
func (t *passthru) Adder(add harmonytask.AddTaskFunc) {
	if t.adder == nil {
		return
	}
	t.adder(add)
}
// Common fixtures shared by the passthru-based tests.
var (
	// dtl is the task-type description ("foo", Max of -1, zero cost) used by
	// every passthru in this file.
	dtl = harmonytask.TaskTypeDetails{Name: "foo", Max: -1, Cost: resources.Resources{}}

	// lettersMutex serialises access to the result slices (and similar shared
	// state) written by the passthru do callbacks.
	lettersMutex sync.Mutex
)
func fooLetterAdder(t *testing.T, cdb *harmonydb.DB) *passthru {
return &passthru{
dtl: dtl,
canAccept: func(list []harmonytask.TaskID, e *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
return nil, nil
},
2023-08-14 16:40:12 +00:00
adder: func(add harmonytask.AddTaskFunc) {
2023-08-21 22:13:17 +00:00
for _, vTmp := range []string{"A", "B"} {
v := vTmp
2023-08-26 03:07:07 +00:00
add(func(tID harmonytask.TaskID, tx *harmonydb.Tx) (bool, error) {
2023-08-14 16:40:12 +00:00
_, err := tx.Exec("INSERT INTO itest_scratch (some_int, content) VALUES ($1,$2)", tID, v)
require.NoError(t, err)
2023-08-26 03:07:07 +00:00
return true, nil
2023-08-14 16:40:12 +00:00
})
}
},
}
}
2023-08-21 22:47:43 +00:00
func fooLetterSaver(t *testing.T, cdb *harmonydb.DB, dest *[]string) *passthru {
2023-08-14 16:40:12 +00:00
return &passthru{
dtl: dtl,
canAccept: func(list []harmonytask.TaskID, e *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
return &list[0], nil
},
2023-08-14 16:40:12 +00:00
do: func(tID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
var content string
err = cdb.QueryRow(context.Background(),
"SELECT content FROM itest_scratch WHERE some_int=$1", tID).Scan(&content)
require.NoError(t, err)
lettersMutex.Lock()
defer lettersMutex.Unlock()
2023-08-21 22:47:43 +00:00
*dest = append(*dest, content)
2023-08-14 16:40:12 +00:00
return true, nil
},
}
}
func TestHarmonyTasksWith2PartiesPolling(t *testing.T) {
2023-11-15 04:58:43 +00:00
//t.Parallel()
2023-08-21 22:33:25 +00:00
withDbSetup(t, func(m *kit.TestMiner) {
2023-08-14 16:40:12 +00:00
cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB
senderParty := fooLetterAdder(t, cdb)
2023-08-21 22:47:43 +00:00
var dest []string
workerParty := fooLetterSaver(t, cdb, &dest)
2023-08-14 16:40:12 +00:00
harmonytask.POLL_DURATION = time.Millisecond * 100
sender, err := harmonytask.New(cdb, []harmonytask.TaskInterface{senderParty}, "test:1")
require.NoError(t, err)
worker, err := harmonytask.New(cdb, []harmonytask.TaskInterface{workerParty}, "test:2")
require.NoError(t, err)
2023-11-15 04:38:04 +00:00
time.Sleep(time.Second) // do the work. FLAKYNESS RISK HERE.
2023-08-14 16:40:12 +00:00
sender.GracefullyTerminate(time.Second * 5)
worker.GracefullyTerminate(time.Second * 5)
2023-08-21 22:47:43 +00:00
sort.Strings(dest)
2023-08-29 00:21:59 +00:00
require.Equal(t, []string{"A", "B"}, dest)
2023-08-14 16:40:12 +00:00
})
}
func TestWorkStealing(t *testing.T) {
2023-11-15 04:58:43 +00:00
//t.Parallel()
2023-08-21 22:33:25 +00:00
withDbSetup(t, func(m *kit.TestMiner) {
2023-08-14 16:40:12 +00:00
cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB
ctx := context.Background()
// The dead worker will be played by a few SQL INSERTS.
_, err := cdb.Exec(ctx, `INSERT INTO harmony_machines
2023-11-15 04:38:04 +00:00
(id, last_contact,host_and_port, cpu, ram, gpu)
VALUES (300, DATE '2000-01-01', 'test:1', 4, 400000, 1)`)
2023-08-14 16:40:12 +00:00
require.ErrorIs(t, err, nil)
_, err = cdb.Exec(ctx, `INSERT INTO harmony_task
(id, name, owner_id, posted_time, added_by)
VALUES (1234, 'foo', 300, DATE '2000-01-01', 300)`)
require.ErrorIs(t, err, nil)
_, err = cdb.Exec(ctx, "INSERT INTO itest_scratch (some_int, content) VALUES (1234, 'M')")
require.ErrorIs(t, err, nil)
harmonytask.POLL_DURATION = time.Millisecond * 100
harmonytask.CLEANUP_FREQUENCY = time.Millisecond * 100
2023-08-21 22:47:43 +00:00
var dest []string
worker, err := harmonytask.New(cdb, []harmonytask.TaskInterface{fooLetterSaver(t, cdb, &dest)}, "test:2")
2023-08-14 16:40:12 +00:00
require.ErrorIs(t, err, nil)
2023-11-15 04:38:04 +00:00
time.Sleep(time.Second) // do the work. FLAKYNESS RISK HERE.
2023-08-14 16:40:12 +00:00
worker.GracefullyTerminate(time.Second * 5)
2023-08-21 22:47:43 +00:00
require.Equal(t, []string{"M"}, dest)
2023-08-14 16:40:12 +00:00
})
}
func TestTaskRetry(t *testing.T) {
2023-11-15 04:58:43 +00:00
//t.Parallel()
2023-08-21 22:33:25 +00:00
withDbSetup(t, func(m *kit.TestMiner) {
2023-08-14 16:40:12 +00:00
cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB
senderParty := fooLetterAdder(t, cdb)
harmonytask.POLL_DURATION = time.Millisecond * 100
sender, err := harmonytask.New(cdb, []harmonytask.TaskInterface{senderParty}, "test:1")
require.NoError(t, err)
alreadyFailed := map[string]bool{}
2023-08-21 22:47:43 +00:00
var dest []string
2023-08-14 16:40:12 +00:00
fails2xPerMsg := &passthru{
dtl: dtl,
canAccept: func(list []harmonytask.TaskID, e *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
return &list[0], nil
},
2023-08-14 16:40:12 +00:00
do: func(tID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
var content string
err = cdb.QueryRow(context.Background(),
"SELECT content FROM itest_scratch WHERE some_int=$1", tID).Scan(&content)
require.NoError(t, err)
lettersMutex.Lock()
defer lettersMutex.Unlock()
if !alreadyFailed[content] {
alreadyFailed[content] = true
return false, errors.New("intentional 'error'")
}
2023-08-21 22:47:43 +00:00
dest = append(dest, content)
2023-08-14 16:40:12 +00:00
return true, nil
},
}
rcv, err := harmonytask.New(cdb, []harmonytask.TaskInterface{fails2xPerMsg}, "test:2")
require.NoError(t, err)
2023-11-15 04:38:04 +00:00
time.Sleep(time.Second)
2023-08-14 16:40:12 +00:00
sender.GracefullyTerminate(time.Hour)
rcv.GracefullyTerminate(time.Hour)
2023-08-21 22:47:43 +00:00
sort.Strings(dest)
require.Equal(t, []string{"A", "B"}, dest)
2023-08-14 16:40:12 +00:00
type hist struct {
TaskID int
Result bool
Err string
}
var res []hist
require.NoError(t, cdb.Select(context.Background(), &res,
`SELECT task_id, result, err FROM harmony_task_history
ORDER BY result DESC, task_id`))
require.Equal(t, []hist{
{1, true, ""},
{2, true, ""},
{1, false, "error: intentional 'error'"},
{2, false, "error: intentional 'error'"}}, res)
})
}