lpseal: Handle out-of-gas retry in poller

This commit is contained in:
Łukasz Magiera 2024-01-19 14:08:05 +01:00
parent fabd413e5e
commit 8213d7bca9
4 changed files with 249 additions and 131 deletions

28
provider/lpseal/README.md Normal file
View File

@ -0,0 +1,28 @@
# Lotus-Provider Sealer
## Overview
The lotus-provider sealer is a collection of harmony tasks and a common poller
which implement the sealing functionality of the Filecoin protocol.
## Pipeline Tasks
* SDR pipeline
* `SDR` - Generate SDR layers
* `SDRTrees` - Generate tree files (TreeD, TreeR, TreeC)
* `PreCommitSubmit` - Submit precommit message to the network
* `PoRep` - Generate PoRep proof
* `CommitSubmit` - Submit commit message to the network
# Poller
The poller is a background process running on every node which runs any of the
SDR pipeline tasks. It periodically checks the state of sectors in the SDR pipeline
and schedules any tasks to run which will move the sector along the pipeline.
# Error Handling
* Pipeline tasks are expected to always finish successfully as harmonytask tasks.
If a sealing task encounters an error, it should mark the sector pipeline entry
as failed and exit without erroring. The poller will then figure out a recovery
strategy for the sector.

View File

