lpseal: Allow broken dep chains in poller

This commit is contained in:
Łukasz Magiera 2024-02-17 15:01:59 +01:00
parent 847b589822
commit 1f5013b607
3 changed files with 29 additions and 5 deletions

View File

@ -183,6 +183,10 @@ func (s *SealPoller) pollStartSDR(ctx context.Context, task pollTask) {
}
}
func (t pollTask) afterSDR() bool {
return t.AfterSDR
}
func (s *SealPoller) pollStartSDRTrees(ctx context.Context, task pollTask) {
if !task.AfterTreeD && !task.AfterTreeC && !task.AfterTreeR &&
task.TaskTreeD == nil && task.TaskTreeC == nil && task.TaskTreeR == nil &&
@ -203,8 +207,20 @@ func (s *SealPoller) pollStartSDRTrees(ctx context.Context, task pollTask) {
}
}
func (t pollTask) afterTrees() bool {
return t.AfterTreeD && t.AfterTreeC && t.AfterTreeR && t.afterSDR()
}
func (t pollTask) afterPrecommitMsg() bool {
return t.AfterPrecommitMsg && t.afterTrees()
}
func (t pollTask) afterPrecommitMsgSuccess() bool {
return t.AfterPrecommitMsgSuccess && t.afterPrecommitMsg()
}
func (s *SealPoller) pollStartPoRep(ctx context.Context, task pollTask, ts *types.TipSet) {
if s.pollers[pollerPoRep].IsSet() && task.AfterPrecommitMsgSuccess && task.SeedEpoch != nil &&
if s.pollers[pollerPoRep].IsSet() && task.afterPrecommitMsgSuccess() && task.SeedEpoch != nil &&
task.TaskPoRep == nil && !task.AfterPoRep &&
ts.Height() >= abi.ChainEpoch(*task.SeedEpoch+seedEpochConfidence) {
@ -222,8 +238,12 @@ func (s *SealPoller) pollStartPoRep(ctx context.Context, task pollTask, ts *type
}
}
func (t pollTask) afterPoRep() bool {
return t.AfterPoRep && t.afterPrecommitMsgSuccess()
}
func (s *SealPoller) pollStartFinalize(ctx context.Context, task pollTask, ts *types.TipSet) {
if s.pollers[pollerFinalize].IsSet() && task.AfterPoRep && !task.AfterFinalize && task.TaskFinalize == nil {
if s.pollers[pollerFinalize].IsSet() && task.afterPoRep() && !task.AfterFinalize && task.TaskFinalize == nil {
s.pollers[pollerFinalize].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_finalize = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_finalize is null`, id, task.SpID, task.SectorNumber)
if err != nil {
@ -238,8 +258,12 @@ func (s *SealPoller) pollStartFinalize(ctx context.Context, task pollTask, ts *t
}
}
func (t pollTask) afterFinalize() bool {
return t.AfterFinalize && t.afterPoRep()
}
func (s *SealPoller) pollStartMoveStorage(ctx context.Context, task pollTask) {
if s.pollers[pollerMoveStorage].IsSet() && task.AfterFinalize && !task.AfterMoveStorage && task.TaskMoveStorage == nil {
if s.pollers[pollerMoveStorage].IsSet() && task.afterFinalize() && !task.AfterMoveStorage && task.TaskMoveStorage == nil {
s.pollers[pollerMoveStorage].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_move_storage = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_move_storage is null`, id, task.SpID, task.SectorNumber)
if err != nil {

View File

@ -15,7 +15,7 @@ import (
)
func (s *SealPoller) pollStartCommitMsg(ctx context.Context, task pollTask) {
if task.AfterPoRep && len(task.PoRepProof) > 0 && task.TaskCommitMsg == nil && !task.AfterCommitMsg && s.pollers[pollerCommitMsg].IsSet() {
if task.afterPoRep() && len(task.PoRepProof) > 0 && task.TaskCommitMsg == nil && !task.AfterCommitMsg && s.pollers[pollerCommitMsg].IsSet() {
s.pollers[pollerCommitMsg].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_commit_msg = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_commit_msg is null`, id, task.SpID, task.SectorNumber)
if err != nil {

View File

@ -16,7 +16,7 @@ import (
)
func (s *SealPoller) pollStartPrecommitMsg(ctx context.Context, task pollTask) {
if task.TaskPrecommitMsg == nil && !task.AfterPrecommitMsg && task.AfterTreeR && task.AfterTreeD {
if task.TaskPrecommitMsg == nil && !task.AfterPrecommitMsg && task.afterTrees() {
s.pollers[pollerPrecommitMsg].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_precommit_msg = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_precommit_msg is null and after_tree_r = true and after_tree_d = true`, id, task.SpID, task.SectorNumber)
if err != nil {