lpseal: clean up poller loop

This commit is contained in:
Łukasz Magiera 2024-01-17 12:42:43 +01:00
parent af79813af8
commit fabd413e5e

View File

@ -2,6 +2,7 @@ package lpseal
import (
"context"
"github.com/filecoin-project/lotus/chain/actors/policy"
"time"
logging "github.com/ipfs/go-log/v2"
@ -11,7 +12,6 @@ import (
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
@ -69,8 +69,7 @@ func (s *SealPoller) RunPoller(ctx context.Context) {
}
}
func (s *SealPoller) poll(ctx context.Context) error {
var tasks []struct {
type pollTask struct {
SpID int64 `db:"sp_id"`
SectorNumber int64 `db:"sector_number"`
@ -105,6 +104,9 @@ func (s *SealPoller) poll(ctx context.Context) error {
FailedReason string `db:"failed_reason"`
}
func (s *SealPoller) poll(ctx context.Context) error {
var tasks []pollTask
err := s.db.Select(ctx, &tasks, `SELECT
sp_id, sector_number,
task_id_sdr, after_sdr,
@ -128,6 +130,24 @@ func (s *SealPoller) poll(ctx context.Context) error {
continue
}
ts, err := s.api.ChainHead(ctx)
if err != nil {
return xerrors.Errorf("getting chain head: %w", err)
}
s.pollStartSDR(ctx, task)
s.pollStartSDRTrees(ctx, task)
s.pollStartPrecommitMsg(ctx, task)
s.mustPoll(s.pollPrecommitMsgLanded(ctx, task))
s.pollStartPoRep(ctx, task, ts)
s.pollStartCommitMsg(ctx, task)
s.mustPoll(s.pollCommitMsgLanded(ctx, task))
}
return nil
}
func (s *SealPoller) pollStartSDR(ctx context.Context, task pollTask) {
if task.TaskSDR == nil && s.pollers[pollerSDR].IsSet() {
s.pollers[pollerSDR].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_sdr = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_sdr is null`, id, task.SpID, task.SectorNumber)
@ -141,6 +161,9 @@ func (s *SealPoller) poll(ctx context.Context) error {
return true, nil
})
}
}
func (s *SealPoller) pollStartSDRTrees(ctx context.Context, task pollTask) {
if task.TaskTreeD == nil && task.TaskTreeC == nil && task.TaskTreeR == nil && s.pollers[pollerTrees].IsSet() && task.AfterSDR {
s.pollers[pollerTrees].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_tree_d = $1, task_id_tree_c = $1, task_id_tree_r = $1
@ -155,7 +178,9 @@ func (s *SealPoller) poll(ctx context.Context) error {
return true, nil
})
}
}
func (s *SealPoller) pollStartPrecommitMsg(ctx context.Context, task pollTask) {
if task.TaskPrecommitMsg == nil && task.AfterTreeR && task.AfterTreeD {
s.pollers[pollerPrecommitMsg].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_precommit_msg = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_precommit_msg is null and after_tree_r = true and after_tree_d = true`, id, task.SpID, task.SectorNumber)
@ -169,7 +194,9 @@ func (s *SealPoller) poll(ctx context.Context) error {
return true, nil
})
}
}
func (s *SealPoller) pollPrecommitMsgLanded(ctx context.Context, task pollTask) error {
if task.TaskPrecommitMsg != nil && !task.AfterPrecommitMsgSuccess {
var execResult []struct {
ExecutedTskCID string `db:"executed_tsk_cid"`
@ -214,11 +241,10 @@ func (s *SealPoller) poll(ctx context.Context) error {
}
}
ts, err := s.api.ChainHead(ctx)
if err != nil {
return xerrors.Errorf("getting chain head: %w", err)
return nil
}
func (s *SealPoller) pollStartPoRep(ctx context.Context, task pollTask, ts *types.TipSet) {
if s.pollers[pollerPoRep].IsSet() && task.AfterPrecommitMsgSuccess && task.SeedEpoch != nil && task.TaskPoRep == nil && ts.Height() >= abi.ChainEpoch(*task.SeedEpoch+seedEpochConfidence) {
s.pollers[pollerPoRep].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_porep = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_porep is null`, id, task.SpID, task.SectorNumber)
@ -232,7 +258,9 @@ func (s *SealPoller) poll(ctx context.Context) error {
return true, nil
})
}
}
func (s *SealPoller) pollStartCommitMsg(ctx context.Context, task pollTask) {
if task.AfterPoRep && len(task.PoRepProof) > 0 && task.TaskCommitMsg == nil && s.pollers[pollerCommitMsg].IsSet() {
s.pollers[pollerCommitMsg].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_commit_msg = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_commit_msg is null`, id, task.SpID, task.SectorNumber)
@ -246,7 +274,9 @@ func (s *SealPoller) poll(ctx context.Context) error {
return true, nil
})
}
}
func (s *SealPoller) pollCommitMsgLanded(ctx context.Context, task pollTask) error {
if task.AfterCommitMsg && !task.AfterCommitMsgSuccess && s.pollers[pollerCommitMsg].IsSet() {
var execResult []struct {
ExecutedTskCID string `db:"executed_tsk_cid"`
@ -292,7 +322,12 @@ func (s *SealPoller) poll(ctx context.Context) error {
}
}
}
}
return nil
}
func (s *SealPoller) mustPoll(err error) {
if err != nil {
log.Errorw("poller operation failed", "error", err)
}
}