diff --git a/curiosrc/seal/task_submit_precommit.go b/curiosrc/seal/task_submit_precommit.go index 9cc8d446b..0f896cb93 100644 --- a/curiosrc/seal/task_submit_precommit.go +++ b/curiosrc/seal/task_submit_precommit.go @@ -28,6 +28,7 @@ import ( ) type SubmitPrecommitTaskApi interface { + ChainHead(context.Context) (*types.TipSet, error) StateMinerPreCommitDepositForPower(context.Context, address.Address, miner.SectorPreCommitInfo, types.TipSetKey) (big.Int, error) StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error) StateNetworkVersion(context.Context, types.TipSetKey) (network.Version, error) @@ -59,6 +60,8 @@ func NewSubmitPrecommitTask(sp *SealPoller, db *harmonydb.DB, api SubmitPrecommi func (s *SubmitPrecommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { ctx := context.Background() + // 1. Load sector info + var sectorParamsArr []struct { SpID int64 `db:"sp_id"` SectorNumber int64 `db:"sector_number"` @@ -96,6 +99,8 @@ func (s *SubmitPrecommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bo return false, xerrors.Errorf("parsing unsealed CID: %w", err) } + // 2. Prepare message params + params := miner.PreCommitSectorBatchParams2{} expiration := sectorParams.TicketEpoch + miner12.MaxSectorExpirationExtension @@ -157,6 +162,26 @@ func (s *SubmitPrecommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bo params.Sectors[0].Expiration = minExpiration } + // 3. Check precommit + + { + record, err := s.checkPrecommit(ctx, params) + if err != nil { + if record { + _, perr := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline + SET failed = TRUE, failed_at = NOW(), failed_reason = 'precommit-check', failed_reason_msg = $1 + WHERE task_id_precommit_msg = $2`, err.Error(), taskID) + if perr != nil { + return false, xerrors.Errorf("persisting precommit check error: %w", perr) + } + } + + return record, xerrors.Errorf("checking precommit: %w", err) + } + } + + // 4. Prepare and send message + var pbuf bytes.Buffer if err := params.MarshalCBOR(&pbuf); err != nil { return false, xerrors.Errorf("serializing params: %w", err) @@ -210,6 +235,29 @@ func (s *SubmitPrecommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bo return true, nil } +func (s *SubmitPrecommitTask) checkPrecommit(ctx context.Context, params miner.PreCommitSectorBatchParams2) (record bool, err error) { + if len(params.Sectors) != 1 { + return false, xerrors.Errorf("expected 1 sector") + } + + preCommitInfo := params.Sectors[0] + + head, err := s.api.ChainHead(ctx) + if err != nil { + return false, xerrors.Errorf("getting chain head: %w", err) + } + height := head.Height() + + //never commit P2 message before, check ticket expiration + ticketEarliest := height - policy.MaxPreCommitRandomnessLookback + + if preCommitInfo.SealRandEpoch < ticketEarliest { + return true, xerrors.Errorf("ticket expired: seal height: %d, head: %d", preCommitInfo.SealRandEpoch+policy.SealRandomnessLookback, height) + } + + return true, nil +} + func (s *SubmitPrecommitTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { id := ids[0] return &id, nil