From f2e1b5a35cef5170d7c57ed6e6c747a9c9b242f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 11 Jan 2024 22:39:42 +0100 Subject: [PATCH] lpseal: Poller for PoRep --- .../harmonydb/sql/20231217-sdr-pipeline.sql | 3 +- provider/lpseal/poller.go | 97 ++++++++++++++++--- provider/lpseal/task_porep.go | 2 +- 3 files changed, 88 insertions(+), 14 deletions(-) diff --git a/lib/harmony/harmonydb/sql/20231217-sdr-pipeline.sql b/lib/harmony/harmonydb/sql/20231217-sdr-pipeline.sql index c30a17e7c..a239cc8a7 100644 --- a/lib/harmony/harmonydb/sql/20231217-sdr-pipeline.sql +++ b/lib/harmony/harmonydb/sql/20231217-sdr-pipeline.sql @@ -49,6 +49,7 @@ create table sectors_sdr_pipeline ( -- Commit (PoRep snark) task_id_porep bigint, porep_proof bytea, + after_porep bool not null default false, -- Commit message sending commit_msg_cid text, @@ -58,8 +59,6 @@ create table sectors_sdr_pipeline ( -- Commit message wait commit_msg_tsk bytea, - - task_id_commit_msg_wait bigint, after_commit_msg_success bool not null default false, -- Failure handling diff --git a/provider/lpseal/poller.go b/provider/lpseal/poller.go index 2605a5a51..47e60ea6d 100644 --- a/provider/lpseal/poller.go +++ b/provider/lpseal/poller.go @@ -28,9 +28,12 @@ const ( ) const sealPollerInterval = 10 * time.Second +const seedEpochConfidence = 3 type SealPollerAPI interface { StateSectorPreCommitInfo(context.Context, address.Address, abi.SectorNumber, types.TipSetKey) (*miner.SectorPreCommitOnChainInfo, error) + StateSectorGetInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tsk types.TipSetKey) (*miner.SectorOnChainInfo, error) + ChainHead(context.Context) (*types.TipSet, error) } type SealPoller struct { @@ -83,16 +86,17 @@ func (s *SealPoller) poll(ctx context.Context) error { TaskPrecommitMsg *int64 `db:"task_id_precommit_msg"` AfterPrecommitMsg bool `db:"after_precommit_msg"` - AfterPrecommitMsgSuccess bool `db:"after_precommit_msg_success"` + AfterPrecommitMsgSuccess bool `db:"after_precommit_msg_success"` + SeedEpoch *int64 `db:"seed_epoch"` TaskPoRep *int64 `db:"task_id_porep"` PoRepProof []byte `db:"porep_proof"` + AfterPoRep bool `db:"after_porep"` TaskCommitMsg *int64 `db:"task_id_commit_msg"` AfterCommitMsg bool `db:"after_commit_msg"` - TaskCommitMsgWait *int64 `db:"task_id_commit_msg_wait"` - AfterCommitMsgSuccess bool `db:"after_commit_msg_success"` + AfterCommitMsgSuccess bool `db:"after_commit_msg_success"` Failed bool `db:"failed"` FailedReason string `db:"failed_reason"` @@ -105,10 +109,10 @@ func (s *SealPoller) poll(ctx context.Context) error { task_id_tree_c, after_tree_c, task_id_tree_r, after_tree_r, task_id_precommit_msg, after_precommit_msg, - after_precommit_msg_success, - task_id_porep, porep_proof, + after_precommit_msg_success, seed_epoch, + task_id_porep, porep_proof, after_porep, task_id_commit_msg, after_commit_msg, - task_id_commit_msg_wait, after_commit_msg_success, + after_commit_msg_success, failed, failed_reason FROM sectors_sdr_pipeline WHERE after_commit_msg_success != true`) if err != nil { @@ -163,8 +167,6 @@ func (s *SealPoller) poll(ctx context.Context) error { } if task.TaskPrecommitMsg != nil && !task.AfterPrecommitMsgSuccess { - // join pipeline precommit_msg_cid with message_waits signed_message_cid, if executed_tsk_epoch is not null, return result - var execResult []struct { ExecutedTskCID string `db:"executed_tsk_cid"` ExecutedTskEpoch int64 `db:"executed_tsk_epoch"` @@ -208,9 +210,82 @@ func (s *SealPoller) poll(ctx context.Context) error { } } - todoWaitSeed := false - if task.TaskPoRep != nil && todoWaitSeed { - // todo start porep task + ts, err := s.api.ChainHead(ctx) + if err != nil { + return xerrors.Errorf("getting chain head: %w", err) + } + + if s.pollers[pollerPoRep].IsSet() && task.AfterPrecommitMsgSuccess && task.SeedEpoch != nil && task.TaskPoRep == nil && ts.Height() >= abi.ChainEpoch(*task.SeedEpoch+seedEpochConfidence) { + s.pollers[pollerPoRep].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { + n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_porep = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_porep is null`, id, task.SpID, task.SectorNumber) + if err != nil { + return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err) + } + if n != 1 { + return false, xerrors.Errorf("expected to update 1 row, updated %d", n) + } + + return true, nil + }) + } + + if task.AfterPoRep && len(task.PoRepProof) > 0 && task.TaskPrecommitMsg == nil { + s.pollers[pollerCommitMsg].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { + n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_commit_msg = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_commit_msg is null`, id, task.SpID, task.SectorNumber) + if err != nil { + return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err) + } + if n != 1 { + return false, xerrors.Errorf("expected to update 1 row, updated %d", n) + } + + return true, nil + }) + } + + if task.AfterCommitMsg && !task.AfterCommitMsgSuccess { + var execResult []struct { + ExecutedTskCID string `db:"executed_tsk_cid"` + ExecutedTskEpoch int64 `db:"executed_tsk_epoch"` + ExecutedMsgCID string `db:"executed_msg_cid"` + + ExecutedRcptExitCode int64 `db:"executed_rcpt_exitcode"` + ExecutedRcptGasUsed int64 `db:"executed_rcpt_gas_used"` + } + + err := s.db.Select(ctx, &execResult, `SELECT executed_tsk_cid, executed_tsk_epoch, executed_msg_cid, executed_rcpt_exitcode, executed_rcpt_gas_used + FROM sectors_sdr_pipeline + JOIN message_waits ON sectors_sdr_pipeline.commit_msg_cid = message_waits.signed_message_cid + WHERE sp_id = $1 AND sector_number = $2 AND executed_tsk_epoch is not null`, task.SpID, task.SectorNumber) + if err != nil { + log.Errorw("failed to query message_waits", "error", err) + } + + if len(execResult) > 0 { + maddr, err := address.NewIDAddress(uint64(task.SpID)) + if err != nil { + return err + } + + si, err := s.api.StateSectorGetInfo(ctx, maddr, abi.SectorNumber(task.SectorNumber), types.EmptyTSK) + if err != nil { + return xerrors.Errorf("get sector info: %w", err) + } + + if si != nil { + // todo handdle missing sector info (not found after cron) + } else { + // yay! + + _, err := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET + after_commit_msg_success = true, commit_msg_tsk = $1 + WHERE sp_id = $2 AND sector_number = $3 and after_commit_msg_success is NULL`, + execResult[0].ExecutedTskCID, task.SpID, task.SectorNumber) + if err != nil { + return xerrors.Errorf("update sectors_sdr_pipeline: %w", err) + } + } + } } } diff --git a/provider/lpseal/task_porep.go b/provider/lpseal/task_porep.go index 8a428d20c..96d0dc56c 100644 --- a/provider/lpseal/task_porep.go +++ b/provider/lpseal/task_porep.go @@ -113,7 +113,7 @@ func (p *PoRepTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done // store success! n, err := p.db.Exec(ctx, `UPDATE sectors_sdr_pipeline - SET after_sdr = true, seed_value = $3, porep_proof = $4 + SET after_porep = true, seed_value = $3, porep_proof = $4 WHERE sp_id = $1 AND sector_number = $2`, sectorParams.SpID, sectorParams.SectorNumber, []byte(rand), proof) if err != nil {