From 8213d7bca9c11e348c7030e7beac2704159fff28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 19 Jan 2024 14:08:05 +0100 Subject: [PATCH] lpseal: Handle out-of-gas retry in poller --- provider/lpseal/README.md | 28 +++++ provider/lpseal/poller.go | 131 ------------------------ provider/lpseal/poller_commit_msg.go | 105 +++++++++++++++++++ provider/lpseal/poller_precommit_msg.go | 116 +++++++++++++++++++++ 4 files changed, 249 insertions(+), 131 deletions(-) create mode 100644 provider/lpseal/README.md create mode 100644 provider/lpseal/poller_commit_msg.go create mode 100644 provider/lpseal/poller_precommit_msg.go diff --git a/provider/lpseal/README.md b/provider/lpseal/README.md new file mode 100644 index 000000000..2f9055f86 --- /dev/null +++ b/provider/lpseal/README.md @@ -0,0 +1,28 @@ +# Lotus-Provider Sealer + +## Overview + +The lotus-provider sealer is a collection of harmony tasks and a common poller +which implement the sealing functionality of the Filecoin protocol. + +## Pipeline Tasks + +* SDR pipeline + * `SDR` - Generate SDR layers + * `SDRTrees` - Generate tree files (TreeD, TreeR, TreeC) + * `PreCommitSubmit` - Submit precommit message to the network + * `PoRep` - Generate PoRep proof + * `CommitSubmit` - Submit commit message to the network + +## Poller + +The poller is a background process running on every node which runs any of the +SDR pipeline tasks. It periodically checks the state of sectors in the SDR pipeline +and schedules any tasks to run which will move the sector along the pipeline. + +## Error Handling + +* Pipeline tasks are expected to always finish successfully as harmonytask tasks. + If a sealing task encounters an error, it should mark the sector pipeline entry + as failed and exit without erroring. The poller will then figure out a recovery + strategy for the sector. 
diff --git a/provider/lpseal/poller.go b/provider/lpseal/poller.go index a8bf84c2a..a51e7d859 100644 --- a/provider/lpseal/poller.go +++ b/provider/lpseal/poller.go @@ -2,7 +2,6 @@ package lpseal import ( "context" - "github.com/filecoin-project/lotus/chain/actors/policy" "time" logging "github.com/ipfs/go-log/v2" @@ -180,70 +179,6 @@ func (s *SealPoller) pollStartSDRTrees(ctx context.Context, task pollTask) { } } -func (s *SealPoller) pollStartPrecommitMsg(ctx context.Context, task pollTask) { - if task.TaskPrecommitMsg == nil && task.AfterTreeR && task.AfterTreeD { - s.pollers[pollerPrecommitMsg].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { - n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_precommit_msg = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_precommit_msg is null and after_tree_r = true and after_tree_d = true`, id, task.SpID, task.SectorNumber) - if err != nil { - return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err) - } - if n != 1 { - return false, xerrors.Errorf("expected to update 1 row, updated %d", n) - } - - return true, nil - }) - } -} - -func (s *SealPoller) pollPrecommitMsgLanded(ctx context.Context, task pollTask) error { - if task.TaskPrecommitMsg != nil && !task.AfterPrecommitMsgSuccess { - var execResult []struct { - ExecutedTskCID string `db:"executed_tsk_cid"` - ExecutedTskEpoch int64 `db:"executed_tsk_epoch"` - ExecutedMsgCID string `db:"executed_msg_cid"` - - ExecutedRcptExitCode int64 `db:"executed_rcpt_exitcode"` - ExecutedRcptGasUsed int64 `db:"executed_rcpt_gas_used"` - } - - err := s.db.Select(ctx, &execResult, `SELECT executed_tsk_cid, executed_tsk_epoch, executed_msg_cid, executed_rcpt_exitcode, executed_rcpt_gas_used - FROM sectors_sdr_pipeline - JOIN message_waits ON sectors_sdr_pipeline.precommit_msg_cid = message_waits.signed_message_cid - WHERE sp_id = $1 AND sector_number = $2 AND executed_tsk_epoch is not null`, task.SpID, 
task.SectorNumber) - if err != nil { - log.Errorw("failed to query message_waits", "error", err) - } - - if len(execResult) > 0 { - maddr, err := address.NewIDAddress(uint64(task.SpID)) - if err != nil { - return err - } - - pci, err := s.api.StateSectorPreCommitInfo(ctx, maddr, abi.SectorNumber(task.SectorNumber), types.EmptyTSK) - if err != nil { - return xerrors.Errorf("get precommit info: %w", err) - } - - if pci != nil { - randHeight := pci.PreCommitEpoch + policy.GetPreCommitChallengeDelay() - - _, err := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET - seed_epoch = $1, precommit_msg_tsk = $2, after_precommit_msg_success = true - WHERE sp_id = $3 AND sector_number = $4 and seed_epoch is NULL`, - randHeight, execResult[0].ExecutedTskCID, task.SpID, task.SectorNumber) - if err != nil { - return xerrors.Errorf("update sectors_sdr_pipeline: %w", err) - } - } // todo handle missing precommit info (eg expired precommit) - - } - } - - return nil -} - func (s *SealPoller) pollStartPoRep(ctx context.Context, task pollTask, ts *types.TipSet) { if s.pollers[pollerPoRep].IsSet() && task.AfterPrecommitMsgSuccess && task.SeedEpoch != nil && task.TaskPoRep == nil && ts.Height() >= abi.ChainEpoch(*task.SeedEpoch+seedEpochConfidence) { s.pollers[pollerPoRep].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { @@ -260,72 +195,6 @@ func (s *SealPoller) pollStartPoRep(ctx context.Context, task pollTask, ts *type } } -func (s *SealPoller) pollStartCommitMsg(ctx context.Context, task pollTask) { - if task.AfterPoRep && len(task.PoRepProof) > 0 && task.TaskCommitMsg == nil && s.pollers[pollerCommitMsg].IsSet() { - s.pollers[pollerCommitMsg].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { - n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_commit_msg = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_commit_msg is null`, id, task.SpID, task.SectorNumber) - if err != nil { - 
return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err) - } - if n != 1 { - return false, xerrors.Errorf("expected to update 1 row, updated %d", n) - } - - return true, nil - }) - } -} - -func (s *SealPoller) pollCommitMsgLanded(ctx context.Context, task pollTask) error { - if task.AfterCommitMsg && !task.AfterCommitMsgSuccess && s.pollers[pollerCommitMsg].IsSet() { - var execResult []struct { - ExecutedTskCID string `db:"executed_tsk_cid"` - ExecutedTskEpoch int64 `db:"executed_tsk_epoch"` - ExecutedMsgCID string `db:"executed_msg_cid"` - - ExecutedRcptExitCode int64 `db:"executed_rcpt_exitcode"` - ExecutedRcptGasUsed int64 `db:"executed_rcpt_gas_used"` - } - - err := s.db.Select(ctx, &execResult, `SELECT executed_tsk_cid, executed_tsk_epoch, executed_msg_cid, executed_rcpt_exitcode, executed_rcpt_gas_used - FROM sectors_sdr_pipeline - JOIN message_waits ON sectors_sdr_pipeline.commit_msg_cid = message_waits.signed_message_cid - WHERE sp_id = $1 AND sector_number = $2 AND executed_tsk_epoch is not null`, task.SpID, task.SectorNumber) - if err != nil { - log.Errorw("failed to query message_waits", "error", err) - } - - if len(execResult) > 0 { - maddr, err := address.NewIDAddress(uint64(task.SpID)) - if err != nil { - return err - } - - si, err := s.api.StateSectorGetInfo(ctx, maddr, abi.SectorNumber(task.SectorNumber), types.EmptyTSK) - if err != nil { - return xerrors.Errorf("get sector info: %w", err) - } - - if si == nil { - log.Errorw("todo handle missing sector info (not found after cron)", "sp", task.SpID, "sector", task.SectorNumber, "exec_epoch", execResult[0].ExecutedTskEpoch, "exec_tskcid", execResult[0].ExecutedTskCID, "msg_cid", execResult[0].ExecutedMsgCID) - // todo handdle missing sector info (not found after cron) - } else { - // yay! 
- - _, err := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET - after_commit_msg_success = true, commit_msg_tsk = $1 - WHERE sp_id = $2 AND sector_number = $3 and after_commit_msg_success = false`, - execResult[0].ExecutedTskCID, task.SpID, task.SectorNumber) - if err != nil { - return xerrors.Errorf("update sectors_sdr_pipeline: %w", err) - } - } - } - } - - return nil -} - func (s *SealPoller) mustPoll(err error) { if err != nil { log.Errorw("poller operation failed", "error", err) diff --git a/provider/lpseal/poller_commit_msg.go b/provider/lpseal/poller_commit_msg.go new file mode 100644 index 000000000..d7bc532c6 --- /dev/null +++ b/provider/lpseal/poller_commit_msg.go @@ -0,0 +1,105 @@ +package lpseal + +import ( + "context" + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/exitcode" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + "github.com/filecoin-project/lotus/lib/harmony/harmonytask" + "golang.org/x/xerrors" +) + +func (s *SealPoller) pollStartCommitMsg(ctx context.Context, task pollTask) { + if task.AfterPoRep && len(task.PoRepProof) > 0 && task.TaskCommitMsg == nil && s.pollers[pollerCommitMsg].IsSet() { + s.pollers[pollerCommitMsg].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { + n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_commit_msg = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_commit_msg is null`, id, task.SpID, task.SectorNumber) + if err != nil { + return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err) + } + if n != 1 { + return false, xerrors.Errorf("expected to update 1 row, updated %d", n) + } + + return true, nil + }) + } +} + +func (s *SealPoller) pollCommitMsgLanded(ctx context.Context, task pollTask) error { + if task.AfterCommitMsg && !task.AfterCommitMsgSuccess && s.pollers[pollerCommitMsg].IsSet() 
{ + var execResult []dbExecResult + + err := s.db.Select(ctx, &execResult, `SELECT spipeline.precommit_msg_cid, spipeline.commit_msg_cid, executed_tsk_cid, executed_tsk_epoch, executed_msg_cid, executed_rcpt_exitcode, executed_rcpt_gas_used + FROM sectors_sdr_pipeline spipeline + JOIN message_waits ON spipeline.commit_msg_cid = message_waits.signed_message_cid + WHERE sp_id = $1 AND sector_number = $2 AND executed_tsk_epoch is not null`, task.SpID, task.SectorNumber) + if err != nil { + log.Errorw("failed to query message_waits", "error", err) + } + + if len(execResult) > 0 { + maddr, err := address.NewIDAddress(uint64(task.SpID)) + if err != nil { + return err + } + + if exitcode.ExitCode(execResult[0].ExecutedRcptExitCode) != exitcode.Ok { + return s.pollCommitMsgFail(ctx, task, execResult[0]) + } + + si, err := s.api.StateSectorGetInfo(ctx, maddr, abi.SectorNumber(task.SectorNumber), types.EmptyTSK) + if err != nil { + return xerrors.Errorf("get sector info: %w", err) + } + + if si == nil { + log.Errorw("todo handle missing sector info (not found after cron)", "sp", task.SpID, "sector", task.SectorNumber, "exec_epoch", execResult[0].ExecutedTskEpoch, "exec_tskcid", execResult[0].ExecutedTskCID, "msg_cid", execResult[0].ExecutedMsgCID) + // todo handle missing sector info (not found after cron) + } else { + // yay! 
+ + _, err := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET + after_commit_msg_success = true, commit_msg_tsk = $1 + WHERE sp_id = $2 AND sector_number = $3 and after_commit_msg_success = false`, + execResult[0].ExecutedTskCID, task.SpID, task.SectorNumber) + if err != nil { + return xerrors.Errorf("update sectors_sdr_pipeline: %w", err) + } + } + } + } + + return nil +} + +func (s *SealPoller) pollCommitMsgFail(ctx context.Context, task pollTask, execResult dbExecResult) error { + switch exitcode.ExitCode(execResult.ExecutedRcptExitCode) { + case exitcode.SysErrInsufficientFunds: + fallthrough + case exitcode.SysErrOutOfGas: + // just retry the commit message send + return s.pollRetryCommitMsgSend(ctx, task, execResult) + default: + return xerrors.Errorf("commit message failed with exit code %s", exitcode.ExitCode(execResult.ExecutedRcptExitCode)) + } +} + +func (s *SealPoller) pollRetryCommitMsgSend(ctx context.Context, task pollTask, execResult dbExecResult) error { + if execResult.CommitMsgCID == nil { + return xerrors.Errorf("commit msg cid was nil") + } + + // make the pipeline entry seem like commit send didn't happen, next poll loop will retry + + _, err := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET + commit_msg_cid = null, task_id_commit_msg = null + WHERE commit_msg_cid = $1 AND sp_id = $2 AND sector_number = $3 AND after_commit_msg_success = false`, + *execResult.CommitMsgCID, task.SpID, task.SectorNumber) + if err != nil { + return xerrors.Errorf("update sectors_sdr_pipeline to retry commit msg send: %w", err) + } + + return nil +} diff --git a/provider/lpseal/poller_precommit_msg.go b/provider/lpseal/poller_precommit_msg.go new file mode 100644 index 000000000..b5798966e --- /dev/null +++ b/provider/lpseal/poller_precommit_msg.go @@ -0,0 +1,116 @@ +package lpseal + +import ( + "context" + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/exitcode" + 
"github.com/filecoin-project/lotus/chain/actors/policy" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + "github.com/filecoin-project/lotus/lib/harmony/harmonytask" + "golang.org/x/xerrors" +) + +func (s *SealPoller) pollStartPrecommitMsg(ctx context.Context, task pollTask) { + if task.TaskPrecommitMsg == nil && task.AfterTreeR && task.AfterTreeD { + s.pollers[pollerPrecommitMsg].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { + n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_precommit_msg = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_precommit_msg is null and after_tree_r = true and after_tree_d = true`, id, task.SpID, task.SectorNumber) + if err != nil { + return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err) + } + if n != 1 { + return false, xerrors.Errorf("expected to update 1 row, updated %d", n) + } + + return true, nil + }) + } +} + +type dbExecResult struct { + PrecommitMsgCID *string `db:"precommit_msg_cid"` + CommitMsgCID *string `db:"commit_msg_cid"` + + ExecutedTskCID string `db:"executed_tsk_cid"` + ExecutedTskEpoch int64 `db:"executed_tsk_epoch"` + ExecutedMsgCID string `db:"executed_msg_cid"` + + ExecutedRcptExitCode int64 `db:"executed_rcpt_exitcode"` + ExecutedRcptGasUsed int64 `db:"executed_rcpt_gas_used"` +} + +func (s *SealPoller) pollPrecommitMsgLanded(ctx context.Context, task pollTask) error { + if task.TaskPrecommitMsg != nil && !task.AfterPrecommitMsgSuccess { + var execResult []dbExecResult + + err := s.db.Select(ctx, &execResult, `SELECT spipeline.precommit_msg_cid, spipeline.commit_msg_cid, executed_tsk_cid, executed_tsk_epoch, executed_msg_cid, executed_rcpt_exitcode, executed_rcpt_gas_used + FROM sectors_sdr_pipeline spipeline + JOIN message_waits ON spipeline.precommit_msg_cid = message_waits.signed_message_cid + WHERE sp_id = $1 AND sector_number = $2 AND executed_tsk_epoch is not null`, 
task.SpID, task.SectorNumber) + if err != nil { + log.Errorw("failed to query message_waits", "error", err) + } + + if len(execResult) > 0 { + if exitcode.ExitCode(execResult[0].ExecutedRcptExitCode) != exitcode.Ok { + return s.pollPrecommitMsgFail(ctx, task, execResult[0]) + } + + maddr, err := address.NewIDAddress(uint64(task.SpID)) + if err != nil { + return err + } + + pci, err := s.api.StateSectorPreCommitInfo(ctx, maddr, abi.SectorNumber(task.SectorNumber), types.EmptyTSK) + if err != nil { + return xerrors.Errorf("get precommit info: %w", err) + } + + if pci != nil { + randHeight := pci.PreCommitEpoch + policy.GetPreCommitChallengeDelay() + + _, err := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET + seed_epoch = $1, precommit_msg_tsk = $2, after_precommit_msg_success = true + WHERE sp_id = $3 AND sector_number = $4 and seed_epoch is NULL`, + randHeight, execResult[0].ExecutedTskCID, task.SpID, task.SectorNumber) + if err != nil { + return xerrors.Errorf("update sectors_sdr_pipeline: %w", err) + } + } // todo handle missing precommit info (eg expired precommit) + + } + } + + return nil +} + +func (s *SealPoller) pollPrecommitMsgFail(ctx context.Context, task pollTask, execResult dbExecResult) error { + switch exitcode.ExitCode(execResult.ExecutedRcptExitCode) { + case exitcode.SysErrInsufficientFunds: + fallthrough + case exitcode.SysErrOutOfGas: + // just retry + return s.pollRetryPrecommitMsgSend(ctx, task, execResult) + default: + return xerrors.Errorf("precommit message failed with exit code %s", exitcode.ExitCode(execResult.ExecutedRcptExitCode)) + } +} + +func (s *SealPoller) pollRetryPrecommitMsgSend(ctx context.Context, task pollTask, execResult dbExecResult) error { + if execResult.PrecommitMsgCID == nil { + return xerrors.Errorf("precommit msg cid was nil") + } + + // make the pipeline entry seem like precommit send didn't happen, next poll loop will retry + + _, err := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET + precommit_msg_cid = null, 
task_id_precommit_msg = null + WHERE precommit_msg_cid = $1 AND sp_id = $2 AND sector_number = $3 AND after_precommit_msg_success = false`, + *execResult.PrecommitMsgCID, task.SpID, task.SectorNumber) + if err != nil { + return xerrors.Errorf("update sectors_sdr_pipeline to retry precommit msg send: %w", err) + } + + return nil +}