diff --git a/cmd/lotus-provider/tasks/tasks.go b/cmd/lotus-provider/tasks/tasks.go index b12f9691a..dd2b9ef83 100644 --- a/cmd/lotus-provider/tasks/tasks.go +++ b/cmd/lotus-provider/tasks/tasks.go @@ -65,7 +65,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task var sp *lpseal.SealPoller var slr *lpffi.SealCalls if hasAnySealingTask { - sp = lpseal.NewPoller(db) + sp = lpseal.NewPoller(db, full) go sp.RunPoller(ctx) slr = lpffi.NewSealCalls(stor, lstor, si) diff --git a/provider/lpseal/poller.go b/provider/lpseal/poller.go index f1ba84728..343a1e7ae 100644 --- a/provider/lpseal/poller.go +++ b/provider/lpseal/poller.go @@ -2,6 +2,11 @@ package lpseal import ( "context" + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/chain/actors/builtin/miner" + "github.com/filecoin-project/lotus/chain/actors/policy" + "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/lib/harmony/harmonytask" "github.com/filecoin-project/lotus/lib/promise" @@ -22,15 +27,21 @@ const ( const sealPollerInterval = 10 * time.Second +type SealPollerAPI interface { + StateSectorPreCommitInfo(context.Context, address.Address, abi.SectorNumber, types.TipSetKey) (*miner.SectorPreCommitOnChainInfo, error) +} + type SealPoller struct { - db *harmonydb.DB + db *harmonydb.DB + api SealPollerAPI pollers [numPollers]promise.Promise[harmonytask.AddTaskFunc] } -func NewPoller(db *harmonydb.DB) *SealPoller { +func NewPoller(db *harmonydb.DB, api SealPollerAPI) *SealPoller { return &SealPoller{ - db: db, + db: db, + api: api, } } @@ -149,12 +160,50 @@ func (s *SealPoller) poll(ctx context.Context) error { }) } - if task.TaskPrecommitMsg != nil && !task.AfterPrecommitMsg { + if task.TaskPrecommitMsg != nil && !task.AfterPrecommitMsgSuccess { + // join pipeline precommit_msg_cid with message_waits signed_message_cid, if executed_tsk_epoch is not null, return result - } + var execResult []struct { + ExecutedTskCID string `db:"executed_tsk_cid"` + ExecutedTskEpoch int64 `db:"executed_tsk_epoch"` + ExecutedMsgCID string `db:"executed_msg_cid"` - if task.AfterPrecommitMsg { - // todo start precommit msg wait task + ExecutedRcptExitCode int64 `db:"executed_rcpt_exitcode"` + ExecutedRcptGasUsed int64 `db:"executed_rcpt_gas_used"` + } + + err := s.db.Select(ctx, &execResult, `SELECT executed_tsk_cid, executed_tsk_epoch, executed_msg_cid, executed_rcpt_exitcode, executed_rcpt_gas_used + FROM sectors_sdr_pipeline + JOIN message_waits ON sectors_sdr_pipeline.precommit_msg_cid = message_waits.signed_message_cid + WHERE sp_id = $1 AND sector_number = $2 AND executed_tsk_epoch is not null`, task.SpID, task.SectorNumber) + if err != nil { + log.Errorw("failed to query message_waits", "error", err) + } + + if len(execResult) > 0 { + maddr, err := address.NewIDAddress(uint64(task.SpID)) + if err != nil { + return err + } + + pci, err := s.api.StateSectorPreCommitInfo(ctx, maddr, abi.SectorNumber(task.SectorNumber), types.EmptyTSK) + if err != nil { + return xerrors.Errorf("get precommit info: %w", err) + } + + if pci != nil { + randHeight := pci.PreCommitEpoch + policy.GetPreCommitChallengeDelay() + + _, err := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET + seed_epoch = $1, precommit_msg_tsk = $2, after_precommit_msg_success = true + WHERE sp_id = $3 AND sector_number = $4 and seed_epoch is NULL`, + randHeight, execResult[0].ExecutedTskCID, task.SpID, task.SectorNumber) + if err != nil { + return xerrors.Errorf("update sectors_sdr_pipeline: %w", err) + } + } // todo handle missing precommit info (eg expired precommit) + + } } todoWaitSeed := false diff --git a/provider/lpseal/task_submit_precommit.go b/provider/lpseal/task_submit_precommit.go index 0adc9f5ab..99b1a0655 100644 --- a/provider/lpseal/task_submit_precommit.go +++ b/provider/lpseal/task_submit_precommit.go @@ -142,7 +142,7 @@ func (s *SubmitPrecommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bo // set precommit_msg_cid _, err = s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline - SET precommit_msg_cid = $1 + SET precommit_msg_cid = $1, after_precommit_msg = true WHERE task_id_precommit_msg = $2`, mcid, taskID) if err != nil { return false, xerrors.Errorf("updating precommit_msg_cid: %w", err)