From fabd413e5e97f928985214d55ca54ca7c584dbb3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 17 Jan 2024 12:42:43 +0100 Subject: [PATCH] lpseal: clean up poller loop --- provider/lpseal/poller.go | 389 +++++++++++++++++++++----------------- 1 file changed, 212 insertions(+), 177 deletions(-) diff --git a/provider/lpseal/poller.go b/provider/lpseal/poller.go index 26a7e315b..a8bf84c2a 100644 --- a/provider/lpseal/poller.go +++ b/provider/lpseal/poller.go @@ -2,6 +2,7 @@ package lpseal import ( "context" + "github.com/filecoin-project/lotus/chain/actors/policy" "time" logging "github.com/ipfs/go-log/v2" @@ -11,7 +12,6 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" - "github.com/filecoin-project/lotus/chain/actors/policy" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/lib/harmony/harmonytask" @@ -69,41 +69,43 @@ func (s *SealPoller) RunPoller(ctx context.Context) { } } +type pollTask struct { + SpID int64 `db:"sp_id"` + SectorNumber int64 `db:"sector_number"` + + TaskSDR *int64 `db:"task_id_sdr"` + AfterSDR bool `db:"after_sdr"` + + TaskTreeD *int64 `db:"task_id_tree_d"` + AfterTreeD bool `db:"after_tree_d"` + + TaskTreeC *int64 `db:"task_id_tree_c"` + AfterTreeC bool `db:"after_tree_c"` + + TaskTreeR *int64 `db:"task_id_tree_r"` + AfterTreeR bool `db:"after_tree_r"` + + TaskPrecommitMsg *int64 `db:"task_id_precommit_msg"` + AfterPrecommitMsg bool `db:"after_precommit_msg"` + + AfterPrecommitMsgSuccess bool `db:"after_precommit_msg_success"` + SeedEpoch *int64 `db:"seed_epoch"` + + TaskPoRep *int64 `db:"task_id_porep"` + PoRepProof []byte `db:"porep_proof"` + AfterPoRep bool `db:"after_porep"` + + TaskCommitMsg *int64 `db:"task_id_commit_msg"` + AfterCommitMsg bool `db:"after_commit_msg"` + + AfterCommitMsgSuccess bool `db:"after_commit_msg_success"` + + Failed bool `db:"failed"` + FailedReason string `db:"failed_reason"` +} + func (s *SealPoller) poll(ctx context.Context) error { - var tasks []struct { - SpID int64 `db:"sp_id"` - SectorNumber int64 `db:"sector_number"` - - TaskSDR *int64 `db:"task_id_sdr"` - AfterSDR bool `db:"after_sdr"` - - TaskTreeD *int64 `db:"task_id_tree_d"` - AfterTreeD bool `db:"after_tree_d"` - - TaskTreeC *int64 `db:"task_id_tree_c"` - AfterTreeC bool `db:"after_tree_c"` - - TaskTreeR *int64 `db:"task_id_tree_r"` - AfterTreeR bool `db:"after_tree_r"` - - TaskPrecommitMsg *int64 `db:"task_id_precommit_msg"` - AfterPrecommitMsg bool `db:"after_precommit_msg"` - - AfterPrecommitMsgSuccess bool `db:"after_precommit_msg_success"` - SeedEpoch *int64 `db:"seed_epoch"` - - TaskPoRep *int64 `db:"task_id_porep"` - PoRepProof []byte `db:"porep_proof"` - AfterPoRep bool `db:"after_porep"` - - TaskCommitMsg *int64 `db:"task_id_commit_msg"` - AfterCommitMsg bool `db:"after_commit_msg"` - - AfterCommitMsgSuccess bool `db:"after_commit_msg_success"` - - Failed bool `db:"failed"` - FailedReason string `db:"failed_reason"` - } + var tasks []pollTask err := s.db.Select(ctx, &tasks, `SELECT sp_id, sector_number, @@ -128,167 +130,194 @@ func (s *SealPoller) poll(ctx context.Context) error { continue } - if task.TaskSDR == nil && s.pollers[pollerSDR].IsSet() { - s.pollers[pollerSDR].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { - n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_sdr = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_sdr is null`, id, task.SpID, task.SectorNumber) - if err != nil { - return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err) - } - if n != 1 { - return false, xerrors.Errorf("expected to update 1 row, updated %d", n) - } - - return true, nil - }) - } - if task.TaskTreeD == nil && task.TaskTreeC == nil && task.TaskTreeR == nil && s.pollers[pollerTrees].IsSet() && task.AfterSDR { - s.pollers[pollerTrees].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { - n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_tree_d = $1, task_id_tree_c = $1, task_id_tree_r = $1 - WHERE sp_id = $2 AND sector_number = $3 and after_sdr = true and task_id_tree_d is null and task_id_tree_c is null and task_id_tree_r is null`, id, task.SpID, task.SectorNumber) - if err != nil { - return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err) - } - if n != 1 { - return false, xerrors.Errorf("expected to update 1 row, updated %d", n) - } - - return true, nil - }) - } - - if task.TaskPrecommitMsg == nil && task.AfterTreeR && task.AfterTreeD { - s.pollers[pollerPrecommitMsg].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { - n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_precommit_msg = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_precommit_msg is null and after_tree_r = true and after_tree_d = true`, id, task.SpID, task.SectorNumber) - if err != nil { - return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err) - } - if n != 1 { - return false, xerrors.Errorf("expected to update 1 row, updated %d", n) - } - - return true, nil - }) - } - - if task.TaskPrecommitMsg != nil && !task.AfterPrecommitMsgSuccess { - var execResult []struct { - ExecutedTskCID string `db:"executed_tsk_cid"` - ExecutedTskEpoch int64 `db:"executed_tsk_epoch"` - ExecutedMsgCID string `db:"executed_msg_cid"` - - ExecutedRcptExitCode int64 `db:"executed_rcpt_exitcode"` - ExecutedRcptGasUsed int64 `db:"executed_rcpt_gas_used"` - } - - err := s.db.Select(ctx, &execResult, `SELECT executed_tsk_cid, executed_tsk_epoch, executed_msg_cid, executed_rcpt_exitcode, executed_rcpt_gas_used - FROM sectors_sdr_pipeline - JOIN message_waits ON sectors_sdr_pipeline.precommit_msg_cid = message_waits.signed_message_cid - WHERE sp_id = $1 AND sector_number = $2 AND executed_tsk_epoch is not null`, task.SpID, task.SectorNumber) - if err != nil { - log.Errorw("failed to query message_waits", "error", err) - } - - if len(execResult) > 0 { - maddr, err := address.NewIDAddress(uint64(task.SpID)) - if err != nil { - return err - } - - pci, err := s.api.StateSectorPreCommitInfo(ctx, maddr, abi.SectorNumber(task.SectorNumber), types.EmptyTSK) - if err != nil { - return xerrors.Errorf("get precommit info: %w", err) - } - - if pci != nil { - randHeight := pci.PreCommitEpoch + policy.GetPreCommitChallengeDelay() - - _, err := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET - seed_epoch = $1, precommit_msg_tsk = $2, after_precommit_msg_success = true - WHERE sp_id = $3 AND sector_number = $4 and seed_epoch is NULL`, - randHeight, execResult[0].ExecutedTskCID, task.SpID, task.SectorNumber) - if err != nil { - return xerrors.Errorf("update sectors_sdr_pipeline: %w", err) - } - } // todo handle missing precommit info (eg expired precommit) - - } - } - ts, err := s.api.ChainHead(ctx) if err != nil { return xerrors.Errorf("getting chain head: %w", err) } - if s.pollers[pollerPoRep].IsSet() && task.AfterPrecommitMsgSuccess && task.SeedEpoch != nil && task.TaskPoRep == nil && ts.Height() >= abi.ChainEpoch(*task.SeedEpoch+seedEpochConfidence) { - s.pollers[pollerPoRep].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { - n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_porep = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_porep is null`, id, task.SpID, task.SectorNumber) - if err != nil { - return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err) - } - if n != 1 { - return false, xerrors.Errorf("expected to update 1 row, updated %d", n) - } + s.pollStartSDR(ctx, task) + s.pollStartSDRTrees(ctx, task) + s.pollStartPrecommitMsg(ctx, task) + s.mustPoll(s.pollPrecommitMsgLanded(ctx, task)) + s.pollStartPoRep(ctx, task, ts) + s.pollStartCommitMsg(ctx, task) + s.mustPoll(s.pollCommitMsgLanded(ctx, task)) + } - return true, nil - }) - } + return nil +} - if task.AfterPoRep && len(task.PoRepProof) > 0 && task.TaskCommitMsg == nil && s.pollers[pollerCommitMsg].IsSet() { - s.pollers[pollerCommitMsg].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { - n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_commit_msg = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_commit_msg is null`, id, task.SpID, task.SectorNumber) - if err != nil { - return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err) - } - if n != 1 { - return false, xerrors.Errorf("expected to update 1 row, updated %d", n) - } - - return true, nil - }) - } - - if task.AfterCommitMsg && !task.AfterCommitMsgSuccess && s.pollers[pollerCommitMsg].IsSet() { - var execResult []struct { - ExecutedTskCID string `db:"executed_tsk_cid"` - ExecutedTskEpoch int64 `db:"executed_tsk_epoch"` - ExecutedMsgCID string `db:"executed_msg_cid"` - - ExecutedRcptExitCode int64 `db:"executed_rcpt_exitcode"` - ExecutedRcptGasUsed int64 `db:"executed_rcpt_gas_used"` +func (s *SealPoller) pollStartSDR(ctx context.Context, task pollTask) { + if task.TaskSDR == nil && s.pollers[pollerSDR].IsSet() { + s.pollers[pollerSDR].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { + n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_sdr = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_sdr is null`, id, task.SpID, task.SectorNumber) + if err != nil { + return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err) + } + if n != 1 { + return false, xerrors.Errorf("expected to update 1 row, updated %d", n) } - err := s.db.Select(ctx, &execResult, `SELECT executed_tsk_cid, executed_tsk_epoch, executed_msg_cid, executed_rcpt_exitcode, executed_rcpt_gas_used + return true, nil + }) + } +} + +func (s *SealPoller) pollStartSDRTrees(ctx context.Context, task pollTask) { + if task.TaskTreeD == nil && task.TaskTreeC == nil && task.TaskTreeR == nil && s.pollers[pollerTrees].IsSet() && task.AfterSDR { + s.pollers[pollerTrees].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { + n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_tree_d = $1, task_id_tree_c = $1, task_id_tree_r = $1 + WHERE sp_id = $2 AND sector_number = $3 and after_sdr = true and task_id_tree_d is null and task_id_tree_c is null and task_id_tree_r is null`, id, task.SpID, task.SectorNumber) + if err != nil { + return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err) + } + if n != 1 { + return false, xerrors.Errorf("expected to update 1 row, updated %d", n) + } + + return true, nil + }) + } +} + +func (s *SealPoller) pollStartPrecommitMsg(ctx context.Context, task pollTask) { + if task.TaskPrecommitMsg == nil && task.AfterTreeR && task.AfterTreeD { + s.pollers[pollerPrecommitMsg].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { + n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_precommit_msg = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_precommit_msg is null and after_tree_r = true and after_tree_d = true`, id, task.SpID, task.SectorNumber) + if err != nil { + return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err) + } + if n != 1 { + return false, xerrors.Errorf("expected to update 1 row, updated %d", n) + } + + return true, nil + }) + } +} + +func (s *SealPoller) pollPrecommitMsgLanded(ctx context.Context, task pollTask) error { + if task.TaskPrecommitMsg != nil && !task.AfterPrecommitMsgSuccess { + var execResult []struct { + ExecutedTskCID string `db:"executed_tsk_cid"` + ExecutedTskEpoch int64 `db:"executed_tsk_epoch"` + ExecutedMsgCID string `db:"executed_msg_cid"` + + ExecutedRcptExitCode int64 `db:"executed_rcpt_exitcode"` + ExecutedRcptGasUsed int64 `db:"executed_rcpt_gas_used"` + } + + err := s.db.Select(ctx, &execResult, `SELECT executed_tsk_cid, executed_tsk_epoch, executed_msg_cid, executed_rcpt_exitcode, executed_rcpt_gas_used + FROM sectors_sdr_pipeline + JOIN message_waits ON sectors_sdr_pipeline.precommit_msg_cid = message_waits.signed_message_cid + WHERE sp_id = $1 AND sector_number = $2 AND executed_tsk_epoch is not null`, task.SpID, task.SectorNumber) + if err != nil { + log.Errorw("failed to query message_waits", "error", err) + } + + if len(execResult) > 0 { + maddr, err := address.NewIDAddress(uint64(task.SpID)) + if err != nil { + return err + } + + pci, err := s.api.StateSectorPreCommitInfo(ctx, maddr, abi.SectorNumber(task.SectorNumber), types.EmptyTSK) + if err != nil { + return xerrors.Errorf("get precommit info: %w", err) + } + + if pci != nil { + randHeight := pci.PreCommitEpoch + policy.GetPreCommitChallengeDelay() + + _, err := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET + seed_epoch = $1, precommit_msg_tsk = $2, after_precommit_msg_success = true + WHERE sp_id = $3 AND sector_number = $4 and seed_epoch is NULL`, + randHeight, execResult[0].ExecutedTskCID, task.SpID, task.SectorNumber) + if err != nil { + return xerrors.Errorf("update sectors_sdr_pipeline: %w", err) + } + } // todo handle missing precommit info (eg expired precommit) + + } + } + + return nil +} + +func (s *SealPoller) pollStartPoRep(ctx context.Context, task pollTask, ts *types.TipSet) { + if s.pollers[pollerPoRep].IsSet() && task.AfterPrecommitMsgSuccess && task.SeedEpoch != nil && task.TaskPoRep == nil && ts.Height() >= abi.ChainEpoch(*task.SeedEpoch+seedEpochConfidence) { + s.pollers[pollerPoRep].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { + n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_porep = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_porep is null`, id, task.SpID, task.SectorNumber) + if err != nil { + return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err) + } + if n != 1 { + return false, xerrors.Errorf("expected to update 1 row, updated %d", n) + } + + return true, nil + }) + } +} + +func (s *SealPoller) pollStartCommitMsg(ctx context.Context, task pollTask) { + if task.AfterPoRep && len(task.PoRepProof) > 0 && task.TaskCommitMsg == nil && s.pollers[pollerCommitMsg].IsSet() { + s.pollers[pollerCommitMsg].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { + n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_commit_msg = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_commit_msg is null`, id, task.SpID, task.SectorNumber) + if err != nil { + return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err) + } + if n != 1 { + return false, xerrors.Errorf("expected to update 1 row, updated %d", n) + } + + return true, nil + }) + } +} + +func (s *SealPoller) pollCommitMsgLanded(ctx context.Context, task pollTask) error { + if task.AfterCommitMsg && !task.AfterCommitMsgSuccess && s.pollers[pollerCommitMsg].IsSet() { + var execResult []struct { + ExecutedTskCID string `db:"executed_tsk_cid"` + ExecutedTskEpoch int64 `db:"executed_tsk_epoch"` + ExecutedMsgCID string `db:"executed_msg_cid"` + + ExecutedRcptExitCode int64 `db:"executed_rcpt_exitcode"` + ExecutedRcptGasUsed int64 `db:"executed_rcpt_gas_used"` + } + + err := s.db.Select(ctx, &execResult, `SELECT executed_tsk_cid, executed_tsk_epoch, executed_msg_cid, executed_rcpt_exitcode, executed_rcpt_gas_used FROM sectors_sdr_pipeline JOIN message_waits ON sectors_sdr_pipeline.commit_msg_cid = message_waits.signed_message_cid WHERE sp_id = $1 AND sector_number = $2 AND executed_tsk_epoch is not null`, task.SpID, task.SectorNumber) + if err != nil { + log.Errorw("failed to query message_waits", "error", err) + } + + if len(execResult) > 0 { + maddr, err := address.NewIDAddress(uint64(task.SpID)) if err != nil { - log.Errorw("failed to query message_waits", "error", err) + return err } - if len(execResult) > 0 { - maddr, err := address.NewIDAddress(uint64(task.SpID)) - if err != nil { - return err - } + si, err := s.api.StateSectorGetInfo(ctx, maddr, abi.SectorNumber(task.SectorNumber), types.EmptyTSK) + if err != nil { + return xerrors.Errorf("get sector info: %w", err) + } - si, err := s.api.StateSectorGetInfo(ctx, maddr, abi.SectorNumber(task.SectorNumber), types.EmptyTSK) - if err != nil { - return xerrors.Errorf("get sector info: %w", err) - } + if si == nil { + log.Errorw("todo handle missing sector info (not found after cron)", "sp", task.SpID, "sector", task.SectorNumber, "exec_epoch", execResult[0].ExecutedTskEpoch, "exec_tskcid", execResult[0].ExecutedTskCID, "msg_cid", execResult[0].ExecutedMsgCID) + // todo handdle missing sector info (not found after cron) + } else { + // yay! - if si == nil { - log.Errorw("todo handle missing sector info (not found after cron)", "sp", task.SpID, "sector", task.SectorNumber, "exec_epoch", execResult[0].ExecutedTskEpoch, "exec_tskcid", execResult[0].ExecutedTskCID, "msg_cid", execResult[0].ExecutedMsgCID) - // todo handdle missing sector info (not found after cron) - } else { - // yay! - - _, err := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET + _, err := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET after_commit_msg_success = true, commit_msg_tsk = $1 WHERE sp_id = $2 AND sector_number = $3 and after_commit_msg_success = false`, - execResult[0].ExecutedTskCID, task.SpID, task.SectorNumber) - if err != nil { - return xerrors.Errorf("update sectors_sdr_pipeline: %w", err) - } + execResult[0].ExecutedTskCID, task.SpID, task.SectorNumber) + if err != nil { + return xerrors.Errorf("update sectors_sdr_pipeline: %w", err) } } } @@ -296,3 +325,9 @@ func (s *SealPoller) poll(ctx context.Context) error { return nil } + +func (s *SealPoller) mustPoll(err error) { + if err != nil { + log.Errorw("poller operation failed", "error", err) + } +}