From 7ec9eb0a701375fb98df8a74254c7b5b1df9f90d Mon Sep 17 00:00:00 2001 From: "Andrew Jackson (Ajax)" Date: Tue, 14 Nov 2023 22:38:04 -0600 Subject: [PATCH] essential fixes for scheduling --- .circleci/config.yml | 2 +- itests/harmonytask_test.go | 21 ++++++++++++++------ lib/harmony/harmonydb/harmonydb.go | 16 +++++++-------- lib/harmony/harmonytask/task_type_handler.go | 2 +- lib/harmony/resources/resources.go | 2 +- node/repo/repo_test.go | 4 ++-- 6 files changed, 28 insertions(+), 19 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 85bcd045c..a19703272 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -1030,7 +1030,7 @@ workflows: requires: - build suite: utest-unit-rest - target: "./blockstore/... ./build/... ./chain/... ./conformance/... ./gateway/... ./journal/... ./lib/... ./markets/... ./paychmgr/... ./provider/... ./tools/..." + target: "./blockstore/... ./build/... ./chain/... ./conformance/... ./gateway/... ./journal/... ./lib/... ./markets/... ./paychmgr/... ./tools/..." - test: name: test-unit-storage diff --git a/itests/harmonytask_test.go b/itests/harmonytask_test.go index 7867ca194..f3b02f9b2 100644 --- a/itests/harmonytask_test.go +++ b/itests/harmonytask_test.go @@ -72,7 +72,13 @@ func (t *task1) Adder(add harmonytask.AddTaskFunc) { } } +func init() { + //logging.SetLogLevel("harmonydb", "debug") + //logging.SetLogLevel("harmonytask", "debug") +} + func TestHarmonyTasks(t *testing.T) { + t.Parallel() withDbSetup(t, func(m *kit.TestMiner) { cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB t1 := &task1{ @@ -82,7 +88,7 @@ func TestHarmonyTasks(t *testing.T) { harmonytask.POLL_DURATION = time.Millisecond * 100 e, err := harmonytask.New(cdb, []harmonytask.TaskInterface{t1}, "test:1") require.NoError(t, err) - time.Sleep(3 * time.Second) // do the work. FLAKYNESS RISK HERE. + time.Sleep(time.Second) // do the work. FLAKYNESS RISK HERE. e.GracefullyTerminate(time.Minute) expected := []string{"taskResult56", "taskResult73"} sort.Strings(t1.WorkCompleted) @@ -154,6 +160,7 @@ func fooLetterSaver(t *testing.T, cdb *harmonydb.DB, dest *[]string) *passthru { } func TestHarmonyTasksWith2PartiesPolling(t *testing.T) { + t.Parallel() withDbSetup(t, func(m *kit.TestMiner) { cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB senderParty := fooLetterAdder(t, cdb) @@ -164,7 +171,7 @@ func TestHarmonyTasksWith2PartiesPolling(t *testing.T) { require.NoError(t, err) worker, err := harmonytask.New(cdb, []harmonytask.TaskInterface{workerParty}, "test:2") require.NoError(t, err) - time.Sleep(3 * time.Second) // do the work. FLAKYNESS RISK HERE. + time.Sleep(time.Second) // do the work. FLAKYNESS RISK HERE. sender.GracefullyTerminate(time.Second * 5) worker.GracefullyTerminate(time.Second * 5) sort.Strings(dest) @@ -173,14 +180,15 @@ func TestHarmonyTasksWith2PartiesPolling(t *testing.T) { } func TestWorkStealing(t *testing.T) { + t.Parallel() withDbSetup(t, func(m *kit.TestMiner) { cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB ctx := context.Background() // The dead worker will be played by a few SQL INSERTS. _, err := cdb.Exec(ctx, `INSERT INTO harmony_machines - (id, last_contact,host_and_port, cpu, ram, gpu, gpuram) - VALUES (300, DATE '2000-01-01', 'test:1', 4, 400000, 1, 1000000)`) + (id, last_contact,host_and_port, cpu, ram, gpu) + VALUES (300, DATE '2000-01-01', 'test:1', 4, 400000, 1)`) require.ErrorIs(t, err, nil) _, err = cdb.Exec(ctx, `INSERT INTO harmony_task (id, name, owner_id, posted_time, added_by) @@ -194,13 +202,14 @@ func TestWorkStealing(t *testing.T) { var dest []string worker, err := harmonytask.New(cdb, []harmonytask.TaskInterface{fooLetterSaver(t, cdb, &dest)}, "test:2") require.ErrorIs(t, err, nil) - time.Sleep(3 * time.Second) // do the work. FLAKYNESS RISK HERE. + time.Sleep(time.Second) // do the work. FLAKYNESS RISK HERE. worker.GracefullyTerminate(time.Second * 5) require.Equal(t, []string{"M"}, dest) }) } func TestTaskRetry(t *testing.T) { + t.Parallel() withDbSetup(t, func(m *kit.TestMiner) { cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB senderParty := fooLetterAdder(t, cdb) @@ -232,7 +241,7 @@ func TestTaskRetry(t *testing.T) { } rcv, err := harmonytask.New(cdb, []harmonytask.TaskInterface{fails2xPerMsg}, "test:2") require.NoError(t, err) - time.Sleep(3 * time.Second) + time.Sleep(time.Second) sender.GracefullyTerminate(time.Hour) rcv.GracefullyTerminate(time.Hour) sort.Strings(dest) diff --git a/lib/harmony/harmonydb/harmonydb.go b/lib/harmony/harmonydb/harmonydb.go index 6d9970732..8636dffc5 100644 --- a/lib/harmony/harmonydb/harmonydb.go +++ b/lib/harmony/harmonydb/harmonydb.go @@ -3,7 +3,6 @@ package harmonydb import ( "context" "embed" - "errors" "fmt" "math/rand" "net" @@ -17,6 +16,7 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgxpool" + "golang.org/x/xerrors" "github.com/filecoin-project/lotus/node/config" ) @@ -205,16 +205,16 @@ func ensureSchemaExists(connString, schema string) error { p, err := pgx.Connect(ctx, connString) defer cncl() if err != nil { - return fmt.Errorf("unable to connect to db: %s, err: %v", connString, err) + return xerrors.Errorf("unable to connect to db: %s, err: %v", connString, err) } defer func() { _ = p.Close(context.Background()) }() if len(schema) < 5 || !schemaRE.MatchString(schema) { - return errors.New("schema must be of the form " + schemaREString + "\n Got: " + schema) + return xerrors.New("schema must be of the form " + schemaREString + "\n Got: " + schema) } _, err = p.Exec(context.Background(), "CREATE SCHEMA IF NOT EXISTS "+schema) if err != nil { - return fmt.Errorf("cannot create schema: %w", err) + return xerrors.Errorf("cannot create schema: %w", err) } return nil } @@ -232,7 +232,7 @@ func (db *DB) upgrade() error { )`) if err != nil { logger.Error("Upgrade failed.") - return err + return xerrors.Errorf("Cannot create base table %w", err) } // __Run scripts in order.__ @@ -243,7 +243,7 @@ func (db *DB) upgrade() error { err = db.Select(context.Background(), &landedEntries, "SELECT entry FROM base") if err != nil { logger.Error("Cannot read entries: " + err.Error()) - return err + return xerrors.Errorf("cannot read entries: %w", err) } for _, l := range landedEntries { landed[l.Entry] = true @@ -278,7 +278,7 @@ func (db *DB) upgrade() error { if err != nil { msg := fmt.Sprintf("Could not upgrade! File %s, Query: %s, Returned: %s", name, s, err.Error()) logger.Error(msg) - return errors.New(msg) // makes devs lives easier by placing message at the end. + return xerrors.New(msg) // makes devs lives easier by placing message at the end. } } @@ -286,7 +286,7 @@ func (db *DB) upgrade() error { _, err = db.Exec(context.Background(), "INSERT INTO base (entry) VALUES ($1)", name) if err != nil { logger.Error("Cannot update base: " + err.Error()) - return fmt.Errorf("cannot insert into base: %w", err) + return xerrors.Errorf("cannot insert into base: %w", err) } } return nil diff --git a/lib/harmony/harmonytask/task_type_handler.go b/lib/harmony/harmonytask/task_type_handler.go index 7ec47d32a..79a156fef 100644 --- a/lib/harmony/harmonytask/task_type_handler.go +++ b/lib/harmony/harmonytask/task_type_handler.go @@ -198,7 +198,7 @@ func (h *taskTypeHandler) recordCompletion(tID TaskID, workStart time.Time, done } } _, err = tx.Exec(`INSERT INTO harmony_task_history - (task_id, name, posted, work_start, work_end, result, by_host_and_port, err) + (task_id, name, posted, work_start, work_end, result, completed_by_host_and_port, err) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`, tID, h.Name, postedTime, workStart, workEnd, done, h.TaskEngine.hostAndPort, result) if err != nil { return false, fmt.Errorf("could not write history: %w", err) diff --git a/lib/harmony/resources/resources.go b/lib/harmony/resources/resources.go index 8288aaf02..91f6eed37 100644 --- a/lib/harmony/resources/resources.go +++ b/lib/harmony/resources/resources.go @@ -94,7 +94,7 @@ func Register(db *harmonydb.DB, hostnameAndPort string) (*Reg, error) { func CleanupMachines(ctx context.Context, db *harmonydb.DB) int { ct, err := db.Exec(ctx, `DELETE FROM harmony_machines WHERE last_contact < $1`, - time.Now().Add(-1*LOOKS_DEAD_TIMEOUT).UTC()) + time.Now().Add(-1*LOOKS_DEAD_TIMEOUT)) if err != nil { logger.Warn("unable to delete old machines: ", err) } diff --git a/node/repo/repo_test.go b/node/repo/repo_test.go index 16c101d44..c78afa9db 100644 --- a/node/repo/repo_test.go +++ b/node/repo/repo_test.go @@ -16,7 +16,7 @@ import ( func basicTest(t *testing.T, repo Repo) { apima, err := repo.APIEndpoint() if assert.Error(t, err) { - assert.Equal(t, ErrNoAPIEndpoint, err) + assert.ErrorContains(t, err, ErrNoAPIEndpoint.Error()) } assert.Nil(t, apima, "with no api endpoint, return should be nil") @@ -72,7 +72,7 @@ func basicTest(t *testing.T, repo Repo) { apima, err = repo.APIEndpoint() if assert.Error(t, err) { - assert.Equal(t, ErrNoAPIEndpoint, err, "after closing repo, api should be nil") + assert.ErrorContains(t, err, ErrNoAPIEndpoint.Error(), "after closing repo, api should be nil") } assert.Nil(t, apima, "with closed repo, apima should be set back to nil")