@ -2,7 +2,6 @@ package lpseal
import ( import (
"context" "context"
"github.com/filecoin-project/lotus/chain/actors/policy"
"time" "time"
logging "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2"
@ -180,70 +179,6 @@ func (s *SealPoller) pollStartSDRTrees(ctx context.Context, task pollTask) {
} }
} }
func (s *SealPoller) pollStartPrecommitMsg(ctx context.Context, task pollTask) {
if task.TaskPrecommitMsg == nil && task.AfterTreeR && task.AfterTreeD {
s.pollers[pollerPrecommitMsg].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_precommit_msg = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_precommit_msg is null and after_tree_r = true and after_tree_d = true`, id, task.SpID, task.SectorNumber)
if err != nil {
return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err)
}
if n != 1 {
return false, xerrors.Errorf("expected to update 1 row, updated %d", n)
}
return true, nil
})
}
}
func (s *SealPoller) pollPrecommitMsgLanded(ctx context.Context, task pollTask) error {
if task.TaskPrecommitMsg != nil && !task.AfterPrecommitMsgSuccess {
var execResult []struct {
ExecutedTskCID string `db:"executed_tsk_cid"`
ExecutedTskEpoch int64 `db:"executed_tsk_epoch"`
ExecutedMsgCID string `db:"executed_msg_cid"`
ExecutedRcptExitCode int64 `db:"executed_rcpt_exitcode"`
ExecutedRcptGasUsed int64 `db:"executed_rcpt_gas_used"`
}
err := s.db.Select(ctx, &execResult, `SELECT executed_tsk_cid, executed_tsk_epoch, executed_msg_cid, executed_rcpt_exitcode, executed_rcpt_gas_used
FROM sectors_sdr_pipeline
JOIN message_waits ON sectors_sdr_pipeline.precommit_msg_cid = message_waits.signed_message_cid
WHERE sp_id = $1 AND sector_number = $2 AND executed_tsk_epoch is not null`, task.SpID, task.SectorNumber)
if err != nil {
log.Errorw("failed to query message_waits", "error", err)
}
if len(execResult) > 0 {
maddr, err := address.NewIDAddress(uint64(task.SpID))
if err != nil {
return err
}
pci, err := s.api.StateSectorPreCommitInfo(ctx, maddr, abi.SectorNumber(task.SectorNumber), types.EmptyTSK)
if err != nil {
return xerrors.Errorf("get precommit info: %w", err)
}
if pci != nil {
randHeight := pci.PreCommitEpoch + policy.GetPreCommitChallengeDelay()
_, err := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET
seed_epoch = $1, precommit_msg_tsk = $2, after_precommit_msg_success = true
WHERE sp_id = $3 AND sector_number = $4 and seed_epoch is NULL`,
randHeight, execResult[0].ExecutedTskCID, task.SpID, task.SectorNumber)
if err != nil {
return xerrors.Errorf("update sectors_sdr_pipeline: %w", err)
}
} // todo handle missing precommit info (eg expired precommit)
}
}
return nil
}
func (s *SealPoller) pollStartPoRep(ctx context.Context, task pollTask, ts *types.TipSet) { func (s *SealPoller) pollStartPoRep(ctx context.Context, task pollTask, ts *types.TipSet) {
if s.pollers[pollerPoRep].IsSet() && task.AfterPrecommitMsgSuccess && task.SeedEpoch != nil && task.TaskPoRep == nil && ts.Height() >= abi.ChainEpoch(*task.SeedEpoch+seedEpochConfidence) { if s.pollers[pollerPoRep].IsSet() && task.AfterPrecommitMsgSuccess && task.SeedEpoch != nil && task.TaskPoRep == nil && ts.Height() >= abi.ChainEpoch(*task.SeedEpoch+seedEpochConfidence) {
s.pollers[pollerPoRep].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { s.pollers[pollerPoRep].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
@ -260,72 +195,6 @@ func (s *SealPoller) pollStartPoRep(ctx context.Context, task pollTask, ts *type
} }
} }
func (s *SealPoller) pollStartCommitMsg(ctx context.Context, task pollTask) {
if task.AfterPoRep && len(task.PoRepProof) > 0 && task.TaskCommitMsg == nil && s.pollers[pollerCommitMsg].IsSet() {
s.pollers[pollerCommitMsg].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_commit_msg = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_commit_msg is null`, id, task.SpID, task.SectorNumber)
if err != nil {
return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err)
}
if n != 1 {
return false, xerrors.Errorf("expected to update 1 row, updated %d", n)
}
return true, nil
})
}
}
func (s *SealPoller) pollCommitMsgLanded(ctx context.Context, task pollTask) error {
if task.AfterCommitMsg && !task.AfterCommitMsgSuccess && s.pollers[pollerCommitMsg].IsSet() {
var execResult []struct {
ExecutedTskCID string `db:"executed_tsk_cid"`
ExecutedTskEpoch int64 `db:"executed_tsk_epoch"`
ExecutedMsgCID string `db:"executed_msg_cid"`
ExecutedRcptExitCode int64 `db:"executed_rcpt_exitcode"`
ExecutedRcptGasUsed int64 `db:"executed_rcpt_gas_used"`
}
err := s.db.Select(ctx, &execResult, `SELECT executed_tsk_cid, executed_tsk_epoch, executed_msg_cid, executed_rcpt_exitcode, executed_rcpt_gas_used
FROM sectors_sdr_pipeline
JOIN message_waits ON sectors_sdr_pipeline.commit_msg_cid = message_waits.signed_message_cid
WHERE sp_id = $1 AND sector_number = $2 AND executed_tsk_epoch is not null`, task.SpID, task.SectorNumber)
if err != nil {
log.Errorw("failed to query message_waits", "error", err)
}
if len(execResult) > 0 {
maddr, err := address.NewIDAddress(uint64(task.SpID))
if err != nil {
return err
}
si, err := s.api.StateSectorGetInfo(ctx, maddr, abi.SectorNumber(task.SectorNumber), types.EmptyTSK)
if err != nil {
return xerrors.Errorf("get sector info: %w", err)
}
if si == nil {
log.Errorw("todo handle missing sector info (not found after cron)", "sp", task.SpID, "sector", task.SectorNumber, "exec_epoch", execResult[0].ExecutedTskEpoch, "exec_tskcid", execResult[0].ExecutedTskCID, "msg_cid", execResult[0].ExecutedMsgCID)
// todo handdle missing sector info (not found after cron)
} else {
// yay!
_, err := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET
after_commit_msg_success = true, commit_msg_tsk = $1
WHERE sp_id = $2 AND sector_number = $3 and after_commit_msg_success = false`,
execResult[0].ExecutedTskCID, task.SpID, task.SectorNumber)
if err != nil {
return xerrors.Errorf("update sectors_sdr_pipeline: %w", err)
}
}
}
}
return nil
}
func (s *SealPoller) mustPoll(err error) { func (s *SealPoller) mustPoll(err error) {
if err != nil { if err != nil {
log.Errorw("poller operation failed", "error", err) log.Errorw("poller operation failed", "error", err)

View File

@ -0,0 +1,105 @@
package lpseal
import (
"context"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/exitcode"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
"golang.org/x/xerrors"
)
func (s *SealPoller) pollStartCommitMsg(ctx context.Context, task pollTask) {
if task.AfterPoRep && len(task.PoRepProof) > 0 && task.TaskCommitMsg == nil && s.pollers[pollerCommitMsg].IsSet() {
s.pollers[pollerCommitMsg].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_commit_msg = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_commit_msg is null`, id, task.SpID, task.SectorNumber)
if err != nil {
return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err)
}
if n != 1 {
return false, xerrors.Errorf("expected to update 1 row, updated %d", n)
}
return true, nil
})
}
}
func (s *SealPoller) pollCommitMsgLanded(ctx context.Context, task pollTask) error {
if task.AfterCommitMsg && !task.AfterCommitMsgSuccess && s.pollers[pollerCommitMsg].IsSet() {
var execResult []dbExecResult
err := s.db.Select(ctx, &execResult, `SELECT spipeline.precommit_msg_cid, spipeline.commit_msg_cid, executed_tsk_cid, executed_tsk_epoch, executed_msg_cid, executed_rcpt_exitcode, executed_rcpt_gas_used
FROM sectors_sdr_pipeline spipeline
JOIN message_waits ON spipeline.commit_msg_cid = message_waits.signed_message_cid
WHERE sp_id = $1 AND sector_number = $2 AND executed_tsk_epoch is not null`, task.SpID, task.SectorNumber)
if err != nil {
log.Errorw("failed to query message_waits", "error", err)
}
if len(execResult) > 0 {
maddr, err := address.NewIDAddress(uint64(task.SpID))
if err != nil {
return err
}
if exitcode.ExitCode(execResult[0].ExecutedRcptExitCode) != exitcode.Ok {
return s.pollCommitMsgFail(ctx, task, execResult[0])
}
si, err := s.api.StateSectorGetInfo(ctx, maddr, abi.SectorNumber(task.SectorNumber), types.EmptyTSK)
if err != nil {
return xerrors.Errorf("get sector info: %w", err)
}
if si == nil {
log.Errorw("todo handle missing sector info (not found after cron)", "sp", task.SpID, "sector", task.SectorNumber, "exec_epoch", execResult[0].ExecutedTskEpoch, "exec_tskcid", execResult[0].ExecutedTskCID, "msg_cid", execResult[0].ExecutedMsgCID)
// todo handdle missing sector info (not found after cron)
} else {
// yay!
_, err := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET
after_commit_msg_success = true, commit_msg_tsk = $1
WHERE sp_id = $2 AND sector_number = $3 and after_commit_msg_success = false`,
execResult[0].ExecutedTskCID, task.SpID, task.SectorNumber)
if err != nil {
return xerrors.Errorf("update sectors_sdr_pipeline: %w", err)
}
}
}
}
return nil
}
func (s *SealPoller) pollCommitMsgFail(ctx context.Context, task pollTask, execResult dbExecResult) error {
switch exitcode.ExitCode(execResult.ExecutedRcptExitCode) {
case exitcode.SysErrInsufficientFunds:
fallthrough
case exitcode.SysErrOutOfGas:
// just retry
return s.pollRetryPrecommitMsgSend(ctx, task, execResult)
default:
return xerrors.Errorf("commit message failed with exit code %s", exitcode.ExitCode(execResult.ExecutedRcptExitCode))
}
}
func (s *SealPoller) pollRetryCommitMsgSend(ctx context.Context, task pollTask, execResult dbExecResult) error {
if execResult.CommitMsgCID == nil {
return xerrors.Errorf("commit msg cid was nil")
}
// make the pipeline entry seem like precommit send didn't happen, next poll loop will retry
_, err := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET
commit_msg_cid = null, task_id_commit_msg = null
WHERE commit_msg_cid = $1 AND sp_id = $2 AND sector_number = $3 AND after_commit_msg_success = false`,
*execResult.CommitMsgCID, task.SpID, task.SectorNumber)
if err != nil {
return xerrors.Errorf("update sectors_sdr_pipeline to retry precommit msg send: %w", err)
}
return nil
}

View File

@ -0,0 +1,116 @@
package lpseal
import (
"context"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/exitcode"
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
"golang.org/x/xerrors"
)
func (s *SealPoller) pollStartPrecommitMsg(ctx context.Context, task pollTask) {
if task.TaskPrecommitMsg == nil && task.AfterTreeR && task.AfterTreeD {
s.pollers[pollerPrecommitMsg].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_precommit_msg = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_precommit_msg is null and after_tree_r = true and after_tree_d = true`, id, task.SpID, task.SectorNumber)
if err != nil {
return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err)
}
if n != 1 {
return false, xerrors.Errorf("expected to update 1 row, updated %d", n)
}
return true, nil
})
}
}
type dbExecResult struct {
PrecommitMsgCID *string `db:"precommit_msg_cid"`
CommitMsgCID *string `db:"commit_msg_cid"`
ExecutedTskCID string `db:"executed_tsk_cid"`
ExecutedTskEpoch int64 `db:"executed_tsk_epoch"`
ExecutedMsgCID string `db:"executed_msg_cid"`
ExecutedRcptExitCode int64 `db:"executed_rcpt_exitcode"`
ExecutedRcptGasUsed int64 `db:"executed_rcpt_gas_used"`
}
func (s *SealPoller) pollPrecommitMsgLanded(ctx context.Context, task pollTask) error {
if task.TaskPrecommitMsg != nil && !task.AfterPrecommitMsgSuccess {
var execResult []dbExecResult
err := s.db.Select(ctx, &execResult, `SELECT spipeline.precommit_msg_cid, spipeline.commit_msg_cid, executed_tsk_cid, executed_tsk_epoch, executed_msg_cid, executed_rcpt_exitcode, executed_rcpt_gas_used
FROM sectors_sdr_pipeline spipeline
JOIN message_waits ON spipeline.precommit_msg_cid = message_waits.signed_message_cid
WHERE sp_id = $1 AND sector_number = $2 AND executed_tsk_epoch is not null`, task.SpID, task.SectorNumber)
if err != nil {
log.Errorw("failed to query message_waits", "error", err)
}
if len(execResult) > 0 {
if exitcode.ExitCode(execResult[0].ExecutedRcptExitCode) != exitcode.Ok {
return s.pollPrecommitMsgFail(ctx, task, execResult[0])
}
maddr, err := address.NewIDAddress(uint64(task.SpID))
if err != nil {
return err
}
pci, err := s.api.StateSectorPreCommitInfo(ctx, maddr, abi.SectorNumber(task.SectorNumber), types.EmptyTSK)
if err != nil {
return xerrors.Errorf("get precommit info: %w", err)
}
if pci != nil {
randHeight := pci.PreCommitEpoch + policy.GetPreCommitChallengeDelay()
_, err := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET
seed_epoch = $1, precommit_msg_tsk = $2, after_precommit_msg_success = true
WHERE sp_id = $3 AND sector_number = $4 and seed_epoch is NULL`,
randHeight, execResult[0].ExecutedTskCID, task.SpID, task.SectorNumber)
if err != nil {
return xerrors.Errorf("update sectors_sdr_pipeline: %w", err)
}
} // todo handle missing precommit info (eg expired precommit)
}
}
return nil
}
func (s *SealPoller) pollPrecommitMsgFail(ctx context.Context, task pollTask, execResult dbExecResult) error {
switch exitcode.ExitCode(execResult.ExecutedRcptExitCode) {
case exitcode.SysErrInsufficientFunds:
fallthrough
case exitcode.SysErrOutOfGas:
// just retry
return s.pollRetryPrecommitMsgSend(ctx, task, execResult)
default:
return xerrors.Errorf("precommit message failed with exit code %s", exitcode.ExitCode(execResult.ExecutedRcptExitCode))
}
}
func (s *SealPoller) pollRetryPrecommitMsgSend(ctx context.Context, task pollTask, execResult dbExecResult) error {
if execResult.PrecommitMsgCID == nil {
return xerrors.Errorf("precommit msg cid was nil")
}
// make the pipeline entry seem like precommit send didn't happen, next poll loop will retry
_, err := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET
precommit_msg_cid = null, task_id_precommit_msg = null
WHERE precommit_msg_cid = $1 AND sp_id = $2 AND sector_number = $3 AND after_precommit_msg_success = false`,
*execResult.PrecommitMsgCID, task.SpID, task.SectorNumber)
if err != nil {
return xerrors.Errorf("update sectors_sdr_pipeline to retry precommit msg send: %w", err)
}
return nil
}