lotus/provider/lpseal/poller.go

298 lines
10 KiB
Go
Raw Normal View History

2023-12-18 22:48:49 +00:00
package lpseal
import (
"context"
2024-01-12 10:03:37 +00:00
"time"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
2024-01-05 21:05:50 +00:00
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
2024-01-12 10:03:37 +00:00
2024-01-05 21:05:50 +00:00
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/chain/types"
2023-12-18 22:48:49 +00:00
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
"github.com/filecoin-project/lotus/lib/promise"
)
// log is the package-level logger; all messages from this package are
// emitted under the "lpseal" subsystem name.
var (
	log = logging.Logger("lpseal")
)
// Indexes into SealPoller.pollers, one per pipeline stage that the poller can
// schedule. numPollers is not a stage: it is the number of stages and is used
// as the length of the pollers array, so it must stay last.
const (
	pollerSDR = iota
	pollerTrees
	pollerPrecommitMsg
	pollerPoRep
	pollerCommitMsg

	numPollers
)

const (
	// sealPollerInterval is the period between two consecutive poll passes
	// started by RunPoller.
	sealPollerInterval = 10 * time.Second

	// seedEpochConfidence is the number of epochs the chain head must be past
	// a sector's seed epoch before a PoRep task is scheduled for it.
	seedEpochConfidence = 3
)
2023-12-18 22:48:49 +00:00
2024-01-05 21:05:50 +00:00
type SealPollerAPI interface {
StateSectorPreCommitInfo(context.Context, address.Address, abi.SectorNumber, types.TipSetKey) (*miner.SectorPreCommitOnChainInfo, error)
2024-01-11 21:39:42 +00:00
StateSectorGetInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tsk types.TipSetKey) (*miner.SectorOnChainInfo, error)
ChainHead(context.Context) (*types.TipSet, error)
2024-01-05 21:05:50 +00:00
}
2023-12-18 22:48:49 +00:00
type SealPoller struct {
2024-01-05 21:05:50 +00:00
db *harmonydb.DB
api SealPollerAPI
2023-12-18 22:48:49 +00:00
pollers [numPollers]promise.Promise[harmonytask.AddTaskFunc]
}
2024-01-05 21:05:50 +00:00
func NewPoller(db *harmonydb.DB, api SealPollerAPI) *SealPoller {
2023-12-19 11:16:38 +00:00
return &SealPoller{
2024-01-05 21:05:50 +00:00
db: db,
api: api,
2023-12-19 11:16:38 +00:00
}
}
func (s *SealPoller) RunPoller(ctx context.Context) {
2023-12-18 22:48:49 +00:00
ticker := time.NewTicker(sealPollerInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
2023-12-19 11:16:38 +00:00
return
2023-12-18 22:48:49 +00:00
case <-ticker.C:
2023-12-19 11:16:38 +00:00
if err := s.poll(ctx); err != nil {
2023-12-19 11:39:25 +00:00
log.Errorw("polling failed", "error", err)
2023-12-19 11:16:38 +00:00
}
2023-12-18 22:48:49 +00:00
}
}
}
func (s *SealPoller) poll(ctx context.Context) error {
var tasks []struct {
SpID int64 `db:"sp_id"`
SectorNumber int64 `db:"sector_number"`
TaskSDR *int64 `db:"task_id_sdr"`
AfterSDR bool `db:"after_sdr"`
TaskTreeD *int64 `db:"task_id_tree_d"`
AfterTreeD bool `db:"after_tree_d"`
TaskTreeC *int64 `db:"task_id_tree_c"`
AfterTreeC bool `db:"after_tree_c"`
TaskTreeR *int64 `db:"task_id_tree_r"`
AfterTreeR bool `db:"after_tree_r"`
TaskPrecommitMsg *int64 `db:"task_id_precommit_msg"`
AfterPrecommitMsg bool `db:"after_precommit_msg"`
2024-01-11 21:39:42 +00:00
AfterPrecommitMsgSuccess bool `db:"after_precommit_msg_success"`
SeedEpoch *int64 `db:"seed_epoch"`
2023-12-18 22:48:49 +00:00
TaskPoRep *int64 `db:"task_id_porep"`
PoRepProof []byte `db:"porep_proof"`
2024-01-11 21:39:42 +00:00
AfterPoRep bool `db:"after_porep"`
2023-12-18 22:48:49 +00:00
TaskCommitMsg *int64 `db:"task_id_commit_msg"`
AfterCommitMsg bool `db:"after_commit_msg"`
2024-01-11 21:39:42 +00:00
AfterCommitMsgSuccess bool `db:"after_commit_msg_success"`
2023-12-18 22:48:49 +00:00
Failed bool `db:"failed"`
FailedReason string `db:"failed_reason"`
}
2023-12-19 11:39:25 +00:00
err := s.db.Select(ctx, &tasks, `SELECT
sp_id, sector_number,
task_id_sdr, after_sdr,
task_id_tree_d, after_tree_d,
task_id_tree_c, after_tree_c,
task_id_tree_r, after_tree_r,
task_id_precommit_msg, after_precommit_msg,
2024-01-11 21:39:42 +00:00
after_precommit_msg_success, seed_epoch,
task_id_porep, porep_proof, after_porep,
2023-12-19 11:39:25 +00:00
task_id_commit_msg, after_commit_msg,
2024-01-11 21:39:42 +00:00
after_commit_msg_success,
2023-12-19 11:39:25 +00:00
failed, failed_reason
FROM sectors_sdr_pipeline WHERE after_commit_msg_success != true`)
2023-12-18 22:48:49 +00:00
if err != nil {
return err
}
for _, task := range tasks {
if task.Failed {
continue
}
2023-12-19 11:16:38 +00:00
if task.TaskSDR == nil && s.pollers[pollerSDR].IsSet() {
2023-12-18 22:48:49 +00:00
s.pollers[pollerSDR].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
2023-12-20 13:45:19 +00:00
n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_sdr = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_sdr is null`, id, task.SpID, task.SectorNumber)
2023-12-18 22:48:49 +00:00
if err != nil {
return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err)
}
if n != 1 {
return false, xerrors.Errorf("expected to update 1 row, updated %d", n)
}
return true, nil
})
}
2023-12-20 13:45:19 +00:00
if task.TaskTreeD == nil && task.TaskTreeC == nil && task.TaskTreeR == nil && s.pollers[pollerTrees].IsSet() && task.AfterSDR {
s.pollers[pollerTrees].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_tree_d = $1, task_id_tree_c = $1, task_id_tree_r = $1
2023-12-25 17:35:25 +00:00
WHERE sp_id = $2 AND sector_number = $3 and after_sdr = true and task_id_tree_d is null and task_id_tree_c is null and task_id_tree_r is null`, id, task.SpID, task.SectorNumber)
2023-12-20 13:45:19 +00:00
if err != nil {
return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err)
}
if n != 1 {
return false, xerrors.Errorf("expected to update 1 row, updated %d", n)
}
2023-12-18 22:48:49 +00:00
2023-12-20 13:45:19 +00:00
return true, nil
})
2023-12-18 22:48:49 +00:00
}
if task.TaskPrecommitMsg == nil && task.AfterTreeR && task.AfterTreeD {
2023-12-25 17:35:25 +00:00
s.pollers[pollerPrecommitMsg].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_precommit_msg = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_precommit_msg is null and after_tree_r = true and after_tree_d = true`, id, task.SpID, task.SectorNumber)
if err != nil {
return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err)
}
if n != 1 {
return false, xerrors.Errorf("expected to update 1 row, updated %d", n)
}
return true, nil
})
2023-12-18 22:48:49 +00:00
}
2024-01-05 21:05:50 +00:00
if task.TaskPrecommitMsg != nil && !task.AfterPrecommitMsgSuccess {
var execResult []struct {
ExecutedTskCID string `db:"executed_tsk_cid"`
ExecutedTskEpoch int64 `db:"executed_tsk_epoch"`
ExecutedMsgCID string `db:"executed_msg_cid"`
ExecutedRcptExitCode int64 `db:"executed_rcpt_exitcode"`
ExecutedRcptGasUsed int64 `db:"executed_rcpt_gas_used"`
}
err := s.db.Select(ctx, &execResult, `SELECT executed_tsk_cid, executed_tsk_epoch, executed_msg_cid, executed_rcpt_exitcode, executed_rcpt_gas_used
FROM sectors_sdr_pipeline
JOIN message_waits ON sectors_sdr_pipeline.precommit_msg_cid = message_waits.signed_message_cid
WHERE sp_id = $1 AND sector_number = $2 AND executed_tsk_epoch is not null`, task.SpID, task.SectorNumber)
if err != nil {
log.Errorw("failed to query message_waits", "error", err)
}
if len(execResult) > 0 {
maddr, err := address.NewIDAddress(uint64(task.SpID))
if err != nil {
return err
}
pci, err := s.api.StateSectorPreCommitInfo(ctx, maddr, abi.SectorNumber(task.SectorNumber), types.EmptyTSK)
if err != nil {
return xerrors.Errorf("get precommit info: %w", err)
}
if pci != nil {
randHeight := pci.PreCommitEpoch + policy.GetPreCommitChallengeDelay()
2024-01-05 15:10:34 +00:00
2024-01-05 21:05:50 +00:00
_, err := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET
seed_epoch = $1, precommit_msg_tsk = $2, after_precommit_msg_success = true
WHERE sp_id = $3 AND sector_number = $4 and seed_epoch is NULL`,
randHeight, execResult[0].ExecutedTskCID, task.SpID, task.SectorNumber)
if err != nil {
return xerrors.Errorf("update sectors_sdr_pipeline: %w", err)
}
} // todo handle missing precommit info (eg expired precommit)
}
2023-12-18 22:48:49 +00:00
}
2024-01-11 21:39:42 +00:00
ts, err := s.api.ChainHead(ctx)
if err != nil {
return xerrors.Errorf("getting chain head: %w", err)
}
if s.pollers[pollerPoRep].IsSet() && task.AfterPrecommitMsgSuccess && task.SeedEpoch != nil && task.TaskPoRep == nil && ts.Height() >= abi.ChainEpoch(*task.SeedEpoch+seedEpochConfidence) {
s.pollers[pollerPoRep].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_porep = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_porep is null`, id, task.SpID, task.SectorNumber)
if err != nil {
return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err)
}
if n != 1 {
return false, xerrors.Errorf("expected to update 1 row, updated %d", n)
}
return true, nil
})
}
2024-01-11 22:55:26 +00:00
if task.AfterPoRep && len(task.PoRepProof) > 0 && task.TaskCommitMsg == nil && s.pollers[pollerCommitMsg].IsSet() {
2024-01-11 21:39:42 +00:00
s.pollers[pollerCommitMsg].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_commit_msg = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_commit_msg is null`, id, task.SpID, task.SectorNumber)
if err != nil {
return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err)
}
if n != 1 {
return false, xerrors.Errorf("expected to update 1 row, updated %d", n)
}
return true, nil
})
}
2024-01-11 22:55:26 +00:00
if task.AfterCommitMsg && !task.AfterCommitMsgSuccess && s.pollers[pollerCommitMsg].IsSet() {
2024-01-11 21:39:42 +00:00
var execResult []struct {
ExecutedTskCID string `db:"executed_tsk_cid"`
ExecutedTskEpoch int64 `db:"executed_tsk_epoch"`
ExecutedMsgCID string `db:"executed_msg_cid"`
ExecutedRcptExitCode int64 `db:"executed_rcpt_exitcode"`
ExecutedRcptGasUsed int64 `db:"executed_rcpt_gas_used"`
}
err := s.db.Select(ctx, &execResult, `SELECT executed_tsk_cid, executed_tsk_epoch, executed_msg_cid, executed_rcpt_exitcode, executed_rcpt_gas_used
FROM sectors_sdr_pipeline
JOIN message_waits ON sectors_sdr_pipeline.commit_msg_cid = message_waits.signed_message_cid
WHERE sp_id = $1 AND sector_number = $2 AND executed_tsk_epoch is not null`, task.SpID, task.SectorNumber)
if err != nil {
log.Errorw("failed to query message_waits", "error", err)
}
if len(execResult) > 0 {
maddr, err := address.NewIDAddress(uint64(task.SpID))
if err != nil {
return err
}
si, err := s.api.StateSectorGetInfo(ctx, maddr, abi.SectorNumber(task.SectorNumber), types.EmptyTSK)
if err != nil {
return xerrors.Errorf("get sector info: %w", err)
}
2024-01-11 22:55:26 +00:00
if si == nil {
log.Errorw("todo handle missing sector info (not found after cron)", "sp", task.SpID, "sector", task.SectorNumber, "exec_epoch", execResult[0].ExecutedTskEpoch, "exec_tskcid", execResult[0].ExecutedTskCID, "msg_cid", execResult[0].ExecutedMsgCID)
2024-01-11 21:39:42 +00:00
// todo handdle missing sector info (not found after cron)
} else {
// yay!
_, err := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET
after_commit_msg_success = true, commit_msg_tsk = $1
2024-01-11 22:55:26 +00:00
WHERE sp_id = $2 AND sector_number = $3 and after_commit_msg_success = false`,
2024-01-11 21:39:42 +00:00
execResult[0].ExecutedTskCID, task.SpID, task.SectorNumber)
if err != nil {
return xerrors.Errorf("update sectors_sdr_pipeline: %w", err)
}
}
}
2023-12-18 22:48:49 +00:00
}
}
return nil
}