From 25f1fd7952867b8b7945e133b46928877a35c69b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 8 Feb 2024 22:06:57 +0100 Subject: [PATCH] lpseal: Fix message poller --- cmd/lotus-shed/lpdeal.go | 2 ++ provider/lpseal/poller_commit_msg.go | 2 +- provider/lpseal/poller_precommit_msg.go | 4 ++-- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/cmd/lotus-shed/lpdeal.go b/cmd/lotus-shed/lpdeal.go index 65c1a94c0..bac3f167a 100644 --- a/cmd/lotus-shed/lpdeal.go +++ b/cmd/lotus-shed/lpdeal.go @@ -504,6 +504,8 @@ var lpBoostProxyCmd = &cli.Command{ pi := pis[0] pis = pis[1:] + pieceInfos[pieceCid] = pis + pieceInfoLk.Unlock() start := time.Now() diff --git a/provider/lpseal/poller_commit_msg.go b/provider/lpseal/poller_commit_msg.go index f563e2b8b..bbdc85039 100644 --- a/provider/lpseal/poller_commit_msg.go +++ b/provider/lpseal/poller_commit_msg.go @@ -15,7 +15,7 @@ import ( ) func (s *SealPoller) pollStartCommitMsg(ctx context.Context, task pollTask) { - if task.AfterPoRep && len(task.PoRepProof) > 0 && task.TaskCommitMsg == nil && s.pollers[pollerCommitMsg].IsSet() { + if task.AfterPoRep && len(task.PoRepProof) > 0 && task.TaskCommitMsg == nil && !task.AfterCommitMsg && s.pollers[pollerCommitMsg].IsSet() { s.pollers[pollerCommitMsg].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_commit_msg = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_commit_msg is null`, id, task.SpID, task.SectorNumber) if err != nil { diff --git a/provider/lpseal/poller_precommit_msg.go b/provider/lpseal/poller_precommit_msg.go index 3187e7bf3..759308bf4 100644 --- a/provider/lpseal/poller_precommit_msg.go +++ b/provider/lpseal/poller_precommit_msg.go @@ -16,7 +16,7 @@ import ( ) func (s *SealPoller) pollStartPrecommitMsg(ctx context.Context, task pollTask) { - if task.TaskPrecommitMsg == nil && task.AfterTreeR && task.AfterTreeD { + if task.TaskPrecommitMsg == nil && !task.AfterPrecommitMsg && task.AfterTreeR && task.AfterTreeD { s.pollers[pollerPrecommitMsg].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_precommit_msg = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_precommit_msg is null and after_tree_r = true and after_tree_d = true`, id, task.SpID, task.SectorNumber) if err != nil { @@ -44,7 +44,7 @@ type dbExecResult struct { } func (s *SealPoller) pollPrecommitMsgLanded(ctx context.Context, task pollTask) error { - if task.TaskPrecommitMsg != nil && !task.AfterPrecommitMsgSuccess { + if task.AfterPrecommitMsg && !task.AfterPrecommitMsgSuccess { var execResult []dbExecResult err := s.db.Select(ctx, &execResult, `SELECT spipeline.precommit_msg_cid, spipeline.commit_msg_cid, executed_tsk_cid, executed_tsk_epoch, executed_msg_cid, executed_rcpt_exitcode, executed_rcpt_gas_used