essential fixes for scheduling

This commit is contained in:
Andrew Jackson (Ajax) 2023-11-14 22:38:04 -06:00
parent 8fe8977758
commit 7ec9eb0a70
6 changed files with 28 additions and 19 deletions

View File

@ -1030,7 +1030,7 @@ workflows:
requires: requires:
- build - build
suite: utest-unit-rest suite: utest-unit-rest
target: "./blockstore/... ./build/... ./chain/... ./conformance/... ./gateway/... ./journal/... ./lib/... ./markets/... ./paychmgr/... ./provider/... ./tools/..." target: "./blockstore/... ./build/... ./chain/... ./conformance/... ./gateway/... ./journal/... ./lib/... ./markets/... ./paychmgr/... ./tools/..."
- test: - test:
name: test-unit-storage name: test-unit-storage

View File

@ -72,7 +72,13 @@ func (t *task1) Adder(add harmonytask.AddTaskFunc) {
} }
} }
func init() {
//logging.SetLogLevel("harmonydb", "debug")
//logging.SetLogLevel("harmonytask", "debug")
}
func TestHarmonyTasks(t *testing.T) { func TestHarmonyTasks(t *testing.T) {
t.Parallel()
withDbSetup(t, func(m *kit.TestMiner) { withDbSetup(t, func(m *kit.TestMiner) {
cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB
t1 := &task1{ t1 := &task1{
@ -82,7 +88,7 @@ func TestHarmonyTasks(t *testing.T) {
harmonytask.POLL_DURATION = time.Millisecond * 100 harmonytask.POLL_DURATION = time.Millisecond * 100
e, err := harmonytask.New(cdb, []harmonytask.TaskInterface{t1}, "test:1") e, err := harmonytask.New(cdb, []harmonytask.TaskInterface{t1}, "test:1")
require.NoError(t, err) require.NoError(t, err)
time.Sleep(3 * time.Second) // do the work. FLAKYNESS RISK HERE. time.Sleep(time.Second) // do the work. FLAKYNESS RISK HERE.
e.GracefullyTerminate(time.Minute) e.GracefullyTerminate(time.Minute)
expected := []string{"taskResult56", "taskResult73"} expected := []string{"taskResult56", "taskResult73"}
sort.Strings(t1.WorkCompleted) sort.Strings(t1.WorkCompleted)
@ -154,6 +160,7 @@ func fooLetterSaver(t *testing.T, cdb *harmonydb.DB, dest *[]string) *passthru {
} }
func TestHarmonyTasksWith2PartiesPolling(t *testing.T) { func TestHarmonyTasksWith2PartiesPolling(t *testing.T) {
t.Parallel()
withDbSetup(t, func(m *kit.TestMiner) { withDbSetup(t, func(m *kit.TestMiner) {
cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB
senderParty := fooLetterAdder(t, cdb) senderParty := fooLetterAdder(t, cdb)
@ -164,7 +171,7 @@ func TestHarmonyTasksWith2PartiesPolling(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
worker, err := harmonytask.New(cdb, []harmonytask.TaskInterface{workerParty}, "test:2") worker, err := harmonytask.New(cdb, []harmonytask.TaskInterface{workerParty}, "test:2")
require.NoError(t, err) require.NoError(t, err)
time.Sleep(3 * time.Second) // do the work. FLAKYNESS RISK HERE. time.Sleep(time.Second) // do the work. FLAKYNESS RISK HERE.
sender.GracefullyTerminate(time.Second * 5) sender.GracefullyTerminate(time.Second * 5)
worker.GracefullyTerminate(time.Second * 5) worker.GracefullyTerminate(time.Second * 5)
sort.Strings(dest) sort.Strings(dest)
@ -173,14 +180,15 @@ func TestHarmonyTasksWith2PartiesPolling(t *testing.T) {
} }
func TestWorkStealing(t *testing.T) { func TestWorkStealing(t *testing.T) {
t.Parallel()
withDbSetup(t, func(m *kit.TestMiner) { withDbSetup(t, func(m *kit.TestMiner) {
cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB
ctx := context.Background() ctx := context.Background()
// The dead worker will be played by a few SQL INSERTS. // The dead worker will be played by a few SQL INSERTS.
_, err := cdb.Exec(ctx, `INSERT INTO harmony_machines _, err := cdb.Exec(ctx, `INSERT INTO harmony_machines
(id, last_contact,host_and_port, cpu, ram, gpu, gpuram) (id, last_contact,host_and_port, cpu, ram, gpu)
VALUES (300, DATE '2000-01-01', 'test:1', 4, 400000, 1, 1000000)`) VALUES (300, DATE '2000-01-01', 'test:1', 4, 400000, 1)`)
require.ErrorIs(t, err, nil) require.ErrorIs(t, err, nil)
_, err = cdb.Exec(ctx, `INSERT INTO harmony_task _, err = cdb.Exec(ctx, `INSERT INTO harmony_task
(id, name, owner_id, posted_time, added_by) (id, name, owner_id, posted_time, added_by)
@ -194,13 +202,14 @@ func TestWorkStealing(t *testing.T) {
var dest []string var dest []string
worker, err := harmonytask.New(cdb, []harmonytask.TaskInterface{fooLetterSaver(t, cdb, &dest)}, "test:2") worker, err := harmonytask.New(cdb, []harmonytask.TaskInterface{fooLetterSaver(t, cdb, &dest)}, "test:2")
require.ErrorIs(t, err, nil) require.ErrorIs(t, err, nil)
time.Sleep(3 * time.Second) // do the work. FLAKYNESS RISK HERE. time.Sleep(time.Second) // do the work. FLAKYNESS RISK HERE.
worker.GracefullyTerminate(time.Second * 5) worker.GracefullyTerminate(time.Second * 5)
require.Equal(t, []string{"M"}, dest) require.Equal(t, []string{"M"}, dest)
}) })
} }
func TestTaskRetry(t *testing.T) { func TestTaskRetry(t *testing.T) {
t.Parallel()
withDbSetup(t, func(m *kit.TestMiner) { withDbSetup(t, func(m *kit.TestMiner) {
cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB
senderParty := fooLetterAdder(t, cdb) senderParty := fooLetterAdder(t, cdb)
@ -232,7 +241,7 @@ func TestTaskRetry(t *testing.T) {
} }
rcv, err := harmonytask.New(cdb, []harmonytask.TaskInterface{fails2xPerMsg}, "test:2") rcv, err := harmonytask.New(cdb, []harmonytask.TaskInterface{fails2xPerMsg}, "test:2")
require.NoError(t, err) require.NoError(t, err)
time.Sleep(3 * time.Second) time.Sleep(time.Second)
sender.GracefullyTerminate(time.Hour) sender.GracefullyTerminate(time.Hour)
rcv.GracefullyTerminate(time.Hour) rcv.GracefullyTerminate(time.Hour)
sort.Strings(dest) sort.Strings(dest)

View File

@ -3,7 +3,6 @@ package harmonydb
import ( import (
"context" "context"
"embed" "embed"
"errors"
"fmt" "fmt"
"math/rand" "math/rand"
"net" "net"
@ -17,6 +16,7 @@ import (
"github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/pgxpool"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/config"
) )
@ -205,16 +205,16 @@ func ensureSchemaExists(connString, schema string) error {
p, err := pgx.Connect(ctx, connString) p, err := pgx.Connect(ctx, connString)
defer cncl() defer cncl()
if err != nil { if err != nil {
return fmt.Errorf("unable to connect to db: %s, err: %v", connString, err) return xerrors.Errorf("unable to connect to db: %s, err: %v", connString, err)
} }
defer func() { _ = p.Close(context.Background()) }() defer func() { _ = p.Close(context.Background()) }()
if len(schema) < 5 || !schemaRE.MatchString(schema) { if len(schema) < 5 || !schemaRE.MatchString(schema) {
return errors.New("schema must be of the form " + schemaREString + "\n Got: " + schema) return xerrors.New("schema must be of the form " + schemaREString + "\n Got: " + schema)
} }
_, err = p.Exec(context.Background(), "CREATE SCHEMA IF NOT EXISTS "+schema) _, err = p.Exec(context.Background(), "CREATE SCHEMA IF NOT EXISTS "+schema)
if err != nil { if err != nil {
return fmt.Errorf("cannot create schema: %w", err) return xerrors.Errorf("cannot create schema: %w", err)
} }
return nil return nil
} }
@ -232,7 +232,7 @@ func (db *DB) upgrade() error {
)`) )`)
if err != nil { if err != nil {
logger.Error("Upgrade failed.") logger.Error("Upgrade failed.")
return err return xerrors.Errorf("Cannot create base table %w", err)
} }
// __Run scripts in order.__ // __Run scripts in order.__
@ -243,7 +243,7 @@ func (db *DB) upgrade() error {
err = db.Select(context.Background(), &landedEntries, "SELECT entry FROM base") err = db.Select(context.Background(), &landedEntries, "SELECT entry FROM base")
if err != nil { if err != nil {
logger.Error("Cannot read entries: " + err.Error()) logger.Error("Cannot read entries: " + err.Error())
return err return xerrors.Errorf("cannot read entries: %w", err)
} }
for _, l := range landedEntries { for _, l := range landedEntries {
landed[l.Entry] = true landed[l.Entry] = true
@ -278,7 +278,7 @@ func (db *DB) upgrade() error {
if err != nil { if err != nil {
msg := fmt.Sprintf("Could not upgrade! File %s, Query: %s, Returned: %s", name, s, err.Error()) msg := fmt.Sprintf("Could not upgrade! File %s, Query: %s, Returned: %s", name, s, err.Error())
logger.Error(msg) logger.Error(msg)
return errors.New(msg) // makes devs lives easier by placing message at the end. return xerrors.New(msg) // makes devs lives easier by placing message at the end.
} }
} }
@ -286,7 +286,7 @@ func (db *DB) upgrade() error {
_, err = db.Exec(context.Background(), "INSERT INTO base (entry) VALUES ($1)", name) _, err = db.Exec(context.Background(), "INSERT INTO base (entry) VALUES ($1)", name)
if err != nil { if err != nil {
logger.Error("Cannot update base: " + err.Error()) logger.Error("Cannot update base: " + err.Error())
return fmt.Errorf("cannot insert into base: %w", err) return xerrors.Errorf("cannot insert into base: %w", err)
} }
} }
return nil return nil

View File

@ -198,7 +198,7 @@ func (h *taskTypeHandler) recordCompletion(tID TaskID, workStart time.Time, done
} }
} }
_, err = tx.Exec(`INSERT INTO harmony_task_history _, err = tx.Exec(`INSERT INTO harmony_task_history
(task_id, name, posted, work_start, work_end, result, by_host_and_port, err) (task_id, name, posted, work_start, work_end, result, completed_by_host_and_port, err)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`, tID, h.Name, postedTime, workStart, workEnd, done, h.TaskEngine.hostAndPort, result) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`, tID, h.Name, postedTime, workStart, workEnd, done, h.TaskEngine.hostAndPort, result)
if err != nil { if err != nil {
return false, fmt.Errorf("could not write history: %w", err) return false, fmt.Errorf("could not write history: %w", err)

View File

@ -94,7 +94,7 @@ func Register(db *harmonydb.DB, hostnameAndPort string) (*Reg, error) {
func CleanupMachines(ctx context.Context, db *harmonydb.DB) int { func CleanupMachines(ctx context.Context, db *harmonydb.DB) int {
ct, err := db.Exec(ctx, `DELETE FROM harmony_machines WHERE last_contact < $1`, ct, err := db.Exec(ctx, `DELETE FROM harmony_machines WHERE last_contact < $1`,
time.Now().Add(-1*LOOKS_DEAD_TIMEOUT).UTC()) time.Now().Add(-1*LOOKS_DEAD_TIMEOUT))
if err != nil { if err != nil {
logger.Warn("unable to delete old machines: ", err) logger.Warn("unable to delete old machines: ", err)
} }

View File

@ -16,7 +16,7 @@ import (
func basicTest(t *testing.T, repo Repo) { func basicTest(t *testing.T, repo Repo) {
apima, err := repo.APIEndpoint() apima, err := repo.APIEndpoint()
if assert.Error(t, err) { if assert.Error(t, err) {
assert.Equal(t, ErrNoAPIEndpoint, err) assert.ErrorContains(t, err, ErrNoAPIEndpoint.Error())
} }
assert.Nil(t, apima, "with no api endpoint, return should be nil") assert.Nil(t, apima, "with no api endpoint, return should be nil")
@ -72,7 +72,7 @@ func basicTest(t *testing.T, repo Repo) {
apima, err = repo.APIEndpoint() apima, err = repo.APIEndpoint()
if assert.Error(t, err) { if assert.Error(t, err) {
assert.Equal(t, ErrNoAPIEndpoint, err, "after closing repo, api should be nil") assert.ErrorContains(t, err, ErrNoAPIEndpoint.Error(), "after closing repo, api should be nil")
} }
assert.Nil(t, apima, "with closed repo, apima should be set back to nil") assert.Nil(t, apima, "with closed repo, apima should be set back to nil")