From 1f5013b607bb77480e383cf7f34638f4f6e97632 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sat, 17 Feb 2024 15:01:59 +0100 Subject: [PATCH] lpseal: Allow broken dep chains in poller --- provider/lpseal/poller.go | 30 ++++++++++++++++++++++--- provider/lpseal/poller_commit_msg.go | 2 +- provider/lpseal/poller_precommit_msg.go | 2 +- 3 files changed, 29 insertions(+), 5 deletions(-) diff --git a/provider/lpseal/poller.go b/provider/lpseal/poller.go index d42189cbe..2cf3c72a6 100644 --- a/provider/lpseal/poller.go +++ b/provider/lpseal/poller.go @@ -183,6 +183,10 @@ func (s *SealPoller) pollStartSDR(ctx context.Context, task pollTask) { } } +func (t pollTask) afterSDR() bool { + return t.AfterSDR +} + func (s *SealPoller) pollStartSDRTrees(ctx context.Context, task pollTask) { if !task.AfterTreeD && !task.AfterTreeC && !task.AfterTreeR && task.TaskTreeD == nil && task.TaskTreeC == nil && task.TaskTreeR == nil && @@ -203,8 +207,20 @@ func (s *SealPoller) pollStartSDRTrees(ctx context.Context, task pollTask) { } } +func (t pollTask) afterTrees() bool { + return t.AfterTreeD && t.AfterTreeC && t.AfterTreeR && t.afterSDR() +} + +func (t pollTask) afterPrecommitMsg() bool { + return t.AfterPrecommitMsg && t.afterTrees() +} + +func (t pollTask) afterPrecommitMsgSuccess() bool { + return t.AfterPrecommitMsgSuccess && t.afterPrecommitMsg() +} + func (s *SealPoller) pollStartPoRep(ctx context.Context, task pollTask, ts *types.TipSet) { - if s.pollers[pollerPoRep].IsSet() && task.AfterPrecommitMsgSuccess && task.SeedEpoch != nil && + if s.pollers[pollerPoRep].IsSet() && task.afterPrecommitMsgSuccess() && task.SeedEpoch != nil && task.TaskPoRep == nil && !task.AfterPoRep && ts.Height() >= abi.ChainEpoch(*task.SeedEpoch+seedEpochConfidence) { @@ -222,8 +238,12 @@ func (s *SealPoller) pollStartPoRep(ctx context.Context, task pollTask, ts *type } } +func (t pollTask) afterPoRep() bool { + return t.AfterPoRep && t.afterPrecommitMsgSuccess() +} + func (s *SealPoller) pollStartFinalize(ctx context.Context, task pollTask, ts *types.TipSet) { - if s.pollers[pollerFinalize].IsSet() && task.AfterPoRep && !task.AfterFinalize && task.TaskFinalize == nil { + if s.pollers[pollerFinalize].IsSet() && task.afterPoRep() && !task.AfterFinalize && task.TaskFinalize == nil { s.pollers[pollerFinalize].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_finalize = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_finalize is null`, id, task.SpID, task.SectorNumber) if err != nil { @@ -238,8 +258,12 @@ func (s *SealPoller) pollStartFinalize(ctx context.Context, task pollTask, ts *t } } +func (t pollTask) afterFinalize() bool { + return t.AfterFinalize && t.afterPoRep() +} + func (s *SealPoller) pollStartMoveStorage(ctx context.Context, task pollTask) { - if s.pollers[pollerMoveStorage].IsSet() && task.AfterFinalize && !task.AfterMoveStorage && task.TaskMoveStorage == nil { + if s.pollers[pollerMoveStorage].IsSet() && task.afterFinalize() && !task.AfterMoveStorage && task.TaskMoveStorage == nil { s.pollers[pollerMoveStorage].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_move_storage = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_move_storage is null`, id, task.SpID, task.SectorNumber) if err != nil { diff --git a/provider/lpseal/poller_commit_msg.go b/provider/lpseal/poller_commit_msg.go index a70b709bc..0350200ef 100644 --- a/provider/lpseal/poller_commit_msg.go +++ b/provider/lpseal/poller_commit_msg.go @@ -15,7 +15,7 @@ import ( ) func (s *SealPoller) pollStartCommitMsg(ctx context.Context, task pollTask) { - if task.AfterPoRep && len(task.PoRepProof) > 0 && task.TaskCommitMsg == nil && !task.AfterCommitMsg && s.pollers[pollerCommitMsg].IsSet() { + if task.afterPoRep() && len(task.PoRepProof) > 0 && task.TaskCommitMsg == nil && !task.AfterCommitMsg && s.pollers[pollerCommitMsg].IsSet() { s.pollers[pollerCommitMsg].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_commit_msg = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_commit_msg is null`, id, task.SpID, task.SectorNumber) if err != nil { diff --git a/provider/lpseal/poller_precommit_msg.go b/provider/lpseal/poller_precommit_msg.go index 759308bf4..c57d8d46b 100644 --- a/provider/lpseal/poller_precommit_msg.go +++ b/provider/lpseal/poller_precommit_msg.go @@ -16,7 +16,7 @@ import ( ) func (s *SealPoller) pollStartPrecommitMsg(ctx context.Context, task pollTask) { - if task.TaskPrecommitMsg == nil && !task.AfterPrecommitMsg && task.AfterTreeR && task.AfterTreeD { + if task.TaskPrecommitMsg == nil && !task.AfterPrecommitMsg && task.afterTrees() { s.pollers[pollerPrecommitMsg].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_precommit_msg = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_precommit_msg is null and after_tree_r = true and after_tree_d = true`, id, task.SpID, task.SectorNumber) if err != nil {