diff --git a/cmd/lotus-provider/proving.go b/cmd/lotus-provider/proving.go index 337a8cb22..51f302bcc 100644 --- a/cmd/lotus-provider/proving.go +++ b/cmd/lotus-provider/proving.go @@ -80,8 +80,6 @@ var wdPostTaskCmd = &cli.Command{ } var taskId int64 - retryDelay := time.Millisecond * 10 - retryAddTask: _, err = deps.DB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { err = tx.QueryRow(`INSERT INTO harmony_task (name, posted_time, added_by) VALUES ('WdPost', CURRENT_TIMESTAMP, 123) RETURNING id`).Scan(&taskId) if err != nil { @@ -100,13 +98,8 @@ var wdPostTaskCmd = &cli.Command{ return false, xerrors.Errorf("inserting into harmony_tests: %w", err) } return true, nil - }) + }, harmonydb.RetrySerializationErr()) if err != nil { - if harmonydb.IsErrSerialization(err) { - time.Sleep(retryDelay) - retryDelay *= 2 - goto retryAddTask - } return xerrors.Errorf("writing SQL transaction: %w", err) } fmt.Printf("Inserted task %v. Waiting for success ", taskId) @@ -122,6 +115,7 @@ var wdPostTaskCmd = &cli.Command{ } fmt.Print(".") } + fmt.Println() log.Infof("Result: %s", result.String) return nil }, diff --git a/provider/lpwindow/compute_task.go b/provider/lpwindow/compute_task.go index 08f99e3e9..65102d0f0 100644 --- a/provider/lpwindow/compute_task.go +++ b/provider/lpwindow/compute_task.go @@ -135,11 +135,34 @@ func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done deadline := wdpost.NewDeadlineInfo(abi.ChainEpoch(pps), dlIdx, head.Height()) - if deadline.PeriodElapsed() { + var testTask *int + isTestTask := func() bool { + if testTask != nil { + return *testTask > 0 + } + + testTask = new(int) + err := t.db.QueryRow(context.Background(), `SELECT COUNT(*) FROM harmony_test WHERE task_id = $1`, taskID).Scan(testTask) + if err != nil { + log.Errorf("WdPostTask.Do() failed to queryRow: %v", err) + return false + } + + return *testTask > 0 + } + + if deadline.PeriodElapsed() && !isTestTask() { log.Errorf("WdPost removed stale task: %v %v", taskID, deadline) return true, nil } + if deadline.Challenge > head.Height() { + if isTestTask() { + deadline = wdpost.NewDeadlineInfo(abi.ChainEpoch(pps)-deadline.WPoStProvingPeriod, dlIdx, head.Height()-deadline.WPoStProvingPeriod) + log.Warnw("Test task is in the future, adjusting to past", "taskID", taskID, "deadline", deadline) + } + } + maddr, err := address.NewIDAddress(spID) if err != nil { log.Errorf("WdPostTask.Do() failed to NewIDAddress: %v", err) @@ -163,11 +186,7 @@ func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done return false, xerrors.Errorf("marshaling PoSt: %w", err) } - testTaskIDCt := 0 - if err = t.db.QueryRow(context.Background(), `SELECT COUNT(*) FROM harmony_test WHERE task_id = $1`, taskID).Scan(&testTaskIDCt); err != nil { - return false, xerrors.Errorf("querying for test task: %w", err) - } - if testTaskIDCt == 1 { + if isTestTask() { // Do not send test tasks to the chain but to harmony_test & stdout. data, err := json.MarshalIndent(map[string]any{ @@ -243,7 +262,6 @@ func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID, te *harmonytask.TaskEng PartitionIndex uint64 dlInfo *dline.Info `pgx:"-"` - openTs *types.TipSet } var tasks []wdTaskDef @@ -265,13 +283,9 @@ func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID, te *harmonytask.TaskEng tasks[i].dlInfo = wdpost.NewDeadlineInfo(tasks[i].ProvingPeriodStart, tasks[i].DeadlineIndex, ts.Height()) if tasks[i].dlInfo.PeriodElapsed() { + // note: Those may be test tasks return &tasks[i].TaskID, nil } - - tasks[i].openTs, err = t.api.ChainGetTipSetAfterHeight(context.Background(), tasks[i].dlInfo.Open, ts.Key()) - if err != nil { - return nil, xerrors.Errorf("getting task open tipset: %w", err) - } } // todo fix the block below